(live-crawl) Make the actor poll for feed changes instead of being a one-shot task.

Also changes the live crawl process to store its data in a fixed directory under the storage base rather than in versioned per-run directories.
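At a high level, the actor now sits in a Monitor state that compares a hash of the feeds data against the last value it acted on, and only kicks off a crawl when the hash changes. A minimal sketch of that polling pattern, with a Supplier standing in for the FeedsClient.getFeedDataHash() call seen in the diff below:

```java
import java.time.Duration;
import java.util.Objects;
import java.util.function.Supplier;

// Sketch of the poll-for-changes loop introduced by this commit:
// sleep between checks, and only act when the observed hash differs
// from the one last processed.
class FeedChangePoller {
    private final Supplier<String> hashSource; // stands in for FeedsClient.getFeedDataHash()

    FeedChangePoller(Supplier<String> hashSource) {
        this.hashSource = hashSource;
    }

    // Blocks until the feed data hash changes, then returns the new hash
    // so the caller can trigger a live crawl and resume polling with it.
    String waitForChange(String lastSeenHash) throws InterruptedException {
        for (;;) {
            String currentHash = hashSource.get();
            if (!Objects.equals(currentHash, lastSeenHash)) {
                return currentHash;
            }
            Thread.sleep(Duration.ofMinutes(15));
        }
    }
}
```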
This commit is contained in:
Viktor Lofgren 2024-11-20 15:36:25 +01:00
parent 79ce4de2ab
commit 6e4252cf4c
4 changed files with 74 additions and 60 deletions

LiveCrawlActor.java

@@ -3,25 +3,21 @@ package nu.marginalia.actor.task;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import nu.marginalia.IndexLocations;
 import nu.marginalia.actor.ExecutorActor;
 import nu.marginalia.actor.ExecutorActorStateMachines;
 import nu.marginalia.actor.prototype.RecordActorPrototype;
 import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.api.feeds.FeedsClient;
 import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
 import nu.marginalia.process.ProcessOutboxes;
 import nu.marginalia.process.ProcessService;
-import nu.marginalia.storage.FileStorageService;
-import nu.marginalia.storage.model.FileStorageId;
-import nu.marginalia.storage.model.FileStorageType;
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.nio.file.Files;
-import java.util.List;
+import java.time.Duration;
+import java.util.Objects;
@Singleton
public class LiveCrawlActor extends RecordActorPrototype {
@@ -29,14 +25,14 @@ public class LiveCrawlActor extends RecordActorPrototype {
     // STATES
     private final ActorProcessWatcher processWatcher;
     private final MqOutbox mqLiveCrawlerOutbox;
-    private final FileStorageService storageService;
     private final ExecutorActorStateMachines executorActorStateMachines;
+    private final FeedsClient feedsClient;
     private final Logger logger = LoggerFactory.getLogger(getClass());

     public record Initial() implements ActorStep {}
-    public record LiveCrawl(FileStorageId id, long msgId) implements ActorStep {
-        public LiveCrawl(FileStorageId id) { this(id, -1); }
-    }
+    public record Monitor(String feedsHash) implements ActorStep {}
+    public record LiveCrawl(String feedsHash, long msgId) implements ActorStep {
+        public LiveCrawl(String feedsHash) { this(feedsHash, -1); }
+    }
@Override
@@ -44,43 +40,32 @@ public class LiveCrawlActor extends RecordActorPrototype {
logger.info("{}", self);
return switch (self) {
-            case Initial() -> {
-                // clear the output directory of the loader from any debris from partial jobs that have been aborted
-                Files.list(IndexLocations.getIndexConstructionArea(storageService)).forEach(path -> {
-                    try {
-                        if (Files.isDirectory(path)) {
-                            FileUtils.deleteDirectory(path.toFile());
-                        }
-                        else if (Files.isRegularFile(path)) {
-                            Files.delete(path);
-                        }
-                    } catch (Exception e) {
-                        logger.error("Error clearing staging area", e);
-                    }
-                });
-
-                List<FileStorageId> activeCrawlData = storageService.getActiveFileStorages(FileStorageType.CRAWL_DATA);
-                if (activeCrawlData.isEmpty()) {
-                    var storage = storageService.allocateStorage(FileStorageType.CRAWL_DATA, "crawl-data", "Crawl data");
-                    yield new LiveCrawl(storage.id());
-                } else {
-                    yield new LiveCrawl(activeCrawlData.getFirst());
-                }
-            }
+            case Initial() -> {
+                yield new Monitor("-");
+            }
+            case Monitor(String feedsHash) -> {
+                for (;;) {
+                    String currentHash = feedsClient.getFeedDataHash();
+                    if (!Objects.equals(currentHash, feedsHash)) {
+                        yield new LiveCrawl(currentHash);
+                    }
+                    Thread.sleep(Duration.ofMinutes(15));
+                }
+            }
-            case LiveCrawl(FileStorageId storageId, long msgId) when msgId < 0 -> {
-                long id = mqLiveCrawlerOutbox.sendAsync(new LiveCrawlRequest(storageId));
-                yield new LiveCrawl(storageId, id);
-            }
+            case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {
+                long id = mqLiveCrawlerOutbox.sendAsync(new LiveCrawlRequest());
+                yield new LiveCrawl(feedsHash, id);
+            }
-            case LiveCrawl(FileStorageId storageId, long msgId) -> {
+            case LiveCrawl(String feedsHash, long msgId) -> {
                 var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId);
                 if (rsp.state() != MqMessageState.OK) {
                     yield new Error("Crawler failed");
                 }
                 // Build the index
                 executorActorStateMachines.initFrom(ExecutorActor.CONVERT_AND_LOAD, new ConvertAndLoadActor.Rerank());
-                yield new End();
+                yield new Monitor(feedsHash);
             }
             default -> new Error("Unknown state");
         };
@@ -88,22 +73,21 @@ public class LiveCrawlActor extends RecordActorPrototype {
     @Override
     public String describe() {
-        return "Process a set of crawl data and then load it into the database.";
+        return "Actor that polls the feeds database for changes, and triggers the live crawler when needed";
     }
     @Inject
     public LiveCrawlActor(ActorProcessWatcher processWatcher,
                           ProcessOutboxes processOutboxes,
-                          FileStorageService storageService,
+                          FeedsClient feedsClient,
                           Gson gson,
                           ExecutorActorStateMachines executorActorStateMachines)
     {
         super(gson);
         this.processWatcher = processWatcher;
         this.mqLiveCrawlerOutbox = processOutboxes.getLiveCrawlerOutbox();
-        this.storageService = storageService;
         this.executorActorStateMachines = executorActorStateMachines;
+        this.feedsClient = feedsClient;
     }
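Note how the terminal transition changed: where the old actor ended in End after one crawl-and-load cycle, the new one yields Monitor again, making the actor a long-lived loop. A stripped-down sketch of that shape (hypothetical names, not the project's actual RecordActorPrototype API):

```java
// Hypothetical reduction of the actor's new state graph:
// Initial -> Monitor -> Crawl -> Monitor -> ... (never End).
public class ActorLoopSketch {
    sealed interface Step permits Monitor, Crawl {}
    record Monitor(String feedsHash) implements Step {}
    record Crawl(String feedsHash) implements Step {}

    static Step next(Step self) {
        return switch (self) {
            case Monitor(String hash) -> new Crawl(hash);  // change detected, go crawl
            case Crawl(String hash) -> new Monitor(hash);  // crawl done, resume watching
        };
    }
}
```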

LiveCrawlerMain.java

@@ -30,11 +30,12 @@ import nu.marginalia.service.ProcessMainClass;
 import nu.marginalia.service.module.DatabaseModule;
 import nu.marginalia.service.module.ServiceDiscoveryModule;
 import nu.marginalia.storage.FileStorageService;
-import nu.marginalia.storage.model.FileStorageId;
+import nu.marginalia.storage.model.FileStorageBaseType;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Security;
import java.sql.SQLException;
@@ -119,7 +120,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
             LiveCrawlInstructions instructions = crawler.fetchInstructions();

             try{
-                crawler.run(instructions.liveDataFileStorageId);
+                crawler.run();
                 instructions.ok();
             } catch (Exception e) {
                 instructions.err();
@@ -144,8 +145,13 @@ public class LiveCrawlerMain extends ProcessMainClass {
         DONE
     }

-    private void run(FileStorageId storageId) throws Exception {
-        Path basePath = fileStorageService.getStorage(storageId).asPath();
+    private void run() throws Exception {
+        Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
+
+        if (!Files.isDirectory(basePath)) {
+            Files.createDirectories(basePath);
+        }

         run(basePath);
     }
@@ -226,7 +232,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
+        // for live crawl, request is empty for now
         LiveCrawlRequest request = gson.fromJson(msg.payload(), LiveCrawlRequest.class);

-        return new LiveCrawlInstructions(msg, inbox, request.liveDataFileStorageId);
+        return new LiveCrawlInstructions(msg, inbox);
     }
@@ -250,15 +256,11 @@ public class LiveCrawlerMain extends ProcessMainClass {
         private final MqMessage message;
         private final MqSingleShotInbox inbox;

-        public final FileStorageId liveDataFileStorageId;
-
         LiveCrawlInstructions(MqMessage message,
-                              MqSingleShotInbox inbox,
-                              FileStorageId liveDataFileStorageId)
+                              MqSingleShotInbox inbox)
         {
             this.message = message;
             this.inbox = inbox;
-            this.liveDataFileStorageId = liveDataFileStorageId;
         }
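The run() change above swaps per-run versioned FileStorage allocation for a single well-known directory under the storage base, so live crawl data persists across runs. A minimal sketch of the same idea in plain java.nio (the storage-base path is a stand-in for what FileStorageService resolves):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Resolve a fixed "live-crawl-data" directory under the storage base,
// creating it on first use. Because the path is stable rather than
// versioned, consecutive live crawls reuse the same on-disk state.
class LiveCrawlDirs {
    static Path liveCrawlDataDir(Path storageBase) throws IOException {
        Path basePath = storageBase.resolve("live-crawl-data");
        if (!Files.isDirectory(basePath)) {
            Files.createDirectories(basePath);
        }
        return basePath;
    }
}
```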

LiveCrawlDataSetTest.java

@@ -1,19 +1,25 @@
 package nu.marginalia.livecrawler;

 import nu.marginalia.model.EdgeUrl;
+import nu.marginalia.model.crawldata.CrawledDocument;
+import nu.marginalia.model.crawldata.CrawledDomain;
+import nu.marginalia.model.crawldata.SerializableCrawlData;
+import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
public class LiveCrawlDataSetTest {
     @Test
     public void testGetDataSet() throws Exception {
-        Path tempFile = Files.createTempFile("test", ".db");
+        Path tempDir = Files.createTempDirectory("live-crawl-data-set-test");

         try {
-            LiveCrawlDataSet dataSet = new LiveCrawlDataSet(tempFile.toString());
+            LiveCrawlDataSet dataSet = new LiveCrawlDataSet(tempDir);
Assertions.assertFalse(dataSet.hasUrl("https://www.example.com/"));
dataSet.saveDocument(
@@ -24,9 +30,38 @@ public class LiveCrawlDataSetTest {
"test"
);
Assertions.assertTrue(dataSet.hasUrl("https://www.example.com/"));
+            var streams = dataSet.getDataStreams();
+            Assertions.assertEquals(1, streams.size());
+
+            var stream = streams.iterator().next();
+
+            List<SerializableCrawlData> data = new ArrayList<>();
+            while (stream.hasNext()) {
+                data.add(stream.next());
+            }
+
+            int dataCount = 0;
+            int domainCount = 0;
+
+            for (var item : data) {
+                switch (item) {
+                    case CrawledDomain domain -> {
+                        domainCount++;
+                        Assertions.assertEquals("www.example.com", domain.getDomain());
+                    }
+                    case CrawledDocument document -> {
+                        dataCount++;
+                        Assertions.assertEquals("https://www.example.com/", document.url);
+                        Assertions.assertEquals("test", document.documentBody);
+                    }
+                }
+            }
+
+            Assertions.assertEquals(1, dataCount);
+            Assertions.assertEquals(1, domainCount);
}
         finally {
-            Files.delete(tempFile);
+            FileUtils.deleteDirectory(tempDir.toFile());
         }
}

LiveCrawlRequest.java

@@ -1,11 +1,4 @@
 package nu.marginalia.mqapi.crawling;

-import nu.marginalia.storage.model.FileStorageId;
-
 public class LiveCrawlRequest {
-    public FileStorageId liveDataFileStorageId;
-
-    public LiveCrawlRequest(FileStorageId liveDataFileStorageId) {
-        this.liveDataFileStorageId = liveDataFileStorageId;
-    }
 }
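With the storage id gone, LiveCrawlRequest has no fields left and the MQ message works as a pure trigger; the crawler resolves its own data directory. A small sketch showing that an empty request class still round-trips through Gson as a `{}` payload (class and method names here are illustrative only):

```java
import com.google.gson.Gson;

public class EmptyRequestDemo {
    // Field-less request class, mirroring the emptied LiveCrawlRequest.
    static class TriggerRequest {}

    public static void main(String[] args) {
        Gson gson = new Gson();
        String payload = gson.toJson(new TriggerRequest());
        System.out.println(payload); // prints "{}"
        // Deserializing the empty payload still yields a usable instance.
        TriggerRequest request = gson.fromJson(payload, TriggerRequest.class);
        System.out.println(request != null); // true
    }
}
```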