diff --git a/code/execution/api/java/nu/marginalia/executor/client/ExecutorExportClient.java b/code/execution/api/java/nu/marginalia/executor/client/ExecutorExportClient.java index e12fa0d3..b03e4ae2 100644 --- a/code/execution/api/java/nu/marginalia/executor/client/ExecutorExportClient.java +++ b/code/execution/api/java/nu/marginalia/executor/client/ExecutorExportClient.java @@ -3,6 +3,7 @@ package nu.marginalia.executor.client; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.functions.execution.api.*; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.client.GrpcChannelPoolFactory; import nu.marginalia.service.client.GrpcMultiNodeChannelPool; import nu.marginalia.service.discovery.property.ServiceKey; @@ -11,6 +12,8 @@ import nu.marginalia.storage.model.FileStorageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; + import static nu.marginalia.functions.execution.api.ExecutorExportApiGrpc.ExecutorExportApiBlockingStub; @Singleton @@ -18,23 +21,33 @@ public class ExecutorExportClient { private final GrpcMultiNodeChannelPool channelPool; private static final Logger logger = LoggerFactory.getLogger(ExecutorExportClient.class); + private final MqPersistence persistence; @Inject - public ExecutorExportClient(GrpcChannelPoolFactory grpcChannelPoolFactory) + public ExecutorExportClient(GrpcChannelPoolFactory grpcChannelPoolFactory, MqPersistence persistence) { this.channelPool = grpcChannelPoolFactory .createMulti( ServiceKey.forGrpcApi(ExecutorExportApiGrpc.class, ServicePartition.multi()), ExecutorExportApiGrpc::newBlockingStub); + this.persistence = persistence; } + long createTrackingTokenMsg(String task, int node, Duration ttl) throws Exception { + return persistence.sendNewMessage("task-tracking[" + node + "]", "export-client", null, task, "", ttl); + } + + public long exportAtags(int node, FileStorageId fid) throws Exception { + long msgId = createTrackingTokenMsg("atags", node, Duration.ofHours(6)); - public void exportAtags(int node, FileStorageId fid) { channelPool.call(ExecutorExportApiBlockingStub::exportAtags) .forNode(node) - .run(RpcFileStorageId.newBuilder() + .run(RpcExportRequest.newBuilder() .setFileStorageId(fid.id()) + .setMsgId(msgId) .build()); + return msgId; } + public void exportSampleData(int node, FileStorageId fid, int size, String name) { channelPool.call(ExecutorExportApiBlockingStub::exportSampleData) .forNode(node) @@ -45,20 +58,26 @@ public class ExecutorExportClient { .build()); } - public void exportRssFeeds(int node, FileStorageId fid) { + public long exportRssFeeds(int node, FileStorageId fid) throws Exception { + long msgId = createTrackingTokenMsg("rss", node, Duration.ofHours(6)); channelPool.call(ExecutorExportApiBlockingStub::exportRssFeeds) .forNode(node) - .run(RpcFileStorageId.newBuilder() + .run(RpcExportRequest.newBuilder() .setFileStorageId(fid.id()) + .setMsgId(msgId) .build()); + return msgId; } - public void exportTermFrequencies(int node, FileStorageId fid) { + public long exportTermFrequencies(int node, FileStorageId fid) throws Exception { + long msgId = createTrackingTokenMsg("tfreq", node, Duration.ofHours(6)); channelPool.call(ExecutorExportApiBlockingStub::exportTermFrequencies) .forNode(node) - .run(RpcFileStorageId.newBuilder() + .run(RpcExportRequest.newBuilder() .setFileStorageId(fid.id()) + .setMsgId(msgId) .build()); + return msgId; } public void exportData(int node) { @@ -77,4 +96,21 @@ public class ExecutorExportClient { } + public void exportAllAtags() { + channelPool.call(ExecutorExportApiBlockingStub::exportAllAtags) + .forNode(1) + .run(Empty.getDefaultInstance()); + } + + public void exportAllFeeds() { + channelPool.call(ExecutorExportApiBlockingStub::exportAllFeeds) + .forNode(1) + .run(Empty.getDefaultInstance()); + } + + public void exportAllTfreqs() { + channelPool.call(ExecutorExportApiBlockingStub::exportAllTfreqs) + .forNode(1) + .run(Empty.getDefaultInstance()); + } } diff --git a/code/execution/api/src/main/protobuf/executor-api.proto b/code/execution/api/src/main/protobuf/executor-api.proto index b3ab1d41..2e0704bd 100644 --- a/code/execution/api/src/main/protobuf/executor-api.proto +++ b/code/execution/api/src/main/protobuf/executor-api.proto @@ -39,15 +39,20 @@ service ExecutorSideloadApi { } service ExecutorExportApi { - rpc exportAtags(RpcFileStorageId) returns (Empty) {} + rpc exportAtags(RpcExportRequest) returns (Empty) {} rpc exportSegmentationModel(RpcExportSegmentationModel) returns (Empty) {} rpc exportSampleData(RpcExportSampleData) returns (Empty) {} - rpc exportRssFeeds(RpcFileStorageId) returns (Empty) {} - rpc exportTermFrequencies(RpcFileStorageId) returns (Empty) {} + rpc exportRssFeeds(RpcExportRequest) returns (Empty) {} + rpc exportTermFrequencies(RpcExportRequest) returns (Empty) {} rpc exportData(Empty) returns (Empty) {} + + rpc exportAllAtags(Empty) returns (Empty) {} + rpc exportAllFeeds(Empty) returns (Empty) {} + rpc exportAllTfreqs(Empty) returns (Empty) {} } message Empty {} + message RpcFsmName { string actorName = 1; } @@ -57,6 +62,10 @@ message RpcProcessId { message RpcFileStorageId { int64 fileStorageId = 1; } +message RpcExportRequest { + int64 fileStorageId = 1; + int64 msgId = 2; +} message RpcFileStorageIdWithDomainName { int64 fileStorageId = 1; string targetDomainName = 2; diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActor.java b/code/execution/java/nu/marginalia/actor/ExecutorActor.java index adeed585..211a7144 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActor.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActor.java @@ -5,6 +5,8 @@ import nu.marginalia.nodecfg.model.NodeProfile; import java.util.Set; public enum ExecutorActor { + PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), + CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java index 7cb3e91c..04daf704 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java @@ -3,6 +3,7 @@ package nu.marginalia.actor; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.monitor.FileStorageMonitorActor; +import nu.marginalia.actor.precession.ExportAllPrecessionActor; import nu.marginalia.actor.proc.*; import nu.marginalia.actor.prototype.ActorPrototype; import nu.marginalia.actor.prototype.RecordActorPrototype; @@ -13,6 +14,7 @@ import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.model.NodeConfiguration; import nu.marginalia.service.control.ServiceEventLog; +import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.server.BaseServiceParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +39,7 @@ public class ExecutorActorControlService { @Inject public ExecutorActorControlService(MessageQueueFactory messageQueueFactory, + ServiceConfiguration serviceConfiguration, NodeConfigurationService configurationService, BaseServiceParams baseServiceParams, ConvertActor convertActor, @@ -63,6 +66,7 @@ public class ExecutorActorControlService { DownloadSampleActor downloadSampleActor, ScrapeFeedsActor scrapeFeedsActor, ExecutorActorStateMachines stateMachines, + ExportAllPrecessionActor exportAllPrecessionActor, UpdateRssActor updateRssActor) throws SQLException { this.messageQueueFactory = messageQueueFactory; this.eventLog = baseServiceParams.eventLog; @@ -102,6 +106,10 @@ public class ExecutorActorControlService { register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor); register(ExecutorActor.UPDATE_RSS, updateRssActor); + + if (serviceConfiguration.node() == 1) { + register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor); + } } private void register(ExecutorActor process, RecordActorPrototype graph) { diff --git a/code/execution/java/nu/marginalia/actor/precession/ExportAllPrecessionActor.java b/code/execution/java/nu/marginalia/actor/precession/ExportAllPrecessionActor.java new file mode 100644 index 00000000..5b0aed52 --- /dev/null +++ b/code/execution/java/nu/marginalia/actor/precession/ExportAllPrecessionActor.java @@ -0,0 +1,116 @@ +package nu.marginalia.actor.precession; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.executor.client.ExecutorExportClient; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.nodecfg.model.NodeConfiguration; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageType; + +import java.time.Duration; +import java.util.Comparator; +import java.util.Optional; + +public class ExportAllPrecessionActor extends RecordActorPrototype { + + private final NodeConfigurationService nodeConfigurationService; + private final ExecutorExportClient exportClient; + private final FileStorageService fileStorageService; + private final MqPersistence persistence; + + @Inject + public ExportAllPrecessionActor(Gson gson, + NodeConfigurationService nodeConfigurationService, + ExecutorExportClient exportClient, + FileStorageService fileStorageService, + MqPersistence persistence) + { + super(gson); + this.nodeConfigurationService = nodeConfigurationService; + this.exportClient = exportClient; + this.fileStorageService = fileStorageService; + this.persistence = persistence; + } + + public enum ExportTask { + FEEDS, + ATAGS, + TFREQ + } + + public record Initial(ExportTask task) implements ActorStep {} + public record Export(int nodeId, ExportTask task, long msgId) implements ActorStep { + public Export(int nodeId, ExportTask task) { + this(nodeId, task, -1); + } + } + + @Override + public ActorStep transition(ActorStep self) throws Exception { + return switch (self) { + case Initial(ExportTask task) -> { + var firstNode = nextNodeId(-1); + if (firstNode.isEmpty()) + yield new Error("No nodes included in precession"); + else + yield new Export(firstNode.get(), task); + } + + case Export(int nodeId, ExportTask task, long msgId) when msgId < 0 -> { + var activeStorages = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_DATA); + if (activeStorages.isEmpty()) { + yield new Error("Node " + nodeId + " has no active file storage"); + } + var activeCrawlStorageId = activeStorages.getFirst(); + + long trackingMsgId = switch(task) { + case ATAGS -> exportClient.exportAtags(nodeId, activeCrawlStorageId); + case TFREQ -> exportClient.exportTermFrequencies(nodeId, activeCrawlStorageId); + case FEEDS -> exportClient.exportRssFeeds(nodeId, activeCrawlStorageId); + }; + + yield new Export(nodeId, task, trackingMsgId); + } + + case Export(int nodeId, ExportTask task, long msgId) -> { + for (; ; ) { + var msg = persistence.getMessage(msgId); + if (!msg.state().isTerminal()) { + Thread.sleep(Duration.ofSeconds(30)); + continue; + } + if (msg.state() == MqMessageState.OK) { + var nextNode = nextNodeId(nodeId); + if (nextNode.isEmpty()) { + yield new End(); + } else { + yield new Export(nextNode.get(), task); + } + } else { + yield new Error("Export failed for node " + nodeId); + } + } + } + default -> new Error("Unknown state"); + }; + } + + private Optional nextNodeId(int currentNodeId) { + return nodeConfigurationService.getAll() + .stream().sorted(Comparator.comparing(NodeConfiguration::node)) + .filter(node -> node.node() > currentNodeId) + .filter(NodeConfiguration::includeInPrecession) + .map(NodeConfiguration::node) + .findFirst(); + } + + @Override + public String describe() { + return "Runs an export job on each index node included in the precession"; + } +} diff --git a/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java b/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java index e8b4f341..28c853f0 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java @@ -7,6 +7,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; @@ -25,42 +26,51 @@ public class ExportAtagsActor extends RecordActorPrototype { private final ActorProcessWatcher processWatcher; private final MqOutbox exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); + private final MqPersistence persistence; - public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { - public Run(FileStorageId crawlId, FileStorageId destId) { - this(crawlId, destId, -1); + public record Export(long responseMsgId, FileStorageId crawlId) implements ActorStep {} + public record Run(long responseMsgId,FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId) { + this(responseMsgId, crawlId, destId, -1); } } + public record Fail(long responseMsgId, String message) implements ActorStep {} @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { - case Export(FileStorageId crawlId) -> { + case Export(long responseMsgId, FileStorageId crawlId) -> { + persistence.updateMessageState(responseMsgId, MqMessageState.ACK); + var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atags-export", "Atags " + LocalDateTime.now()); - if (storage == null) yield new Error("Bad storage id"); - yield new Run(crawlId, storage.id()); + if (storage == null) yield new Fail(responseMsgId, "Bad storage id"); + + yield new Run(responseMsgId, crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { + case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId)); - yield new Run(crawlId, destId, newMsgId); + yield new Run(responseMsgId, crawlId, destId, newMsgId); } - case Run(_, FileStorageId destId, long msgId) -> { + case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> { var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); if (rsp.state() != MqMessageState.OK) { storageService.flagFileForDeletion(destId); - yield new Error("Exporter failed"); + yield new Fail(responseMsgId, "Exporter failed"); } else { storageService.setFileStorageState(destId, FileStorageState.UNSET); + persistence.updateMessageState(responseMsgId, MqMessageState.OK); yield new End(); } } - + case Fail(long responseMsgId, String message) -> { + persistence.updateMessageState(responseMsgId, MqMessageState.ERR); + yield new Error(message); + } default -> new Error(); }; } @@ -74,11 +84,13 @@ public class ExportAtagsActor extends RecordActorPrototype { public ExportAtagsActor(Gson gson, FileStorageService storageService, ProcessOutboxes processOutboxes, + MqPersistence persistence, ActorProcessWatcher processWatcher) { super(gson); this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); this.storageService = storageService; + this.persistence = persistence; this.processWatcher = processWatcher; } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java b/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java index 7a0d612c..a3138b6d 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java @@ -7,6 +7,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; @@ -24,43 +25,51 @@ public class ExportFeedsActor extends RecordActorPrototype { private final FileStorageService storageService; private final ActorProcessWatcher processWatcher; private final MqOutbox exportTasksOutbox; + private final MqPersistence persistence; private final Logger logger = LoggerFactory.getLogger(getClass()); - public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { - public Run(FileStorageId crawlId, FileStorageId destId) { - this(crawlId, destId, -1); + public record Export(long responseMsgId, FileStorageId crawlId) implements ActorStep {} + public record Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId) { + this(responseMsgId, crawlId, destId, -1); } } + public record Fail(long responseMsgId, String message) implements ActorStep {} @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { - case Export(FileStorageId crawlId) -> { + case Export(long responseMsgId, FileStorageId crawlId) -> { + persistence.updateMessageState(responseMsgId, MqMessageState.ACK); + var storage = storageService.allocateStorage(FileStorageType.EXPORT, "feed-export", "Feeds " + LocalDateTime.now()); - if (storage == null) yield new Error("Bad storage id"); - yield new Run(crawlId, storage.id()); + if (storage == null) yield new Fail(responseMsgId, "Bad storage id"); + yield new Run(responseMsgId, crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { + case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.feeds(crawlId, destId)); - yield new Run(crawlId, destId, newMsgId); + yield new Run(responseMsgId, crawlId, destId, newMsgId); } - case Run(_, FileStorageId destId, long msgId) -> { + case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> { var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); if (rsp.state() != MqMessageState.OK) { storageService.flagFileForDeletion(destId); - yield new Error("Exporter failed"); + yield new Fail(responseMsgId, "Exporter failed"); } else { storageService.setFileStorageState(destId, FileStorageState.UNSET); + persistence.updateMessageState(responseMsgId, MqMessageState.OK); yield new End(); } } - + case Fail(long responseMsgId, String message) -> { + persistence.updateMessageState(responseMsgId, MqMessageState.ERR); + yield new Error(message); + } default -> new Error(); }; } @@ -75,12 +84,13 @@ public class ExportFeedsActor extends RecordActorPrototype { public ExportFeedsActor(Gson gson, FileStorageService storageService, ActorProcessWatcher processWatcher, - ProcessOutboxes outboxes) + ProcessOutboxes outboxes, MqPersistence persistence) { super(gson); this.storageService = storageService; this.processWatcher = processWatcher; this.exportTasksOutbox = outboxes.getExportTasksOutbox(); + this.persistence = persistence; } } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java b/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java index d4c50686..952b9828 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java @@ -7,6 +7,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; @@ -25,41 +26,48 @@ public class ExportTermFreqActor extends RecordActorPrototype { private final ActorProcessWatcher processWatcher; private final MqOutbox exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); + private final MqPersistence persistence; - public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { - public Run(FileStorageId crawlId, FileStorageId destId) { - this(crawlId, destId, -1); + public record Export(long responseMsgId, FileStorageId crawlId) implements ActorStep {} + public record Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId) { + this(responseMsgId, crawlId, destId, -1); } } - + public record Fail(long responseMsgId, String message) implements ActorStep {} @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { - case Export(FileStorageId crawlId) -> { + case Export(long responseMsgId, FileStorageId crawlId) -> { + persistence.updateMessageState(responseMsgId, MqMessageState.ACK); var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq", "Term Frequencies " + LocalDateTime.now()); - if (storage == null) yield new Error("Bad storage id"); - yield new Run(crawlId, storage.id()); + if (storage == null) yield new Fail(responseMsgId, "Bad storage id"); + yield new Run(responseMsgId, crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { + case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.termFreq(crawlId, destId)); - yield new Run(crawlId, destId, newMsgId); + yield new Run(responseMsgId, crawlId, destId, newMsgId); } - case Run(_, FileStorageId destId, long msgId) -> { + case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> { var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); if (rsp.state() != MqMessageState.OK) { storageService.flagFileForDeletion(destId); - yield new Error("Exporter failed"); + yield new Fail(responseMsgId, "Exporter failed"); } else { storageService.setFileStorageState(destId, FileStorageState.UNSET); + persistence.updateMessageState(responseMsgId, MqMessageState.OK); yield new End(); } } + case Fail(long responseMsgId, String message) -> { + persistence.updateMessageState(responseMsgId, MqMessageState.ERR); + yield new Error(message); + } default -> new Error(); }; @@ -75,10 +83,12 @@ public class ExportTermFreqActor extends RecordActorPrototype { public ExportTermFreqActor(Gson gson, FileStorageService storageService, ProcessOutboxes processOutboxes, + MqPersistence persistence, ActorProcessWatcher processWatcher) { super(gson); this.storageService = storageService; + this.persistence = persistence; this.processWatcher = processWatcher; this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); } diff --git a/code/execution/java/nu/marginalia/execution/ExecutorExportGrpcService.java b/code/execution/java/nu/marginalia/execution/ExecutorExportGrpcService.java index b06831e5..eb23a965 100644 --- a/code/execution/java/nu/marginalia/execution/ExecutorExportGrpcService.java +++ b/code/execution/java/nu/marginalia/execution/ExecutorExportGrpcService.java @@ -5,8 +5,10 @@ import com.google.inject.Singleton; import io.grpc.stub.StreamObserver; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; +import nu.marginalia.actor.precession.ExportAllPrecessionActor; import nu.marginalia.actor.task.*; import nu.marginalia.functions.execution.api.*; +import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.server.DiscoverableService; import nu.marginalia.storage.model.FileStorageId; @@ -16,17 +18,21 @@ public class ExecutorExportGrpcService implements DiscoverableService { private final ExecutorActorControlService actorControlService; + private final ServiceConfiguration serviceConfiguration; @Inject - public ExecutorExportGrpcService(ExecutorActorControlService actorControlService) { + public ExecutorExportGrpcService(ExecutorActorControlService actorControlService, ServiceConfiguration serviceConfiguration) { this.actorControlService = actorControlService; + this.serviceConfiguration = serviceConfiguration; } @Override - public void exportAtags(RpcFileStorageId request, StreamObserver responseObserver) { + public void exportAtags(RpcExportRequest request, StreamObserver responseObserver) { try { actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS, - new ExportAtagsActor.Export(FileStorageId.of(request.getFileStorageId())) + new ExportAtagsActor.Export( + request.getMsgId(), + FileStorageId.of(request.getFileStorageId())) ); responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); @@ -55,10 +61,12 @@ public class ExecutorExportGrpcService } @Override - public void exportRssFeeds(RpcFileStorageId request, StreamObserver responseObserver) { + public void exportRssFeeds(RpcExportRequest request, StreamObserver responseObserver) { try { actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS, - new ExportFeedsActor.Export(FileStorageId.of(request.getFileStorageId())) + new ExportFeedsActor.Export( + request.getMsgId(), + FileStorageId.of(request.getFileStorageId())) ); responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); @@ -69,10 +77,10 @@ public class ExecutorExportGrpcService } @Override - public void exportTermFrequencies(RpcFileStorageId request, StreamObserver responseObserver) { + public void exportTermFrequencies(RpcExportRequest request, StreamObserver responseObserver) { try { actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES, - new ExportTermFreqActor.Export(FileStorageId.of(request.getFileStorageId())) + new ExportTermFreqActor.Export(request.getMsgId(), FileStorageId.of(request.getFileStorageId())) ); responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); @@ -109,4 +117,48 @@ public class ExecutorExportGrpcService } } + @Override + public void exportAllAtags(Empty request, StreamObserver responseObserver) { + if (serviceConfiguration.node() != 1) { + responseObserver.onError(new IllegalArgumentException("Export all atags is only available on node 1")); + } + try { + actorControlService.startFrom(ExecutorActor.PREC_EXPORT_ALL, + new ExportAllPrecessionActor.Initial(ExportAllPrecessionActor.ExportTask.ATAGS) + ); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportAllFeeds(Empty request, StreamObserver responseObserver) { + try { + actorControlService.startFrom(ExecutorActor.PREC_EXPORT_ALL, + new ExportAllPrecessionActor.Initial(ExportAllPrecessionActor.ExportTask.FEEDS) + ); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportAllTfreqs(Empty request, StreamObserver responseObserver) { + try { + actorControlService.startFrom(ExecutorActor.PREC_EXPORT_ALL, + new ExportAllPrecessionActor.Initial(ExportAllPrecessionActor.ExportTask.TFREQ) + ); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } } diff --git a/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java b/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java index c5d7ed12..588541da 100644 --- a/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java +++ b/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java @@ -14,6 +14,7 @@ import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.ProcessConfigurationModule; import nu.marginalia.process.ProcessMainClass; +import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.ServiceDiscoveryModule; import org.slf4j.Logger; @@ -28,6 +29,7 @@ public class ExportTasksMain extends ProcessMainClass { private final SampleDataExporter sampleDataExporter; private final TermFrequencyExporter termFrequencyExporter; private final WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator; + private final ProcessHeartbeat heartbeat; public static void main(String[] args) throws Exception { @@ -58,7 +60,8 @@ public class ExportTasksMain extends ProcessMainClass { FeedExporter feedExporter, SampleDataExporter sampleDataExporter, TermFrequencyExporter termFrequencyExporter, - Gson gson, WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator) + Gson gson, + WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator, ProcessHeartbeat heartbeat) { super(messageQueueFactory, config, gson, ProcessInboxNames.EXPORT_TASK_INBOX); this.atagExporter = atagExporter; @@ -66,15 +69,37 @@ public class ExportTasksMain extends ProcessMainClass { this.sampleDataExporter = sampleDataExporter; this.termFrequencyExporter = termFrequencyExporter; this.websiteAdjacenciesCalculator = websiteAdjacenciesCalculator; + this.heartbeat = heartbeat; + } + + enum ProcessSteps { + RUN, + END } private void run(ExportTaskRequest request) throws Exception { - switch (request.task) { - case ATAGS: atagExporter.export(request.crawlId, request.destId); break; - case FEEDS: feedExporter.export(request.crawlId, request.destId); break; - case TERM_FREQ: termFrequencyExporter.export(request.crawlId, request.destId); break; - case SAMPLE_DATA: sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name); break; - case ADJACENCIES: websiteAdjacenciesCalculator.export(); break; + try (var hb = heartbeat.createProcessTaskHeartbeat(ProcessSteps.class, request.task.toString())) { + hb.progress(ProcessSteps.RUN); + + switch (request.task) { + case ATAGS: + atagExporter.export(request.crawlId, request.destId); + break; + case FEEDS: + feedExporter.export(request.crawlId, request.destId); + break; + case TERM_FREQ: + termFrequencyExporter.export(request.crawlId, request.destId); + break; + case SAMPLE_DATA: + sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name); + break; + case ADJACENCIES: + websiteAdjacenciesCalculator.export(); + break; + } + + hb.progress(ProcessSteps.RUN); } } diff --git a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java index cd0b3f2e..7a89a354 100644 --- a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java +++ b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java @@ -304,7 +304,7 @@ public class ControlNodeActionsService { return ""; } - private Object exportFromCrawlData(Request req, Response rsp) { + private Object exportFromCrawlData(Request req, Response rsp) throws Exception { String exportType = req.queryParams("exportType"); FileStorageId source = parseSourceFileStorageId(req.queryParams("source")); diff --git a/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java b/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java index ed1fc0f4..de095d1c 100644 --- a/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java +++ b/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java @@ -7,6 +7,7 @@ import nu.marginalia.control.actor.ControlActor; import nu.marginalia.control.actor.ControlActorService; import nu.marginalia.db.DomainTypes; import nu.marginalia.executor.client.ExecutorClient; +import nu.marginalia.executor.client.ExecutorExportClient; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.nodecfg.NodeConfigurationService; @@ -29,6 +30,7 @@ public class ControlSysActionsService { private final NodeConfigurationService nodeConfigurationService; private final FileStorageService fileStorageService; private final ExecutorClient executorClient; + private final ExecutorExportClient exportClient; @Inject public ControlSysActionsService(MessageQueueFactory mqFactory, @@ -38,7 +40,7 @@ public class ControlSysActionsService { ControlActorService controlActorService, NodeConfigurationService nodeConfigurationService, FileStorageService fileStorageService, - ExecutorClient executorClient) + ExecutorClient executorClient, ExecutorExportClient exportClient) { this.apiOutbox = createApiOutbox(mqFactory); this.eventLog = eventLog; @@ -48,6 +50,7 @@ public class ControlSysActionsService { this.nodeConfigurationService = nodeConfigurationService; this.fileStorageService = fileStorageService; this.executorClient = executorClient; + this.exportClient = exportClient; } /** This is a hack to get around the fact that the API service is not a core service @@ -65,6 +68,7 @@ public class ControlSysActionsService { Spark.get("/actions", this::actionsModel, actionsView::render); Spark.post("/actions/recalculate-adjacencies-graph", this::calculateAdjacencies, Redirects.redirectToOverview); + Spark.post("/actions/export-all", this::exportAll, Redirects.redirectToOverview); Spark.post("/actions/reindex-all", this::reindexAll, Redirects.redirectToOverview); Spark.post("/actions/reprocess-all", this::reprocessAll, Redirects.redirectToOverview); Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview); @@ -76,6 +80,23 @@ public class ControlSysActionsService { } } + private Object exportAll(Request request, Response response) { + String exportType = request.queryParams("exportType"); + + switch (exportType) { + case "atags": + exportClient.exportAllAtags(); + break; + case "feeds": + exportClient.exportAllFeeds(); + break; + default: + throw new IllegalArgumentException("Unknown export type: " + exportType); + } + + return ""; + } + private Object actionsModel(Request request, Response response) { try { List> eligibleNodes = new ArrayList<>(); diff --git a/code/services-core/control-service/resources/templates/control/sys/sys-actions.hdb b/code/services-core/control-service/resources/templates/control/sys/sys-actions.hdb index 28edc6f4..34c6fb00 100644 --- a/code/services-core/control-service/resources/templates/control/sys/sys-actions.hdb +++ b/code/services-core/control-service/resources/templates/control/sys/sys-actions.hdb @@ -53,6 +53,43 @@ +
+

+ +

+
+
+ This will trigger an export job from all nodes in succession +
+
+
+ + +
+
+ + +
+ + +
+
+