diff --git a/code/execution/java/nu/marginalia/actor/task/LiveCrawlActor.java b/code/execution/java/nu/marginalia/actor/task/LiveCrawlActor.java index 6752989a..79b50023 100644 --- a/code/execution/java/nu/marginalia/actor/task/LiveCrawlActor.java +++ b/code/execution/java/nu/marginalia/actor/task/LiveCrawlActor.java @@ -3,25 +3,21 @@ package nu.marginalia.actor.task; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.IndexLocations; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorStateMachines; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.api.feeds.FeedsClient; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mqapi.crawling.LiveCrawlRequest; import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; -import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorageId; -import nu.marginalia.storage.model.FileStorageType; -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Files; -import java.util.List; +import java.time.Duration; +import java.util.Objects; @Singleton public class LiveCrawlActor extends RecordActorPrototype { @@ -29,14 +25,14 @@ public class LiveCrawlActor extends RecordActorPrototype { // STATES private final ActorProcessWatcher processWatcher; private final MqOutbox mqLiveCrawlerOutbox; - private final FileStorageService storageService; private final ExecutorActorStateMachines executorActorStateMachines; - + private final FeedsClient feedsClient; private final Logger logger = LoggerFactory.getLogger(getClass()); public record Initial() implements ActorStep {} - public record LiveCrawl(FileStorageId id, long msgId) implements ActorStep { - public LiveCrawl(FileStorageId id) { this(id, -1); } + public record Monitor(String feedsHash) implements ActorStep {} + public record LiveCrawl(String feedsHash, long msgId) implements ActorStep { + public LiveCrawl(String feedsHash) { this(feedsHash, -1); } } @Override @@ -44,43 +40,32 @@ public class LiveCrawlActor extends RecordActorPrototype { logger.info("{}", self); return switch (self) { case Initial() -> { - // clear the output directory of the loader from any debris from partial jobs that have been aborted - Files.list(IndexLocations.getIndexConstructionArea(storageService)).forEach(path -> { - try { - if (Files.isDirectory(path)) { - FileUtils.deleteDirectory(path.toFile()); - } - else if (Files.isRegularFile(path)) { - Files.delete(path); - } - } catch (Exception e) { - logger.error("Error clearing staging area", e); + yield new Monitor("-"); + } + case Monitor(String feedsHash) -> { + for (;;) { + String currentHash = feedsClient.getFeedDataHash(); + if (!Objects.equals(currentHash, feedsHash)) { + yield new LiveCrawl(currentHash); } - }); - - - List activeCrawlData = storageService.getActiveFileStorages(FileStorageType.CRAWL_DATA); - if (activeCrawlData.isEmpty()) { - var storage = storageService.allocateStorage(FileStorageType.CRAWL_DATA, "crawl-data", "Crawl data"); - yield new LiveCrawl(storage.id()); - } else { - yield new LiveCrawl(activeCrawlData.getFirst()); + Thread.sleep(Duration.ofMinutes(15)); } } - case LiveCrawl(FileStorageId storageId, long msgId) when msgId < 0 -> { - long id = mqLiveCrawlerOutbox.sendAsync(new LiveCrawlRequest(storageId)); - yield new LiveCrawl(storageId, id); + case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> { + long id = mqLiveCrawlerOutbox.sendAsync(new LiveCrawlRequest()); + yield new LiveCrawl(feedsHash, id); } - case LiveCrawl(FileStorageId storageId, long msgId) -> { + case LiveCrawl(String feedsHash, long msgId) -> { var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId); if (rsp.state() != MqMessageState.OK) { yield new Error("Crawler failed"); } + // Build the index executorActorStateMachines.initFrom(ExecutorActor.CONVERT_AND_LOAD, new ConvertAndLoadActor.Rerank()); - yield new End(); + yield new Monitor(feedsHash); } default -> new Error("Unknown state"); }; @@ -88,22 +73,21 @@ public class LiveCrawlActor extends RecordActorPrototype { @Override public String describe() { - return "Process a set of crawl data and then load it into the database."; + return "Actor that polls the feeds database for changes, and triggers the live crawler when needed"; } @Inject public LiveCrawlActor(ActorProcessWatcher processWatcher, ProcessOutboxes processOutboxes, - FileStorageService storageService, + FeedsClient feedsClient, Gson gson, ExecutorActorStateMachines executorActorStateMachines) { super(gson); this.processWatcher = processWatcher; this.mqLiveCrawlerOutbox = processOutboxes.getLiveCrawlerOutbox(); - this.storageService = storageService; this.executorActorStateMachines = executorActorStateMachines; - + this.feedsClient = feedsClient; } diff --git a/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java index 63d030af..25f10435 100644 --- a/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java +++ b/code/processes/live-crawler/java/nu/marginalia/livecrawler/LiveCrawlerMain.java @@ -30,11 +30,12 @@ import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.ServiceDiscoveryModule; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.storage.model.FileStorageBaseType; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.file.Files; import java.nio.file.Path; import java.security.Security; import java.sql.SQLException; @@ -119,7 +120,7 @@ public class LiveCrawlerMain extends ProcessMainClass { LiveCrawlInstructions instructions = crawler.fetchInstructions(); try{ - crawler.run(instructions.liveDataFileStorageId); + crawler.run(); instructions.ok(); } catch (Exception e) { instructions.err(); @@ -144,8 +145,13 @@ public class LiveCrawlerMain extends ProcessMainClass { DONE } - private void run(FileStorageId storageId) throws Exception { - Path basePath = fileStorageService.getStorage(storageId).asPath(); + private void run() throws Exception { + Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data"); + + if (!Files.isDirectory(basePath)) { + Files.createDirectories(basePath); + } + run(basePath); } @@ -226,7 +232,7 @@ public class LiveCrawlerMain extends ProcessMainClass { // for live crawl, request is empty for now LiveCrawlRequest request = gson.fromJson(msg.payload(), LiveCrawlRequest.class); - return new LiveCrawlInstructions(msg, inbox, request.liveDataFileStorageId); + return new LiveCrawlInstructions(msg, inbox); } @@ -250,15 +256,11 @@ public class LiveCrawlerMain extends ProcessMainClass { private final MqMessage message; private final MqSingleShotInbox inbox; - public final FileStorageId liveDataFileStorageId; - LiveCrawlInstructions(MqMessage message, - MqSingleShotInbox inbox, - FileStorageId liveDataFileStorageId) + MqSingleShotInbox inbox) { this.message = message; this.inbox = inbox; - this.liveDataFileStorageId = liveDataFileStorageId; } diff --git a/code/processes/live-crawler/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java b/code/processes/live-crawler/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java index 8753f2a0..672cc602 100644 --- a/code/processes/live-crawler/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java +++ b/code/processes/live-crawler/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java @@ -1,19 +1,25 @@ package nu.marginalia.livecrawler; import nu.marginalia.model.EdgeUrl; +import nu.marginalia.model.crawldata.CrawledDocument; +import nu.marginalia.model.crawldata.CrawledDomain; +import nu.marginalia.model.crawldata.SerializableCrawlData; +import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; public class LiveCrawlDataSetTest { @Test public void testGetDataSet() throws Exception { - Path tempFile = Files.createTempFile("test", ".db"); + Path tempDir = Files.createTempDirectory("live-crawl-data-set-test"); try { - LiveCrawlDataSet dataSet = new LiveCrawlDataSet(tempFile.toString()); + LiveCrawlDataSet dataSet = new LiveCrawlDataSet(tempDir); Assertions.assertFalse(dataSet.hasUrl("https://www.example.com/")); dataSet.saveDocument( @@ -24,9 +30,38 @@ public class LiveCrawlDataSetTest { "test" ); Assertions.assertTrue(dataSet.hasUrl("https://www.example.com/")); + + var streams = dataSet.getDataStreams(); + Assertions.assertEquals(1, streams.size()); + var stream = streams.iterator().next(); + + List data = new ArrayList<>(); + while (stream.hasNext()) { + data.add(stream.next()); + } + + int dataCount = 0; + int domainCount = 0; + + for (var item : data) { + switch (item) { + case CrawledDomain domain -> { + domainCount++; + Assertions.assertEquals("www.example.com", domain.getDomain()); + } + case CrawledDocument document -> { + dataCount++; + Assertions.assertEquals("https://www.example.com/", document.url); + Assertions.assertEquals("test", document.documentBody); + } + } + } + + Assertions.assertEquals(1, dataCount); + Assertions.assertEquals(1, domainCount); } finally { - Files.delete(tempFile); + FileUtils.deleteDirectory(tempDir.toFile()); } } diff --git a/code/processes/process-mq-api/java/nu/marginalia/mqapi/crawling/LiveCrawlRequest.java b/code/processes/process-mq-api/java/nu/marginalia/mqapi/crawling/LiveCrawlRequest.java index da754283..8aea5ab5 100644 --- a/code/processes/process-mq-api/java/nu/marginalia/mqapi/crawling/LiveCrawlRequest.java +++ b/code/processes/process-mq-api/java/nu/marginalia/mqapi/crawling/LiveCrawlRequest.java @@ -1,11 +1,4 @@ package nu.marginalia.mqapi.crawling; -import nu.marginalia.storage.model.FileStorageId; - public class LiveCrawlRequest { - public FileStorageId liveDataFileStorageId; - - public LiveCrawlRequest(FileStorageId liveDataFileStorageId) { - this.liveDataFileStorageId = liveDataFileStorageId; - } }