diff --git a/code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java b/code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java
index 95d4345b..7b12b07c 100644
--- a/code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java
+++ b/code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java
@@ -3,6 +3,8 @@ package nu.marginalia.process.control;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+
 /** Dummy implementation of ProcessHeartbeat that does nothing */
 public class FakeProcessHeartbeat implements ProcessHeartbeat {
     private static final Logger logger = LoggerFactory.getLogger(FakeProcessHeartbeat.class);
@@ -30,6 +32,11 @@ public class FakeProcessHeartbeat implements ProcessHeartbeat {
             logger.info("Progress: {}, {}/{}", step, progress, total);
         }
 
+        @Override
+        public <T> Iterable<T> wrap(String step, Collection<T> collection) {
+            return collection;
+        }
+
         @Override
         public void close() {}
     };
diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java b/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java
index e4af6bcd..7ad81a3b 100644
--- a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java
+++ b/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java
@@ -1,7 +1,12 @@
 package nu.marginalia.process.control;
 
+import java.util.Collection;
+
 public interface ProcessAdHocTaskHeartbeat extends AutoCloseable {
     void progress(String step, int progress, int total);
 
+    /** Wrap a collection to provide heartbeat progress updates as it's iterated through */
+    <T> Iterable<T> wrap(String step, Collection<T> collection);
+
     void close();
 }
diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java b/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java
index 9c0eeddf..1faf72f0 100644
--- a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java
+++ b/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java
@@ -7,6 +7,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -69,6 +71,35 @@ public class ProcessAdHocTaskHeartbeatImpl implements AutoCloseable, ProcessAdHo
         logger.info("ProcessTask {} progress: {}%", taskBase, progress);
     }
 
+    /** Wrap a collection to provide heartbeat progress updates as it's iterated through */
+    @Override
+    public <T> Iterable<T> wrap(String step, Collection<T> collection) {
+        return () -> new Iterator<>() {
+            private final Iterator<T> base = collection.iterator();
+            private final int size = collection.size();
+            private final int updateInterval = Math.max(1, size / 100); // update every 1% of the collection, or at least once
+            private int pos = 0;
+
+            @Override
+            public boolean hasNext() {
+                boolean ret = base.hasNext();
+                if (!ret) {
+                    progress(step, size, size);
+                }
+                return ret;
+            }
+
+            @Override
+            public T next() {
+                // update every 1% of the collection, to avoid hammering the database with updates
+                if (pos++ % updateInterval == 0) {
+                    progress(step, pos, size);
+                }
+                return base.next();
+            }
+        };
+    }
+
     public void shutDown() {
         if (!running)
             return;
@@ -185,6 +216,5 @@ public class ProcessAdHocTaskHeartbeatImpl implements AutoCloseable, ProcessAdHo
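Reviewer note: the point of `wrap` is that callers can drive heartbeat updates off plain iteration instead of calling `progress(step, i, n)` by hand. A minimal usage sketch against the new interface (the task name, collection, and per-item work are illustrative, not part of this change):

```java
import nu.marginalia.process.control.ProcessHeartbeat;

import java.util.List;

class WrapUsageSketch {
    void run(ProcessHeartbeat processHeartbeat, List<String> items) {
        try (var task = processHeartbeat.createAdHocTaskHeartbeat("EXAMPLE")) {
            // progress() fires roughly once per 1% of the collection,
            // plus a final update when the iterator is exhausted
            for (String item : task.wrap("processing", items)) {
                System.out.println(item); // stand-in for real per-item work
            }
        }
    }
}
```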
     public void close() {
         shutDown();
     }
-
 }
diff --git a/code/common/service/java/nu/marginalia/service/ServiceId.java b/code/common/service/java/nu/marginalia/service/ServiceId.java
index 2c09dff4..664c146a 100644
--- a/code/common/service/java/nu/marginalia/service/ServiceId.java
+++ b/code/common/service/java/nu/marginalia/service/ServiceId.java
@@ -13,7 +13,10 @@ public enum ServiceId {
 
     Dating("dating-service"),
     Status("setatus-service"),
-    Explorer("explorer-service");
+    Explorer("explorer-service"),
+
+    NOT_A_SERVICE("NOT_A_SERVICE")
+    ;
 
     public final String serviceName;
diff --git a/code/execution/build.gradle b/code/execution/build.gradle
index 4a28f540..288e7caf 100644
--- a/code/execution/build.gradle
+++ b/code/execution/build.gradle
@@ -17,6 +17,7 @@ dependencies {
     implementation project(':code:processes:website-adjacencies-calculator')
     implementation project(':code:processes:crawling-process')
+    implementation project(':code:processes:live-crawler')
     implementation project(':code:processes:loading-process')
     implementation project(':code:processes:converting-process')
     implementation project(':code:processes:index-constructor-process')
diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActor.java b/code/execution/java/nu/marginalia/actor/ExecutorActor.java
index e3b2308b..3fe1c92f 100644
--- a/code/execution/java/nu/marginalia/actor/ExecutorActor.java
+++ b/code/execution/java/nu/marginalia/actor/ExecutorActor.java
@@ -2,12 +2,14 @@ package nu.marginalia.actor;
 
 public enum ExecutorActor {
     CRAWL,
+    LIVE_CRAWL,
     RECRAWL,
     RECRAWL_SINGLE_DOMAIN,
     CONVERT_AND_LOAD,
     PROC_CONVERTER_SPAWNER,
     PROC_LOADER_SPAWNER,
     PROC_CRAWLER_SPAWNER,
+    PROC_LIVE_CRAWL_SPAWNER,
     MONITOR_PROCESS_LIVENESS,
     MONITOR_FILE_STORAGE,
     ADJACENCY_CALCULATION,
diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java
index 86f99455..e5a6fc5a 100644
--- a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java
+++ b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java
@@ -36,10 +36,12 @@ public class ExecutorActorControlService {
                                      ConvertActor convertActor,
                                      ConvertAndLoadActor convertAndLoadActor,
                                      CrawlActor crawlActor,
+                                     LiveCrawlActor liveCrawlActor,
                                      RecrawlSingleDomainActor recrawlSingleDomainActor,
                                      RestoreBackupActor restoreBackupActor,
                                      ConverterMonitorActor converterMonitorFSM,
                                      CrawlerMonitorActor crawlerMonitorActor,
+                                     LiveCrawlerMonitorActor liveCrawlerMonitorActor,
                                      LoaderMonitorActor loaderMonitor,
                                      ProcessLivenessMonitorActor processMonitorFSM,
                                      FileStorageMonitorActor fileStorageMonitorActor,
@@ -61,6 +63,7 @@ public class ExecutorActorControlService {
         this.node = baseServiceParams.configuration.node();
 
         register(ExecutorActor.CRAWL, crawlActor);
+        register(ExecutorActor.LIVE_CRAWL, liveCrawlActor);
         register(ExecutorActor.RECRAWL_SINGLE_DOMAIN, recrawlSingleDomainActor);
         register(ExecutorActor.CONVERT, convertActor);
 
@@ -71,6 +74,7 @@ public class ExecutorActorControlService {
         register(ExecutorActor.PROC_CONVERTER_SPAWNER, converterMonitorFSM);
         register(ExecutorActor.PROC_LOADER_SPAWNER, loaderMonitor);
         register(ExecutorActor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor);
+        register(ExecutorActor.PROC_LIVE_CRAWL_SPAWNER, liveCrawlerMonitorActor);
 
         register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM);
         register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor);
diff --git a/code/execution/java/nu/marginalia/actor/proc/LiveCrawlerMonitorActor.java b/code/execution/java/nu/marginalia/actor/proc/LiveCrawlerMonitorActor.java
new file mode 100644
index 00000000..5b0eb69a
--- /dev/null
+++ b/code/execution/java/nu/marginalia/actor/proc/LiveCrawlerMonitorActor.java
@@ -0,0 +1,29 @@
+package nu.marginalia.actor.proc;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
+import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.mqapi.ProcessInboxNames;
+import nu.marginalia.process.ProcessService;
+import nu.marginalia.service.module.ServiceConfiguration;
+
+@Singleton
+public class LiveCrawlerMonitorActor extends AbstractProcessSpawnerActor {
+
+    @Inject
+    public LiveCrawlerMonitorActor(Gson gson,
+                                   ServiceConfiguration configuration,
+                                   MqPersistence persistence,
+                                   ProcessService processService) {
+        super(gson,
+                configuration,
+                persistence,
+                processService,
+                ProcessInboxNames.LIVE_CRAWLER_INBOX,
+                ProcessService.ProcessId.LIVE_CRAWLER);
+    }
+
+
+}
diff --git a/code/execution/java/nu/marginalia/actor/task/LiveCrawlActor.java b/code/execution/java/nu/marginalia/actor/task/LiveCrawlActor.java
new file mode 100644
index 00000000..6752989a
--- /dev/null
+++ b/code/execution/java/nu/marginalia/actor/task/LiveCrawlActor.java
@@ -0,0 +1,110 @@
+package nu.marginalia.actor.task;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import nu.marginalia.IndexLocations;
+import nu.marginalia.actor.ExecutorActor;
+import nu.marginalia.actor.ExecutorActorStateMachines;
+import nu.marginalia.actor.prototype.RecordActorPrototype;
+import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
+import nu.marginalia.process.ProcessOutboxes;
+import nu.marginalia.process.ProcessService;
+import nu.marginalia.storage.FileStorageService;
+import nu.marginalia.storage.model.FileStorageId;
+import nu.marginalia.storage.model.FileStorageType;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.util.List;
+
+@Singleton
+public class LiveCrawlActor extends RecordActorPrototype {
+
+    // STATES
+    private final ActorProcessWatcher processWatcher;
+    private final MqOutbox mqLiveCrawlerOutbox;
+    private final FileStorageService storageService;
+    private final ExecutorActorStateMachines executorActorStateMachines;
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    public record Initial() implements ActorStep {}
+    public record LiveCrawl(FileStorageId id, long msgId) implements ActorStep {
+        public LiveCrawl(FileStorageId id) { this(id, -1); }
+    }
+
+    @Override
+    public ActorStep transition(ActorStep self) throws Exception {
+        logger.info("{}", self);
+        return switch (self) {
+            case Initial() -> {
+                // Clear the loader's output directory of any debris from partial jobs that have been aborted
+                Files.list(IndexLocations.getIndexConstructionArea(storageService)).forEach(path -> {
+                    try {
+                        if (Files.isDirectory(path)) {
+                            FileUtils.deleteDirectory(path.toFile());
+                        }
+                        else if (Files.isRegularFile(path)) {
+                            Files.delete(path);
+                        }
+                    } catch (Exception e) {
+                        logger.error("Error clearing staging area", e);
+                    }
+                });
+
+                List<FileStorageId> activeCrawlData = storageService.getActiveFileStorages(FileStorageType.CRAWL_DATA);
+                if (activeCrawlData.isEmpty()) {
+                    var storage = storageService.allocateStorage(FileStorageType.CRAWL_DATA, "crawl-data", "Crawl data");
+                    yield new LiveCrawl(storage.id());
+                } else {
+                    yield new LiveCrawl(activeCrawlData.getFirst());
+                }
+            }
+            case LiveCrawl(FileStorageId storageId, long msgId) when msgId < 0 -> {
+                long id = mqLiveCrawlerOutbox.sendAsync(new LiveCrawlRequest(storageId));
+                yield new LiveCrawl(storageId, id);
+            }
+            case LiveCrawl(FileStorageId storageId, long msgId) -> {
+                var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId);
+
+                if (rsp.state() != MqMessageState.OK) {
+                    yield new Error("Crawler failed");
+                }
+
+                executorActorStateMachines.initFrom(ExecutorActor.CONVERT_AND_LOAD, new ConvertAndLoadActor.Rerank());
+
+                yield new End();
+            }
+            default -> new Error("Unknown state");
+        };
+    }
+
+    @Override
+    public String describe() {
+        return "Process a set of crawl data and then load it into the database.";
+    }
+
+    @Inject
+    public LiveCrawlActor(ActorProcessWatcher processWatcher,
+                          ProcessOutboxes processOutboxes,
+                          FileStorageService storageService,
+                          Gson gson,
+                          ExecutorActorStateMachines executorActorStateMachines)
+    {
+        super(gson);
+        this.processWatcher = processWatcher;
+        this.mqLiveCrawlerOutbox = processOutboxes.getLiveCrawlerOutbox();
+        this.storageService = storageService;
+        this.executorActorStateMachines = executorActorStateMachines;
+
+    }
+
+
+}
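The two LiveCrawl cases implement the same dispatch-then-await handshake the other process actors use. Condensed into one method for readability (a sketch, not a verbatim extract from the file above):

```java
// Sketch of the LiveCrawl state pair above, flattened into one method
ActorStep dispatchAndAwait(FileStorageId storageId) throws Exception {
    // First pass (msgId < 0): post the request; the PROC_LIVE_CRAWL_SPAWNER
    // monitor notices the inbox traffic and spawns the process
    long msgId = mqLiveCrawlerOutbox.sendAsync(new LiveCrawlRequest(storageId));

    // Second pass: park until the process replies (or dies)
    var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox,
            ProcessService.ProcessId.LIVE_CRAWLER, msgId);

    if (rsp.state() != MqMessageState.OK)
        return new Error("Crawler failed");

    return new End();
}
```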
diff --git a/code/execution/java/nu/marginalia/process/ProcessOutboxes.java b/code/execution/java/nu/marginalia/process/ProcessOutboxes.java
index 2c7e3fad..8600b86c 100644
--- a/code/execution/java/nu/marginalia/process/ProcessOutboxes.java
+++ b/code/execution/java/nu/marginalia/process/ProcessOutboxes.java
@@ -13,6 +13,7 @@ public class ProcessOutboxes {
     private final MqOutbox loaderOutbox;
     private final MqOutbox crawlerOutbox;
     private final MqOutbox indexConstructorOutbox;
+    private final MqOutbox liveCrawlerOutbox;
 
     @Inject
     public ProcessOutboxes(BaseServiceParams params, MqPersistence persistence) {
@@ -44,6 +45,14 @@ public class ProcessOutboxes {
                 params.configuration.node(),
                 params.configuration.instanceUuid()
         );
+
+        liveCrawlerOutbox = new MqOutbox(persistence,
+                ProcessInboxNames.LIVE_CRAWLER_INBOX,
+                params.configuration.node(),
+                params.configuration.serviceName(),
+                params.configuration.node(),
+                params.configuration.instanceUuid()
+        );
     }
 
 
@@ -60,4 +69,6 @@ public class ProcessOutboxes {
     }
 
     public MqOutbox getIndexConstructorOutbox() { return indexConstructorOutbox; }
+
+    public MqOutbox getLiveCrawlerOutbox() { return liveCrawlerOutbox; }
 }
diff --git a/code/execution/java/nu/marginalia/process/ProcessService.java b/code/execution/java/nu/marginalia/process/ProcessService.java
index 30f15f6e..89707f94 100644
--- a/code/execution/java/nu/marginalia/process/ProcessService.java
+++ b/code/execution/java/nu/marginalia/process/ProcessService.java
@@ -7,6 +7,7 @@ import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator;
 import nu.marginalia.converting.ConverterMain;
 import nu.marginalia.crawl.CrawlerMain;
 import nu.marginalia.index.IndexConstructorMain;
+import nu.marginalia.livecrawler.LiveCrawlerMain;
 import nu.marginalia.loading.LoaderMain;
 import nu.marginalia.service.ProcessMainClass;
 import nu.marginalia.service.control.ServiceEventLog;
@@ -50,6 +51,7 @@ public class ProcessService {
 
     public enum ProcessId {
         CRAWLER(CrawlerMain.class),
+        LIVE_CRAWLER(LiveCrawlerMain.class),
         CONVERTER(ConverterMain.class),
         LOADER(LoaderMain.class),
         INDEX_CONSTRUCTOR(IndexConstructorMain.class),
@@ -64,6 +66,7 @@ public class ProcessService {
         List<String> envOpts() {
             String variable = switch (this) {
                 case CRAWLER -> "CRAWLER_PROCESS_OPTS";
+                case LIVE_CRAWLER -> "LIVE_CRAWLER_PROCESS_OPTS";
                 case CONVERTER -> "CONVERTER_PROCESS_OPTS";
                 case LOADER -> "LOADER_PROCESS_OPTS";
                 case INDEX_CONSTRUCTOR -> "INDEX_CONSTRUCTION_PROCESS_OPTS";
diff --git a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java
index c66e34c9..ad1e4559 100644
--- a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java
+++ b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java
@@ -19,7 +19,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 
 @Singleton
 public class FeedsClient {
@@ -28,7 +28,9 @@ public class FeedsClient {
     private final MqOutbox updateFeedsOutbox;
 
     @Inject
-    public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence, ServiceConfiguration serviceConfiguration) {
+    public FeedsClient(GrpcChannelPoolFactory factory,
+                       MqPersistence mqPersistence,
+                       ServiceConfiguration serviceConfiguration) {
 
         // The client is only interested in the primary node
         var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any());
@@ -51,10 +53,10 @@ public class FeedsClient {
         }
     }
 
-    public void getUpdatedDomains(Instant since, Consumer<UpdatedDomain> consumer) throws ExecutionException, InterruptedException {
+    public void getUpdatedDomains(Instant since, BiConsumer<String, List<String>> consumer) throws ExecutionException, InterruptedException {
         channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getUpdatedLinks)
                 .run(RpcUpdatedLinksRequest.newBuilder().setSinceEpochMillis(since.toEpochMilli()).build())
-                .forEachRemaining(rsp -> consumer.accept(new UpdatedDomain(rsp)));
+                .forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
     }
 
     public record UpdatedDomain(String domain, List<String> urls) {
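Caller's-eye view of the new FeedsClient signature: with a BiConsumer the response stream can be collected straight into a map, which is exactly what LiveCrawlerMain does further down. A sketch (the 60-day cutoff mirrors the one used there):

```java
import nu.marginalia.api.feeds.FeedsClient;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FeedsClientUsageSketch {
    Map<String, List<String>> fetch(FeedsClient feedsClient) throws Exception {
        Map<String, List<String>> urlsPerDomain = new HashMap<>();

        // Map::put conforms to BiConsumer<String, List<String>>, so the
        // gRPC response stream aggregates directly into the map
        feedsClient.getUpdatedDomains(Instant.now().minus(60, ChronoUnit.DAYS),
                urlsPerDomain::put);

        return urlsPerDomain;
    }
}
```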
diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java
index 53825ecf..c0999c96 100644
--- a/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java
+++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java
@@ -1,7 +1,6 @@
 package nu.marginalia.converting.processor;
 
 import com.google.inject.Inject;
-import nu.marginalia.atags.AnchorTextKeywords;
 import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.atags.source.AnchorTagsSource;
 import nu.marginalia.atags.source.AnchorTagsSourceFactory;
@@ -37,7 +36,6 @@ public class DomainProcessor {
     private final DocumentProcessor documentProcessor;
     private final SiteWords siteWords;
     private final AnchorTagsSource anchorTagsSource;
-    private final AnchorTextKeywords anchorTextKeywords;
     private final GeoIpDictionary geoIpDictionary;
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -46,12 +44,10 @@ public class DomainProcessor {
     public DomainProcessor(DocumentProcessor documentProcessor,
                            SiteWords siteWords,
                            AnchorTagsSourceFactory anchorTagsSourceFactory,
-                           AnchorTextKeywords anchorTextKeywords,
                            GeoIpDictionary geoIpDictionary) throws SQLException
     {
         this.documentProcessor = documentProcessor;
         this.siteWords = siteWords;
-        this.anchorTextKeywords = anchorTextKeywords;
         this.anchorTagsSource = anchorTagsSourceFactory.create();
 
         this.geoIpDictionary = geoIpDictionary;
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDelayTimer.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDelayTimer.java
index fa9a50b0..31925ce7 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDelayTimer.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDelayTimer.java
@@ -19,7 +19,12 @@ public class CrawlDelayTimer {
     private final long delayTime;
 
     public CrawlDelayTimer(long delayTime) {
-        this.delayTime = delayTime;
+        if (delayTime <= 0) {
+            this.delayTime = DEFAULT_CRAWL_DELAY_MIN_MS;
+        }
+        else {
+            this.delayTime = delayTime;
+        }
     }
 
     /** Call when we've gotten an HTTP 429 response.  This will wait a moment, and then
@@ -41,6 +46,10 @@ public class CrawlDelayTimer {
         Thread.sleep(delay.toMillis());
     }
 
+    public void waitFetchDelay() {
+        waitFetchDelay(0);
+    }
+
     public void waitFetchDelay(long spentTime) {
         long sleepTime = delayTime;
 
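The constructor clamp matters for the new live crawler: crawler-commons' SimpleRobotRules.getCrawlDelay() returns a negative sentinel when robots.txt specifies no Crawl-Delay, and SimpleLinkScraper (further down in this patch) passes that value in directly. A behavior sketch (the sentinel detail is an assumption about crawler-commons, not something this diff shows):

```java
import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;

class CrawlDelaySketch {
    void politeFetch(SimpleRobotRules rules) throws InterruptedException {
        // With no Crawl-Delay directive, getCrawlDelay() is negative, and the
        // timer now falls back to DEFAULT_CRAWL_DELAY_MIN_MS rather than
        // sleeping for zero (or a negative) duration
        CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());

        timer.waitFetchDelay();    // new zero-arg overload: full configured delay
        timer.waitFetchDelay(250); // existing overload: discounts time already spent
    }
}
```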
diff --git a/code/processes/live-crawler/build.gradle b/code/processes/live-crawler/build.gradle
new file mode 100644
index 00000000..b60fd863
--- /dev/null
+++ b/code/processes/live-crawler/build.gradle
@@ -0,0 +1,78 @@
+plugins {
+    id 'java'
+
+    id 'application'
+    id 'jvm-test-suite'
+}
+
+java {
+    toolchain {
+        languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
+    }
+}
+
+application {
+    mainClass = 'nu.marginalia.livecrawler.LiveCrawlerMain'
+    applicationName = 'live-crawler-process'
+}
+
+tasks.distZip.enabled = false
+
+apply from: "$rootProject.projectDir/srcsets.gradle"
+
+dependencies {
+    implementation project(':code:common:process')
+
+    implementation project(':code:common:db')
+    implementation project(':code:common:model')
+    implementation project(':code:common:config')
+    implementation project(':code:common:service')
+    implementation project(':code:common:linkdb')
+    implementation project(':code:functions:live-capture:api')
+    implementation project(':code:libraries:blocking-thread-pool')
+    implementation project(':code:index:api')
+    implementation project(':code:processes:process-mq-api')
+    implementation project(':code:libraries:message-queue')
+    implementation project(':code:libraries:language-processing')
+    implementation project(':code:libraries:easy-lsh')
+    implementation project(':code:processes:crawling-process')
+    implementation project(':code:processes:crawling-process:model')
+    implementation project(':code:processes:converting-process')
+    implementation project(':code:processes:loading-process')
+
+
+    implementation project(':code:processes:crawling-process:ft-crawl-blocklist')
+    implementation project(':code:processes:crawling-process:ft-link-parser')
+    implementation project(':code:processes:crawling-process:ft-content-type')
+    implementation project(':third-party:commons-codec')
+
+    implementation libs.bundles.slf4j
+
+    implementation libs.notnull
+    implementation libs.guava
+    implementation dependencies.create(libs.guice.get()) {
+        exclude group: 'com.google.guava'
+    }
+    implementation libs.gson
+    implementation libs.zstd
+    implementation libs.jwarc
+    implementation libs.crawlercommons
+    implementation libs.okhttp3
+    implementation libs.jsoup
+    implementation libs.opencsv
+    implementation libs.fastutil
+    implementation libs.sqlite
+
+    implementation libs.bundles.mariadb
+
+
+    testImplementation libs.bundles.slf4j.test
+    testImplementation libs.bundles.junit
+    testImplementation libs.mockito
+    testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
+    testImplementation libs.commons.codec
+    testImplementation 'org.testcontainers:mariadb:1.17.4'
+    testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
+    testImplementation project(':code:libraries:test-helpers')
+}
+
+ """)) + { + stmt.setInt(1, domainId); + stmt.setString(2, url.toString()); + stmt.setBytes(3, compress(body)); + stmt.setBytes(4, compress(headers)); + stmt.setString(5, ip); + stmt.setLong(6, Instant.now().toEpochMilli()); + stmt.executeUpdate(); + } + } + + private byte[] compress(String data) throws IOException { + // gzip compression + try (var bos = new ByteArrayOutputStream(); + var gzip = new GZIPOutputStream(bos)) + { + gzip.write(data.getBytes()); + gzip.finish(); + return bos.toByteArray(); + } + } + + private String decompress(byte[] data) { + // gzip decompression + try (var bis = new ByteArrayInputStream(data); + var gzip = new GZIPInputStream(bis)) + { + return new String(gzip.readAllBytes()); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + public Collection getDataStreams() throws SQLException { + List domainIds = new ArrayList<>(); + + try (var stmt = connection.createStatement()) { + var rs = stmt.executeQuery("SELECT DISTINCT domainId FROM urls"); + while (rs.next()) { + domainIds.add(rs.getInt(1)); + } + } + + List streams = new ArrayList<>(); + for (var domainId : domainIds) { + streams.add(new DataStream(domainId)); + } + return streams; + } + + class DataStream implements SerializableCrawlDataStream { + private final int domainId; + private ArrayList data; + + DataStream(int domainId) { + this.domainId = domainId; + this.data = null; + } + + private void query() { + try (var stmt = connection.prepareStatement(""" + SELECT url, body, headers, ip, timestamp + FROM urls + WHERE domainId = ? + """)) { + stmt.setInt(1, domainId); + var rs = stmt.executeQuery(); + data = new ArrayList<>(); + while (rs.next()) { + String url = rs.getString("url"); + String body = decompress(rs.getBytes("body")); + String headers = decompress(rs.getBytes("headers")); + + data.add(new CrawledDocument( + "LIVE", + url, + "text/html", + Instant.ofEpochMilli(rs.getLong("timestamp")).toString(), + 200, + "OK", + "", + headers, + body, + body, + Integer.toString(body.hashCode()), + null, + "LIVE", + false, + "", + "" + )); + } + var last = data.getLast(); + var domain = new CrawledDomain( + last.getDomain(), + null, + "OK", + "", + "0.0.0.0", + List.of(), + List.of() + ); + + // Add the domain as the last element, which will be the first + // element popped from the list + data.addLast(domain); + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public SerializableCrawlData next() throws IOException { + if (data == null) + query(); + + return data.removeLast(); + } + + @Override + public boolean hasNext() throws IOException { + if (data == null) { + query(); + } + return !data.isEmpty(); + } + + @Override + public void close() throws Exception { + data.clear(); + } + } + + @Override + public void close() throws Exception { + connection.close(); + } +} diff --git a/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java new file mode 100644 index 00000000..85d216e8 --- /dev/null +++ b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java @@ -0,0 +1,273 @@ +package nu.marginalia.livecrawler; + +import com.google.gson.Gson; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import nu.marginalia.ProcessConfiguration; +import nu.marginalia.ProcessConfigurationModule; +import nu.marginalia.WmsaHome; +import nu.marginalia.api.feeds.FeedsClient; +import 
diff --git a/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java
new file mode 100644
index 00000000..85d216e8
--- /dev/null
+++ b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java
@@ -0,0 +1,273 @@
+package nu.marginalia.livecrawler;
+
+import com.google.gson.Gson;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import nu.marginalia.ProcessConfiguration;
+import nu.marginalia.ProcessConfigurationModule;
+import nu.marginalia.WmsaHome;
+import nu.marginalia.api.feeds.FeedsClient;
+import nu.marginalia.converting.ConverterModule;
+import nu.marginalia.converting.processor.DomainProcessor;
+import nu.marginalia.converting.writer.ConverterBatchWriter;
+import nu.marginalia.db.DbDomainQueries;
+import nu.marginalia.db.DomainBlacklist;
+import nu.marginalia.io.SerializableCrawlDataStream;
+import nu.marginalia.loading.LoaderInputData;
+import nu.marginalia.loading.documents.DocumentLoaderService;
+import nu.marginalia.loading.documents.KeywordLoaderService;
+import nu.marginalia.loading.domains.DbDomainIdRegistry;
+import nu.marginalia.loading.domains.DomainIdRegistry;
+import nu.marginalia.model.EdgeDomain;
+import nu.marginalia.mq.MessageQueueFactory;
+import nu.marginalia.mq.MqMessage;
+import nu.marginalia.mq.inbox.MqInboxResponse;
+import nu.marginalia.mq.inbox.MqSingleShotInbox;
+import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
+import nu.marginalia.process.control.ProcessHeartbeat;
+import nu.marginalia.service.ProcessMainClass;
+import nu.marginalia.service.module.DatabaseModule;
+import nu.marginalia.service.module.ServiceDiscoveryModule;
+import nu.marginalia.storage.FileStorageService;
+import nu.marginalia.storage.model.FileStorage;
+import nu.marginalia.storage.model.FileStorageId;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.security.Security;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static nu.marginalia.mqapi.ProcessInboxNames.LIVE_CRAWLER_INBOX;
+
+public class LiveCrawlerMain extends ProcessMainClass {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(LiveCrawlerMain.class);
+
+    private final FeedsClient feedsClient;
+    private final Gson gson;
+    private final ProcessHeartbeat heartbeat;
+    private final DbDomainQueries domainQueries;
+    private final DomainBlacklist domainBlacklist;
+    private final MessageQueueFactory messageQueueFactory;
+    private final DomainProcessor domainProcessor;
+    private final FileStorageService fileStorageService;
+    private final KeywordLoaderService keywordLoaderService;
+    private final DocumentLoaderService documentLoaderService;
+    private final int node;
+
+    @Inject
+    public LiveCrawlerMain(FeedsClient feedsClient,
+                           Gson gson,
+                           ProcessConfiguration config,
+                           ProcessHeartbeat heartbeat,
+                           DbDomainQueries domainQueries,
+                           DomainBlacklist domainBlacklist,
+                           MessageQueueFactory messageQueueFactory,
+                           DomainProcessor domainProcessor,
+                           FileStorageService fileStorageService,
+                           KeywordLoaderService keywordLoaderService,
+                           DocumentLoaderService documentLoaderService)
+            throws Exception
+    {
+        this.feedsClient = feedsClient;
+        this.gson = gson;
+        this.heartbeat = heartbeat;
+        this.domainQueries = domainQueries;
+        this.domainBlacklist = domainBlacklist;
+        this.messageQueueFactory = messageQueueFactory;
+        this.node = config.node();
+        this.domainProcessor = domainProcessor;
+        this.fileStorageService = fileStorageService;
+        this.keywordLoaderService = keywordLoaderService;
+        this.documentLoaderService = documentLoaderService;
+
+        domainBlacklist.waitUntilLoaded();
+    }
+
+    public static void main(String... args) throws Exception {
+
+        // Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
+        Security.setProperty("networkaddress.cache.ttl", "3600");
+
+        // This must run *early*
+        System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());
+
+        // If these aren't set properly, the JVM will hang forever on some requests
+        System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
+        System.setProperty("sun.net.client.defaultReadTimeout", "30000");
+
+        // We don't want to use too much memory caching sessions for https
+        System.setProperty("javax.net.ssl.sessionCacheSize", "2048");
+
+        try {
+            Injector injector = Guice.createInjector(
+                    new LiveCrawlerModule(),
+                    new ProcessConfigurationModule("crawler"),
+                    new ConverterModule(),
+                    new ServiceDiscoveryModule(),
+                    new DatabaseModule(false)
+            );
+
+            var crawler = injector.getInstance(LiveCrawlerMain.class);
+            LiveCrawlInstructions instructions = crawler.fetchInstructions();
+
+            try (var dataset = crawler.openDataSet(instructions.liveDataFileStorageId)) {
+                crawler.run(dataset);
+                instructions.ok();
+            } catch (Exception e) {
+                instructions.err();
+                throw e;
+            }
+
+        } catch (Exception e) {
+            logger.error("LiveCrawler failed", e);
+
+            System.exit(1);
+        }
+        System.exit(0);
+    }
+
+    enum LiveCrawlState {
+        PRUNE_DB,
+        FETCH_LINKS,
+        CRAWLING,
+        PROCESSING,
+        LOADING,
+        CONSTRUCTING,
+        DONE
+    }
+
+    private LiveCrawlDataSet openDataSet(FileStorageId storageId) throws SQLException {
+        FileStorage storage = fileStorageService.getStorage(storageId);
+        return new LiveCrawlDataSet(storage.asPath());
+    }
+
+    private void run(LiveCrawlDataSet dataSet) throws Exception {
+        try (var state = heartbeat.createProcessTaskHeartbeat(LiveCrawlState.class, "LiveCrawler")) {
+            final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);
+
+            state.progress(LiveCrawlState.FETCH_LINKS);
+
+            Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
+            feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);
+
+            logger.info("Fetched data for {} domains", urlsPerDomain.size());
+
+            state.progress(LiveCrawlState.PRUNE_DB);
+
+            // Remove data that is too old
+            dataSet.prune(cutoff);
+
+            state.progress(LiveCrawlState.CRAWLING);
+
+            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist);
+                 var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
+            {
+                for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
+                    EdgeDomain domain = new EdgeDomain(entry.getKey());
+                    List<String> urls = entry.getValue();
+
+                    fetcher.scheduleRetrieval(domain, urls);
+                }
+            }
+
+            Path tempPath = dataSet.createWorkDir();
+
+            try {
+                state.progress(LiveCrawlState.PROCESSING);
+
+                try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
+                     var writer = new ConverterBatchWriter(tempPath, 0)
+                ) {
+                    for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
+                        writer.write(domainProcessor.sideloadProcessing(stream, 0));
+                    }
+                }
+
+                state.progress(LiveCrawlState.LOADING);
+
+                LoaderInputData lid = new LoaderInputData(tempPath, 1);
+
+                DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(domainQueries);
+
+                keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
+                documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, lid);
+
+                keywordLoaderService.close();
+
+            } finally {
+                FileUtils.deleteDirectory(tempPath.toFile());
+            }
+
+            // Construct the index
+
+            state.progress(LiveCrawlState.DONE);
+        }
+    }
+
+    private LiveCrawlInstructions fetchInstructions() throws Exception {
+
+        var inbox = messageQueueFactory.createSingleShotInbox(LIVE_CRAWLER_INBOX, node, UUID.randomUUID());
+
+        logger.info("Waiting for instructions");
+
+        var msgOpt = getMessage(inbox, LiveCrawlRequest.class.getSimpleName());
+        var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
+
+        // for now the live crawl request carries nothing but the storage id
+        LiveCrawlRequest request = gson.fromJson(msg.payload(), LiveCrawlRequest.class);
+
+        return new LiveCrawlInstructions(msg, inbox, request.liveDataFileStorageId);
+    }
+
+
+    private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
+        var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
+        if (opt.isPresent()) {
+            if (!opt.get().function().equals(expectedFunction)) {
+                throw new RuntimeException("Unexpected function: " + opt.get().function());
+            }
+            return opt;
+        }
+        else {
+            var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
+            stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
+            return stolenMessage;
+        }
+    }
+
+
+    private static class LiveCrawlInstructions {
+        private final MqMessage message;
+        private final MqSingleShotInbox inbox;
+
+        public final FileStorageId liveDataFileStorageId;
+
+        LiveCrawlInstructions(MqMessage message,
+                              MqSingleShotInbox inbox,
+                              FileStorageId liveDataFileStorageId)
+        {
+            this.message = message;
+            this.inbox = inbox;
+            this.liveDataFileStorageId = liveDataFileStorageId;
+        }
+
+
+        public void ok() {
+            inbox.sendResponse(message, MqInboxResponse.ok());
+        }
+        public void err() {
+            inbox.sendResponse(message, MqInboxResponse.err());
+        }
+
+    }
+
+}
diff --git a/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerModule.java b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerModule.java
new file mode 100644
index 00000000..6088fb5e
--- /dev/null
+++ b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerModule.java
@@ -0,0 +1,49 @@
+package nu.marginalia.livecrawler;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.name.Names;
+import nu.marginalia.IndexLocations;
+import nu.marginalia.ProcessConfiguration;
+import nu.marginalia.UserAgent;
+import nu.marginalia.WmsaHome;
+import nu.marginalia.linkdb.docs.DocumentDbWriter;
+import nu.marginalia.service.ServiceId;
+import nu.marginalia.service.module.ServiceConfiguration;
+import nu.marginalia.storage.FileStorageService;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
+
+public class LiveCrawlerModule extends AbstractModule {
+
+    public void configure() {
+        bind(UserAgent.class).toInstance(WmsaHome.getUserAgent());
+        bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path", "/vol")));
+    }
+
+    @Inject
+    @Provides @Singleton
+    private DocumentDbWriter createLinkdbWriter(FileStorageService service) throws SQLException, IOException {
+        // Migrate
+        Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve(DOCDB_FILE_NAME);
+
+        if (Files.exists(dbPath)) {
+            Files.delete(dbPath);
+        }
+        return new DocumentDbWriter(dbPath);
+    }
+
+    @Singleton
+    @Provides
+    public ServiceConfiguration provideServiceConfiguration(ProcessConfiguration processConfiguration) {
+        return new ServiceConfiguration(ServiceId.NOT_A_SERVICE, processConfiguration.node(), null, null, -1, UUID.randomUUID());
+    }
+}
diff --git a/code/processes/live-crawler/java/nu/marginalia/livecrawler/SimpleLinkScraper.java b/code/processes/live-crawler/java/nu/marginalia/livecrawler/SimpleLinkScraper.java
new file mode 100644
index 00000000..336e8f5e
--- /dev/null
+++ b/code/processes/live-crawler/java/nu/marginalia/livecrawler/SimpleLinkScraper.java
@@ -0,0 +1,158 @@
+package nu.marginalia.livecrawler;
+
+import crawlercommons.robots.SimpleRobotRules;
+import crawlercommons.robots.SimpleRobotRulesParser;
+import nu.marginalia.WmsaHome;
+import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
+import nu.marginalia.crawl.retreival.CrawlDelayTimer;
+import nu.marginalia.db.DbDomainQueries;
+import nu.marginalia.db.DomainBlacklist;
+import nu.marginalia.link_parser.LinkParser;
+import nu.marginalia.model.EdgeDomain;
+import nu.marginalia.model.EdgeUrl;
+import nu.marginalia.util.SimpleBlockingThreadPool;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** A simple link scraper that fetches URLs and stores them in a database,
+ * with no concept of a crawl frontier, WARC output, or other advanced features
+ */
+public class SimpleLinkScraper implements AutoCloseable {
+    private final SimpleBlockingThreadPool pool = new SimpleBlockingThreadPool("LiveCrawler", 32, 10);
+    private final LinkParser lp = new LinkParser();
+    private final LiveCrawlDataSet dataSet;
+    private final DbDomainQueries domainQueries;
+    private final DomainBlacklist domainBlacklist;
+    private final Duration connectTimeout = Duration.ofSeconds(10);
+    private final Duration readTimeout = Duration.ofSeconds(10);
+
+    public SimpleLinkScraper(LiveCrawlDataSet dataSet,
+                             DbDomainQueries domainQueries,
+                             DomainBlacklist domainBlacklist) {
+        this.dataSet = dataSet;
+        this.domainQueries = domainQueries;
+        this.domainBlacklist = domainBlacklist;
+    }
+
+    public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
+
+        var id = domainQueries.tryGetDomainId(domain);
+        if (id.isEmpty() || domainBlacklist.isBlacklisted(id.getAsInt())) {
+            return;
+        }
+
+        pool.submitQuietly(() -> retrieveNow(domain, id.getAsInt(), urls));
+    }
+
+    public void retrieveNow(EdgeDomain domain, int domainId, List<String> urls) throws Exception {
+        try (HttpClient client = HttpClient
+                .newBuilder()
+                .connectTimeout(connectTimeout)
+                .followRedirects(HttpClient.Redirect.NEVER)
+                .version(HttpClient.Version.HTTP_2)
+                .build()) {
+
+            EdgeUrl rootUrl = domain.toRootUrlHttps();
+
+            SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
+
+            CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());
+
+            for (var url : urls) {
+                Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
+                if (optParsedUrl.isEmpty()) {
+                    continue;
+                }
+                if (dataSet.hasUrl(optParsedUrl.get())) {
+                    continue;
+                }
+
+                EdgeUrl parsedUrl = optParsedUrl.get();
+                if (!rules.isAllowed(url)) {
+                    continue;
+                }
+
+                fetchUrl(domainId, parsedUrl, timer, client);
+            }
+        }
+    }
+
+    private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException {
+        var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI())
+                .GET()
+                .header("User-Agent", WmsaHome.getUserAgent().uaString())
+                .timeout(readTimeout);
+
+        // Fetch the robots.txt
+
+        SimpleRobotRulesParser parser = new SimpleRobotRulesParser();
+        SimpleRobotRules rules = new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
+        HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray());
+        if (robotsTxt.statusCode() == 200) {
+            rules = parser.parseContent(rootUrl.toString(),
+                    robotsTxt.body(),
+                    robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"),
+                    WmsaHome.getUserAgent().uaIdentifier());
+        }
+
+        return rules;
+    }
+
+    private void fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
+
+        timer.waitFetchDelay();
+
+        // Loop for HTTP 429 retries
+        for (int i = 0; i < 2; i++) {
+            HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
+                    .GET()
+                    .header("User-Agent", WmsaHome.getUserAgent().uaString())
+                    .header("Accept", "text/html")
+                    .timeout(readTimeout)
+                    .build();
+
+            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
+
+            if (response.statusCode() == 429) {
+                timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
+                        response.headers().firstValue("Retry-After").orElse("5")
+                ));
+                continue;
+            }
+
+            String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
+
+            if (response.statusCode() == 200 && contentType.startsWith("text/html")) {
+                dataSet.saveDocument(domainId, parsedUrl, response.body(), headersToString(response.headers()), "");
+            }
+
+            break;
+        }
+    }
+
+    private String headersToString(HttpHeaders headers) {
+        StringBuilder headersStr = new StringBuilder();
+        headers.map().forEach((k, v) -> {
+            headersStr.append(k).append(": ").append(v).append("\n");
+        });
+        return headersStr.toString();
+    }
+
+    @Override
+    public void close() throws Exception {
+        pool.shutDown();
+        for (int i = 0; i < 4; i++) {
+            pool.awaitTermination(1, TimeUnit.HOURS);
+        }
+        pool.shutDownNow();
+    }
+}
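Caller-side, the scraper is fire-and-forget: LiveCrawlerMain queues one batch per domain and relies on close() to drain the pool. A sketch (the domain and URL are illustrative):

```java
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.model.EdgeDomain;

import java.util.List;

class ScraperUsageSketch {
    void demo(LiveCrawlDataSet dataSet,
              DbDomainQueries domainQueries,
              DomainBlacklist blacklist) throws Exception {
        try (var fetcher = new SimpleLinkScraper(dataSet, domainQueries, blacklist)) {
            // Non-blocking: the work lands on the 32-thread pool; unknown
            // and blacklisted domains are silently skipped
            fetcher.scheduleRetrieval(new EdgeDomain("www.example.com"),
                    List.of("https://www.example.com/some-new-page"));
        } // close() shuts the pool down and waits out in-flight fetches
    }
}
```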
diff --git a/code/processes/live-crawler/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java b/code/processes/live-crawler/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java
new file mode 100644
index 00000000..8753f2a0
--- /dev/null
+++ b/code/processes/live-crawler/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java
@@ -0,0 +1,33 @@
+package nu.marginalia.livecrawler;
+
+import nu.marginalia.model.EdgeUrl;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class LiveCrawlDataSetTest {
+
+    @Test
+    public void testGetDataSet() throws Exception {
+        // LiveCrawlDataSet expects a base *directory*, and creates
+        // live-crawl-data.db inside it
+        Path tempDir = Files.createTempDirectory("live-crawl-test");
+        try (LiveCrawlDataSet dataSet = new LiveCrawlDataSet(tempDir)) {
+            Assertions.assertFalse(dataSet.hasUrl("https://www.example.com/"));
+            dataSet.saveDocument(
+                    1,
+                    new EdgeUrl("https://www.example.com/"),
+                    "test",
+                    "test",
+                    "test"
+            );
+            Assertions.assertTrue(dataSet.hasUrl("https://www.example.com/"));
+        }
+        finally {
+            FileUtils.deleteDirectory(tempDir.toFile());
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/code/processes/loading-process/java/nu/marginalia/loading/documents/KeywordLoaderService.java b/code/processes/loading-process/java/nu/marginalia/loading/documents/KeywordLoaderService.java
index fadbd64c..4fb1e711 100644
--- a/code/processes/loading-process/java/nu/marginalia/loading/documents/KeywordLoaderService.java
+++ b/code/processes/loading-process/java/nu/marginalia/loading/documents/KeywordLoaderService.java
@@ -27,7 +27,8 @@ public class KeywordLoaderService {
 
     public boolean loadKeywords(DomainIdRegistry domainIdRegistry,
                                 ProcessHeartbeat heartbeat,
-                                LoaderInputData inputData) throws IOException {
+                                LoaderInputData inputData) throws IOException
+    {
         try (var task = heartbeat.createAdHocTaskHeartbeat("KEYWORDS")) {
             Collection<SlopTable.Ref<SlopDocumentRecord>> documentFiles = inputData.listDocumentFiles();
diff --git a/code/processes/loading-process/java/nu/marginalia/loading/domains/CachingDomainIdRegistry.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/CachingDomainIdRegistry.java
new file mode 100644
index 00000000..b80f4ce1
--- /dev/null
+++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/CachingDomainIdRegistry.java
@@ -0,0 +1,27 @@
+package nu.marginalia.loading.domains;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Maps domain names to domain ids */
+public class CachingDomainIdRegistry implements DomainIdRegistry {
+    private final Map<String, Integer> domainIds = new HashMap<>(10_000);
+
+    @Override
+    public int getDomainId(String domainName) {
+        Integer id = domainIds.get(domainName.toLowerCase());
+
+        if (id == null) {
+            // This is a very severe problem
+            throw new IllegalStateException("Unknown domain id for domain " + domainName);
+        }
+
+        return id;
+    }
+
+    @Override
+    public void add(String domainName, int id) {
+        domainIds.put(domainName, id);
+    }
+
+}
diff --git a/code/processes/loading-process/java/nu/marginalia/loading/domains/DbDomainIdRegistry.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/DbDomainIdRegistry.java
new file mode 100644
index 00000000..44a19d35
--- /dev/null
+++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/DbDomainIdRegistry.java
@@ -0,0 +1,22 @@
+package nu.marginalia.loading.domains;
+
+import nu.marginalia.db.DbDomainQueries;
+import nu.marginalia.model.EdgeDomain;
+
+public class DbDomainIdRegistry implements DomainIdRegistry {
+    private final DbDomainQueries dbDomainQueries;
+
+    public DbDomainIdRegistry(DbDomainQueries dbDomainQueries) {
+        this.dbDomainQueries = dbDomainQueries;
+    }
+
+    @Override
+    public int getDomainId(String domainName) {
+        return dbDomainQueries.getDomainId(new EdgeDomain(domainName));
+    }
+
+    @Override
+    public void add(String domainName, int id) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+}
diff --git a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainIdRegistry.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainIdRegistry.java
index bccc3ed3..e26314e6 100644
--- a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainIdRegistry.java
+++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainIdRegistry.java
@@ -1,25 +1,7 @@
 package nu.marginalia.loading.domains;
 
-import java.util.HashMap;
-import java.util.Map;
-
-/** Maps domain names to domain ids */
-public class DomainIdRegistry {
-    private final Map<String, Integer> domainIds = new HashMap<>(10_000);
-
-    public int getDomainId(String domainName) {
-        Integer id = domainIds.get(domainName.toLowerCase());
-
-        if (id == null) {
-            // This is a very severe problem
-            throw new IllegalStateException("Unknown domain id for domain " + domainName);
-        }
-
-        return id;
-    }
-
-    void add(String domainName, int id) {
-        domainIds.put(domainName, id);
-    }
+public interface DomainIdRegistry {
+    int getDomainId(String domainName);
+
+    void add(String domainName, int id);
 }
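The interface extraction gives the two loading modes different id-resolution strategies: the batch loader keeps its warm in-memory map, while the live crawler resolves ids one query at a time against the domain table. A hypothetical selector to illustrate the trade-off (this helper is not part of the change):

```java
import nu.marginalia.db.DbDomainQueries;

class RegistrySelectionSketch {
    DomainIdRegistry pick(boolean batchMode, DbDomainQueries queries) {
        if (batchMode) {
            // batch loading: ids are pre-populated via add(); lookups are map hits
            return new CachingDomainIdRegistry();
        }
        // live crawling: each getDomainId() round-trips to the database,
        // and add() intentionally throws UnsupportedOperationException
        return new DbDomainIdRegistry(queries);
    }
}
```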
diff --git a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java
index 66389062..ae3229ad 100644
--- a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java
+++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java
@@ -52,7 +52,7 @@ public class DomainLoaderService {
             throws IOException, SQLException
     {
         Set<String> domainNamesAll = new HashSet<>(100_000);
-        DomainIdRegistry ret = new DomainIdRegistry();
+        DomainIdRegistry ret = new CachingDomainIdRegistry();
 
         try (var conn = dataSource.getConnection();
             var taskHeartbeat = heartbeat.createProcessTaskHeartbeat(Steps.class, "DOMAIN_IDS"))
diff --git a/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java b/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java
index b7906c22..ab8e3b37 100644
--- a/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java
+++ b/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java
@@ -4,6 +4,7 @@ public class ProcessInboxNames {
     public static final String CONVERTER_INBOX = "converter";
     public static final String LOADER_INBOX = "loader";
     public static final String CRAWLER_INBOX = "crawler";
+    public static final String LIVE_CRAWLER_INBOX = "live-crawler";
 
     public static final String INDEX_CONSTRUCTOR_INBOX = "index_constructor";
 }
diff --git a/code/processes/process-mq-api/java/nu/marginalia/mqapi/crawling/LiveCrawlRequest.java b/code/processes/process-mq-api/java/nu/marginalia/mqapi/crawling/LiveCrawlRequest.java
new file mode 100644
index 00000000..da754283
--- /dev/null
+++ b/code/processes/process-mq-api/java/nu/marginalia/mqapi/crawling/LiveCrawlRequest.java
@@ -0,0 +1,11 @@
+package nu.marginalia.mqapi.crawling;
+
+import nu.marginalia.storage.model.FileStorageId;
+
+public class LiveCrawlRequest {
+    public FileStorageId liveDataFileStorageId;
+
+    public LiveCrawlRequest(FileStorageId liveDataFileStorageId) {
+        this.liveDataFileStorageId = liveDataFileStorageId;
+    }
+}
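On the wire the request is plain gson JSON, and LiveCrawlerMain matches inbox messages on LiveCrawlRequest.class.getSimpleName(). A round-trip sketch (a plain Gson instance stands in for the injected one, and the long-id FileStorageId constructor and the value 42 are assumptions for illustration):

```java
import com.google.gson.Gson;
import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
import nu.marginalia.storage.model.FileStorageId;

class LiveCrawlRequestWireSketch {
    void demo() {
        Gson gson = new Gson();

        // what LiveCrawlActor's outbox effectively publishes on "live-crawler"
        String payload = gson.toJson(new LiveCrawlRequest(new FileStorageId(42)));

        // what LiveCrawlerMain.fetchInstructions() recovers from the message
        LiveCrawlRequest request = gson.fromJson(payload, LiveCrawlRequest.class);
        FileStorageId storageId = request.liveDataFileStorageId;
    }
}
```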
diff --git a/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java b/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java
index a29924a2..498daa79 100644
--- a/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java
+++ b/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java
@@ -31,6 +31,7 @@ import nu.marginalia.loading.LoaderIndexJournalWriter;
 import nu.marginalia.loading.LoaderInputData;
 import nu.marginalia.loading.documents.DocumentLoaderService;
 import nu.marginalia.loading.documents.KeywordLoaderService;
+import nu.marginalia.loading.domains.CachingDomainIdRegistry;
 import nu.marginalia.loading.domains.DomainIdRegistry;
 import nu.marginalia.loading.links.DomainLinksLoaderService;
 import nu.marginalia.model.EdgeDomain;
@@ -166,7 +167,7 @@ public class IntegrationTest {
 
         LoaderInputData inputData = new LoaderInputData(List.of(processedDataDir));
 
-        DomainIdRegistry domainIdRegistry = Mockito.mock(DomainIdRegistry.class);
+        DomainIdRegistry domainIdRegistry = Mockito.mock(CachingDomainIdRegistry.class);
         when(domainIdRegistry.getDomainId(any())).thenReturn(1);
 
         linksService.loadLinks(domainIdRegistry, new FakeProcessHeartbeat(), inputData);
diff --git a/settings.gradle b/settings.gradle
index 1df48b72..d98099af 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -64,6 +64,7 @@ include 'code:execution:data-extractors'
 include 'code:processes:crawling-process:ft-crawl-blocklist'
 include 'code:processes:crawling-process:ft-link-parser'
 include 'code:processes:crawling-process:ft-content-type'
+include 'code:processes:live-crawler'
 
 include 'code:processes:process-mq-api'