From d86926be5f7588517e2542b0c93899c07e963ee2 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Fri, 5 Jul 2024 15:31:47 +0200 Subject: [PATCH] (crawl) Add new functionality for re-crawling a single domain --- .../executor/client/ExecutorCrawlClient.java | 9 ++ .../api/src/main/protobuf/executor-api.proto | 5 ++ .../nu/marginalia/actor/ExecutorActor.java | 1 + .../actor/ExecutorActorControlService.java | 4 + .../nu/marginalia/actor/task/CrawlActor.java | 4 +- .../marginalia/actor/task/RecrawlActor.java | 2 +- .../actor/task/RecrawlSingleDomainActor.java | 85 +++++++++++++++++++ .../execution/ExecutorCrawlGrpcService.java | 16 ++++ .../mqapi/crawling/CrawlRequest.java | 16 ++++ .../java/nu/marginalia/crawl/CrawlerMain.java | 43 +++++++++- .../node/svc/ControlNodeActionsService.java | 19 +++++ .../node-storage-crawl-parquet-details.hdb | 10 ++- 12 files changed, 208 insertions(+), 6 deletions(-) create mode 100644 code/execution/java/nu/marginalia/actor/task/RecrawlSingleDomainActor.java diff --git a/code/execution/api/java/nu/marginalia/executor/client/ExecutorCrawlClient.java b/code/execution/api/java/nu/marginalia/executor/client/ExecutorCrawlClient.java index b037702d..25610892 100644 --- a/code/execution/api/java/nu/marginalia/executor/client/ExecutorCrawlClient.java +++ b/code/execution/api/java/nu/marginalia/executor/client/ExecutorCrawlClient.java @@ -44,6 +44,15 @@ public class ExecutorCrawlClient { .build()); } + public void triggerRecrawlSingleDomain(int node, FileStorageId fid, String domainName) { + channelPool.call(ExecutorCrawlApiBlockingStub::triggerSingleDomainRecrawl) + .forNode(node) + .run(RpcFileStorageIdWithDomainName.newBuilder() + .setFileStorageId(fid.id()) + .setTargetDomainName(domainName) + .build()); + } + public void triggerConvert(int node, FileStorageId fid) { channelPool.call(ExecutorCrawlApiBlockingStub::triggerConvert) .forNode(node) diff --git a/code/execution/api/src/main/protobuf/executor-api.proto 
b/code/execution/api/src/main/protobuf/executor-api.proto index 565770ac..2858d60b 100644 --- a/code/execution/api/src/main/protobuf/executor-api.proto +++ b/code/execution/api/src/main/protobuf/executor-api.proto @@ -22,6 +22,7 @@ service ExecutorApi { service ExecutorCrawlApi { rpc triggerCrawl(RpcFileStorageId) returns (Empty) {} rpc triggerRecrawl(RpcFileStorageId) returns (Empty) {} + rpc triggerSingleDomainRecrawl(RpcFileStorageIdWithDomainName) returns (Empty) {} rpc triggerConvert(RpcFileStorageId) returns (Empty) {} rpc triggerConvertAndLoad(RpcFileStorageId) returns (Empty) {} rpc loadProcessedData(RpcFileStorageIds) returns (Empty) {} @@ -55,6 +56,10 @@ message RpcProcessId { message RpcFileStorageId { int64 fileStorageId = 1; } +message RpcFileStorageIdWithDomainName { + int64 fileStorageId = 1; + string targetDomainName = 2; +} message RpcFileStorageIds { repeated int64 fileStorageIds = 1; } diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActor.java b/code/execution/java/nu/marginalia/actor/ExecutorActor.java index d04b3eaa..e59ecd9c 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActor.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActor.java @@ -3,6 +3,7 @@ package nu.marginalia.actor; public enum ExecutorActor { CRAWL, RECRAWL, + RECRAWL_SINGLE_DOMAIN, CONVERT_AND_LOAD, PROC_CONVERTER_SPAWNER, PROC_LOADER_SPAWNER, diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java index 6f37d7ab..591119f8 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java @@ -26,6 +26,7 @@ public class ExecutorActorControlService { private final ExecutorActorStateMachines stateMachines; public Map actorDefinitions = new HashMap<>(); private final int node; + @Inject public ExecutorActorControlService(MessageQueueFactory 
messageQueueFactory, BaseServiceParams baseServiceParams, @@ -33,6 +34,7 @@ public class ExecutorActorControlService { ConvertAndLoadActor convertAndLoadActor, CrawlActor crawlActor, RecrawlActor recrawlActor, + RecrawlSingleDomainActor recrawlSingleDomainActor, RestoreBackupActor restoreBackupActor, ConverterMonitorActor converterMonitorFSM, CrawlerMonitorActor crawlerMonitorActor, @@ -57,6 +59,8 @@ public class ExecutorActorControlService { register(ExecutorActor.CRAWL, crawlActor); register(ExecutorActor.RECRAWL, recrawlActor); + register(ExecutorActor.RECRAWL_SINGLE_DOMAIN, recrawlSingleDomainActor); + register(ExecutorActor.CONVERT, convertActor); register(ExecutorActor.RESTORE_BACKUP, restoreBackupActor); register(ExecutorActor.CONVERT_AND_LOAD, convertAndLoadActor); diff --git a/code/execution/java/nu/marginalia/actor/task/CrawlActor.java b/code/execution/java/nu/marginalia/actor/task/CrawlActor.java index 3e097554..0a742888 100644 --- a/code/execution/java/nu/marginalia/actor/task/CrawlActor.java +++ b/code/execution/java/nu/marginalia/actor/task/CrawlActor.java @@ -50,7 +50,9 @@ public class CrawlActor extends RecordActorPrototype { storageService.relateFileStorages(storage.id(), dataArea.id()); // Send convert request - long msgId = mqCrawlerOutbox.sendAsync(new CrawlRequest(List.of(fid), dataArea.id())); + long msgId = mqCrawlerOutbox.sendAsync( + CrawlRequest.forSpec(fid, dataArea.id()) + ); yield new Crawl(msgId); } diff --git a/code/execution/java/nu/marginalia/actor/task/RecrawlActor.java b/code/execution/java/nu/marginalia/actor/task/RecrawlActor.java index 2b748ced..0eefd4ef 100644 --- a/code/execution/java/nu/marginalia/actor/task/RecrawlActor.java +++ b/code/execution/java/nu/marginalia/actor/task/RecrawlActor.java @@ -59,7 +59,7 @@ public class RecrawlActor extends RecordActorPrototype { refreshService.synchronizeDomainList(); - long id = mqCrawlerOutbox.sendAsync(new CrawlRequest(null, fid)); + long id = 
mqCrawlerOutbox.sendAsync(CrawlRequest.forRecrawl(fid)); yield new Crawl(id, fid, cascadeLoad); } diff --git a/code/execution/java/nu/marginalia/actor/task/RecrawlSingleDomainActor.java b/code/execution/java/nu/marginalia/actor/task/RecrawlSingleDomainActor.java new file mode 100644 index 00000000..990da5aa --- /dev/null +++ b/code/execution/java/nu/marginalia/actor/task/RecrawlSingleDomainActor.java @@ -0,0 +1,85 @@ +package nu.marginalia.actor.task; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.actor.state.Resume; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.crawling.CrawlRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.storage.model.FileStorageType; + +@Singleton +public class RecrawlSingleDomainActor extends RecordActorPrototype { + + private final MqOutbox mqCrawlerOutbox; + private final FileStorageService storageService; + private final ActorProcessWatcher processWatcher; + + /** Initial step + * @param storageId - the id of the storage to recrawl + * @param targetDomainName - domain to be recrawled + */ + public record Initial(FileStorageId storageId, String targetDomainName) implements ActorStep {} + + /** The action step */ + @Resume(behavior = ActorResumeBehavior.RETRY) + public record Crawl(long messageId) implements ActorStep {} + + @Override + public ActorStep transition(ActorStep self) throws Exception { + return switch (self) { + case Initial (FileStorageId fid, String targetDomainName) -> { + var crawlStorage = storageService.getStorage(fid); + + if (crawlStorage == null) 
yield new Error("Bad storage id"); + if (crawlStorage.type() != FileStorageType.CRAWL_DATA) yield new Error("Bad storage type " + crawlStorage.type()); + + long id = mqCrawlerOutbox.sendAsync( + CrawlRequest.forSingleDomain(targetDomainName, fid) + ); + + yield new Crawl(id); + } + case Crawl (long msgId) -> { + var rsp = processWatcher.waitResponse( + mqCrawlerOutbox, + ProcessService.ProcessId.CRAWLER, + msgId); + + if (rsp.state() != MqMessageState.OK) { + yield new Error("Crawler failed"); + } + + yield new End(); + } + default -> new End(); + }; + } + + @Override + public String describe() { + return "Run the crawler only re-fetching a single domain"; + } + + @Inject + public RecrawlSingleDomainActor(ActorProcessWatcher processWatcher, + ProcessOutboxes processOutboxes, + FileStorageService storageService, + Gson gson) + { + super(gson); + + this.processWatcher = processWatcher; + this.mqCrawlerOutbox = processOutboxes.getCrawlerOutbox(); + this.storageService = storageService; + } + +} diff --git a/code/execution/java/nu/marginalia/execution/ExecutorCrawlGrpcService.java b/code/execution/java/nu/marginalia/execution/ExecutorCrawlGrpcService.java index b95f64d0..20648015 100644 --- a/code/execution/java/nu/marginalia/execution/ExecutorCrawlGrpcService.java +++ b/code/execution/java/nu/marginalia/execution/ExecutorCrawlGrpcService.java @@ -47,6 +47,22 @@ public class ExecutorCrawlGrpcService extends ExecutorCrawlApiGrpc.ExecutorCrawl } } + @Override + public void triggerSingleDomainRecrawl(RpcFileStorageIdWithDomainName request, StreamObserver responseObserver) { + try { + actorControlService.startFrom(ExecutorActor.RECRAWL_SINGLE_DOMAIN, + new RecrawlSingleDomainActor.Initial( + FileStorageId.of(request.getFileStorageId()), + request.getTargetDomainName())); + + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + @Override public void 
triggerConvert(RpcFileStorageId request, StreamObserver responseObserver) { try { diff --git a/code/process-mqapi/java/nu/marginalia/mqapi/crawling/CrawlRequest.java b/code/process-mqapi/java/nu/marginalia/mqapi/crawling/CrawlRequest.java index 40cd30ce..ff090140 100644 --- a/code/process-mqapi/java/nu/marginalia/mqapi/crawling/CrawlRequest.java +++ b/code/process-mqapi/java/nu/marginalia/mqapi/crawling/CrawlRequest.java @@ -14,8 +14,24 @@ */ public List specStorage; + /** (optional) Name of a single domain to be re-crawled */ + public String targetDomainName; + /** File storage where the crawl data will be written. If it contains existing crawl data, * this crawl data will be referenced for e-tags and last-modified checks. */ public FileStorageId crawlStorage; + + public static CrawlRequest forSpec(FileStorageId specStorage, FileStorageId crawlStorage) { + return new CrawlRequest(List.of(specStorage), null, crawlStorage); + } + + public static CrawlRequest forSingleDomain(String targetDomainName, FileStorageId crawlStorage) { + return new CrawlRequest(null, targetDomainName, crawlStorage); + } + + public static CrawlRequest forRecrawl(FileStorageId crawlStorage) { + return new CrawlRequest(null, null, crawlStorage); + } + } diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java index 1b04c0f9..5173af75 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java @@ -23,6 +23,7 @@ import nu.marginalia.crawling.io.CrawledDomainReader; import nu.marginalia.crawling.io.CrawlerOutputFile; import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter; import nu.marginalia.crawlspec.CrawlSpecFileNames; +import nu.marginalia.model.EdgeDomain; import nu.marginalia.service.ProcessMainClass; import
nu.marginalia.storage.FileStorageService; import nu.marginalia.model.crawlspec.CrawlSpecRecord; @@ -136,7 +137,12 @@ public class CrawlerMain extends ProcessMainClass { var instructions = crawler.fetchInstructions(); try { - crawler.run(instructions.specProvider, instructions.outputDir); + if (instructions.targetDomainName != null) { + crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir); + } + else { + crawler.run(instructions.specProvider, instructions.outputDir); + } instructions.ok(); } catch (Exception ex) { logger.error("Crawler failed", ex); @@ -200,6 +206,26 @@ public class CrawlerMain extends ProcessMainClass { } } + public void runForSingleDomain(String targetDomainName, Path outputDir) throws Exception { + + heartbeat.start(); + + try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler-" + targetDomainName.replace('/', '-') + ".log")); + WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir); + AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(List.of(new EdgeDomain(targetDomainName))) + ) { + var spec = new CrawlSpecRecord(targetDomainName, 1000, null); + var task = new CrawlTask(spec, anchorTagsSource, outputDir, warcArchiver, workLog); + task.run(); + } + catch (Exception ex) { + logger.warn("Exception in crawler", ex); + } + finally { + heartbeat.shutDown(); + } + } + class CrawlTask implements SimpleBlockingThreadPool.Task { private final CrawlSpecRecord specification; @@ -216,7 +242,8 @@ public class CrawlerMain extends ProcessMainClass { AnchorTagsSource anchorTagsSource, Path outputDir, WarcArchiverIf warcArchiver, - WorkLog workLog) { + WorkLog workLog) + { this.specification = specification; this.anchorTagsSource = anchorTagsSource; this.outputDir = outputDir; @@ -303,11 +330,19 @@ public class CrawlerMain extends ProcessMainClass { private final MqMessage message; private final MqSingleShotInbox inbox; - CrawlRequest(CrawlSpecProvider specProvider, Path outputDir, MqMessage message, 
MqSingleShotInbox inbox) { + private final String targetDomainName; + + CrawlRequest(CrawlSpecProvider specProvider, + String targetDomainName, + Path outputDir, + MqMessage message, + MqSingleShotInbox inbox) + { this.message = message; this.inbox = inbox; this.specProvider = specProvider; this.outputDir = outputDir; + this.targetDomainName = targetDomainName; } @@ -325,6 +360,7 @@ public class CrawlerMain extends ProcessMainClass { var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID()); logger.info("Waiting for instructions"); + var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName()); var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); @@ -350,6 +386,7 @@ public class CrawlerMain extends ProcessMainClass { return new CrawlRequest( specProvider, + request.targetDomainName, crawlData.asPath(), msg, inbox); diff --git a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java index 4b833789..c385e52e 100644 --- a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java +++ b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java @@ -24,6 +24,7 @@ import java.nio.file.Path; import java.sql.SQLException; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Set; @Singleton @@ -88,6 +89,9 @@ public class ControlNodeActionsService { Spark.post("/nodes/:id/actions/recrawl", this::triggerAutoRecrawl, redirectControl.renderRedirectAcknowledgement("Recrawling", "..") ); + Spark.post("/nodes/:id/actions/recrawl-single-domain", this::triggerSingleDomainRecrawl, + redirectControl.renderRedirectAcknowledgement("Recrawling", "..") + ); Spark.post("/nodes/:id/actions/process", this::triggerProcess, 
redirectControl.renderRedirectAcknowledgement("Processing", "..") ); @@ -216,6 +220,21 @@ public class ControlNodeActionsService { return ""; } + private Object triggerSingleDomainRecrawl(Request request, Response response) throws SQLException { + int nodeId = Integer.parseInt(request.params("id")); + + var toCrawl = parseSourceFileStorageId(request.queryParams("source")); + var targetDomainName = Objects.requireNonNull(request.queryParams("targetDomainName")); + + crawlClient.triggerRecrawlSingleDomain( + nodeId, + toCrawl, + targetDomainName + ); + + return ""; + } + private Object triggerNewCrawl(Request request, Response response) throws SQLException { int nodeId = Integer.parseInt(request.params("id")); diff --git a/code/services-core/control-service/resources/templates/control/node/node-storage-crawl-parquet-details.hdb b/code/services-core/control-service/resources/templates/control/node/node-storage-crawl-parquet-details.hdb index 2be78e28..c06e2cd1 100644 --- a/code/services-core/control-service/resources/templates/control/node/node-storage-crawl-parquet-details.hdb +++ b/code/services-core/control-service/resources/templates/control/node/node-storage-crawl-parquet-details.hdb @@ -24,12 +24,20 @@

Summary

- + + +
DomainFileDomainFileCrawl
{{domain}} Download Parquet +
+ + + +
+

Contents