(export) Add export actors to precession

Adding a tracking message to the export actors makes it possible to run them in a precession.

This change also adds a new precession actor, and some GUI components for triggering exports.

The change also adds a heartbeat to the export process.
This commit is contained in:
Viktor Lofgren 2024-11-26 15:07:03 +01:00
parent b9842b57e0
commit d4bce13a03
13 changed files with 401 additions and 63 deletions

View File

@ -3,6 +3,7 @@ package nu.marginalia.executor.client;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.functions.execution.api.*; import nu.marginalia.functions.execution.api.*;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.client.GrpcChannelPoolFactory; import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool; import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
@ -11,6 +12,8 @@ import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.time.Duration;
import static nu.marginalia.functions.execution.api.ExecutorExportApiGrpc.ExecutorExportApiBlockingStub; import static nu.marginalia.functions.execution.api.ExecutorExportApiGrpc.ExecutorExportApiBlockingStub;
@Singleton @Singleton
@ -18,23 +21,33 @@ public class ExecutorExportClient {
private final GrpcMultiNodeChannelPool<ExecutorExportApiBlockingStub> channelPool; private final GrpcMultiNodeChannelPool<ExecutorExportApiBlockingStub> channelPool;
private static final Logger logger = LoggerFactory.getLogger(ExecutorExportClient.class); private static final Logger logger = LoggerFactory.getLogger(ExecutorExportClient.class);
private final MqPersistence persistence;
@Inject @Inject
public ExecutorExportClient(GrpcChannelPoolFactory grpcChannelPoolFactory) public ExecutorExportClient(GrpcChannelPoolFactory grpcChannelPoolFactory, MqPersistence persistence)
{ {
this.channelPool = grpcChannelPoolFactory this.channelPool = grpcChannelPoolFactory
.createMulti( .createMulti(
ServiceKey.forGrpcApi(ExecutorExportApiGrpc.class, ServicePartition.multi()), ServiceKey.forGrpcApi(ExecutorExportApiGrpc.class, ServicePartition.multi()),
ExecutorExportApiGrpc::newBlockingStub); ExecutorExportApiGrpc::newBlockingStub);
this.persistence = persistence;
} }
/**
 * Creates a new message-queue entry that serves as a tracking token for an export task
 * and returns its id.
 *
 * @param task name of the export task, passed through to the message (e.g. "atags")
 * @param node id of the node the task is meant for; embedded in the inbox name
 * @param ttl  time-to-live handed to the message queue for the new message
 * @return the id reported by {@code sendNewMessage} for the newly created message
 * @throws Exception if the message could not be persisted
 */
long createTrackingTokenMsg(String task, int node, Duration ttl) throws Exception {
    // NOTE(review): the first two arguments are presumably the recipient and sender
    // inbox names -- confirm against MqPersistence.sendNewMessage.
    final String trackingInbox = "task-tracking[" + node + "]";
    final String clientInbox = "export-client";

    return persistence.sendNewMessage(trackingInbox, clientInbox, null, task, "", ttl);
}
public long exportAtags(int node, FileStorageId fid) throws Exception {
long msgId = createTrackingTokenMsg("atags", node, Duration.ofHours(6));
public void exportAtags(int node, FileStorageId fid) {
channelPool.call(ExecutorExportApiBlockingStub::exportAtags) channelPool.call(ExecutorExportApiBlockingStub::exportAtags)
.forNode(node) .forNode(node)
.run(RpcFileStorageId.newBuilder() .run(RpcExportRequest.newBuilder()
.setFileStorageId(fid.id()) .setFileStorageId(fid.id())
.setMsgId(msgId)
.build()); .build());
return msgId;
} }
public void exportSampleData(int node, FileStorageId fid, int size, String name) { public void exportSampleData(int node, FileStorageId fid, int size, String name) {
channelPool.call(ExecutorExportApiBlockingStub::exportSampleData) channelPool.call(ExecutorExportApiBlockingStub::exportSampleData)
.forNode(node) .forNode(node)
@ -45,20 +58,26 @@ public class ExecutorExportClient {
.build()); .build());
} }
public void exportRssFeeds(int node, FileStorageId fid) { public long exportRssFeeds(int node, FileStorageId fid) throws Exception {
long msgId = createTrackingTokenMsg("rss", node, Duration.ofHours(6));
channelPool.call(ExecutorExportApiBlockingStub::exportRssFeeds) channelPool.call(ExecutorExportApiBlockingStub::exportRssFeeds)
.forNode(node) .forNode(node)
.run(RpcFileStorageId.newBuilder() .run(RpcExportRequest.newBuilder()
.setFileStorageId(fid.id()) .setFileStorageId(fid.id())
.setMsgId(msgId)
.build()); .build());
return msgId;
} }
public void exportTermFrequencies(int node, FileStorageId fid) { public long exportTermFrequencies(int node, FileStorageId fid) throws Exception {
long msgId = createTrackingTokenMsg("tfreq", node, Duration.ofHours(6));
channelPool.call(ExecutorExportApiBlockingStub::exportTermFrequencies) channelPool.call(ExecutorExportApiBlockingStub::exportTermFrequencies)
.forNode(node) .forNode(node)
.run(RpcFileStorageId.newBuilder() .run(RpcExportRequest.newBuilder()
.setFileStorageId(fid.id()) .setFileStorageId(fid.id())
.setMsgId(msgId)
.build()); .build());
return msgId;
} }
public void exportData(int node) { public void exportData(int node) {
@ -77,4 +96,21 @@ public class ExecutorExportClient {
} }
/**
 * Invokes the {@code exportAllAtags} RPC on node 1 with an empty request.
 */
public void exportAllAtags() {
    final Empty noArguments = Empty.getDefaultInstance();

    channelPool
            .call(ExecutorExportApiBlockingStub::exportAllAtags)
            .forNode(1)
            .run(noArguments);
}
/**
 * Invokes the {@code exportAllFeeds} RPC on node 1 with an empty request.
 */
public void exportAllFeeds() {
    final Empty noArguments = Empty.getDefaultInstance();

    channelPool
            .call(ExecutorExportApiBlockingStub::exportAllFeeds)
            .forNode(1)
            .run(noArguments);
}
/**
 * Invokes the {@code exportAllTfreqs} RPC on node 1 with an empty request.
 */
public void exportAllTfreqs() {
    final Empty noArguments = Empty.getDefaultInstance();

    channelPool
            .call(ExecutorExportApiBlockingStub::exportAllTfreqs)
            .forNode(1)
            .run(noArguments);
}
} }

View File

@ -39,15 +39,20 @@ service ExecutorSideloadApi {
} }
service ExecutorExportApi { service ExecutorExportApi {
rpc exportAtags(RpcFileStorageId) returns (Empty) {} rpc exportAtags(RpcExportRequest) returns (Empty) {}
rpc exportSegmentationModel(RpcExportSegmentationModel) returns (Empty) {} rpc exportSegmentationModel(RpcExportSegmentationModel) returns (Empty) {}
rpc exportSampleData(RpcExportSampleData) returns (Empty) {} rpc exportSampleData(RpcExportSampleData) returns (Empty) {}
rpc exportRssFeeds(RpcFileStorageId) returns (Empty) {} rpc exportRssFeeds(RpcExportRequest) returns (Empty) {}
rpc exportTermFrequencies(RpcFileStorageId) returns (Empty) {} rpc exportTermFrequencies(RpcExportRequest) returns (Empty) {}
rpc exportData(Empty) returns (Empty) {} rpc exportData(Empty) returns (Empty) {}
rpc exportAllAtags(Empty) returns (Empty) {}
rpc exportAllFeeds(Empty) returns (Empty) {}
rpc exportAllTfreqs(Empty) returns (Empty) {}
} }
message Empty {} message Empty {}
message RpcFsmName { message RpcFsmName {
string actorName = 1; string actorName = 1;
} }
@ -57,6 +62,10 @@ message RpcProcessId {
message RpcFileStorageId { message RpcFileStorageId {
int64 fileStorageId = 1; int64 fileStorageId = 1;
} }
// Request for the per-node export RPCs (exportAtags, exportRssFeeds,
// exportTermFrequencies): names the storage to export from together with
// the message the caller uses to track the export.
message RpcExportRequest {
// Id of the file storage to export from.
int64 fileStorageId = 1;
// Id of the tracking message created by the client before the call; the
// export actors update this message's state as the export progresses.
int64 msgId = 2;
}
message RpcFileStorageIdWithDomainName { message RpcFileStorageIdWithDomainName {
int64 fileStorageId = 1; int64 fileStorageId = 1;
string targetDomainName = 2; string targetDomainName = 2;

View File

@ -5,6 +5,8 @@ import nu.marginalia.nodecfg.model.NodeProfile;
import java.util.Set; import java.util.Set;
public enum ExecutorActor { public enum ExecutorActor {
PREC_EXPORT_ALL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@ -3,6 +3,7 @@ package nu.marginalia.actor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.FileStorageMonitorActor; import nu.marginalia.actor.monitor.FileStorageMonitorActor;
import nu.marginalia.actor.precession.ExportAllPrecessionActor;
import nu.marginalia.actor.proc.*; import nu.marginalia.actor.proc.*;
import nu.marginalia.actor.prototype.ActorPrototype; import nu.marginalia.actor.prototype.ActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.prototype.RecordActorPrototype;
@ -13,6 +14,7 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration; import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.BaseServiceParams;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -37,6 +39,7 @@ public class ExecutorActorControlService {
@Inject @Inject
public ExecutorActorControlService(MessageQueueFactory messageQueueFactory, public ExecutorActorControlService(MessageQueueFactory messageQueueFactory,
ServiceConfiguration serviceConfiguration,
NodeConfigurationService configurationService, NodeConfigurationService configurationService,
BaseServiceParams baseServiceParams, BaseServiceParams baseServiceParams,
ConvertActor convertActor, ConvertActor convertActor,
@ -63,6 +66,7 @@ public class ExecutorActorControlService {
DownloadSampleActor downloadSampleActor, DownloadSampleActor downloadSampleActor,
ScrapeFeedsActor scrapeFeedsActor, ScrapeFeedsActor scrapeFeedsActor,
ExecutorActorStateMachines stateMachines, ExecutorActorStateMachines stateMachines,
ExportAllPrecessionActor exportAllPrecessionActor,
UpdateRssActor updateRssActor) throws SQLException { UpdateRssActor updateRssActor) throws SQLException {
this.messageQueueFactory = messageQueueFactory; this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog; this.eventLog = baseServiceParams.eventLog;
@ -102,6 +106,10 @@ public class ExecutorActorControlService {
register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor); register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor);
register(ExecutorActor.UPDATE_RSS, updateRssActor); register(ExecutorActor.UPDATE_RSS, updateRssActor);
if (serviceConfiguration.node() == 1) {
register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);
}
} }
private void register(ExecutorActor process, RecordActorPrototype graph) { private void register(ExecutorActor process, RecordActorPrototype graph) {

View File

@ -0,0 +1,116 @@
package nu.marginalia.actor.precession;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.executor.client.ExecutorExportClient;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageType;
import java.time.Duration;
import java.util.Comparator;
import java.util.Optional;
/**
 * Actor that runs a single kind of export job on every node included in the
 * precession, one node at a time, in ascending node id order.
 *
 * <p>For each node the actor asks the export client to start the export, which
 * yields the id of a tracking message, and then polls that message until it
 * reaches a terminal state before moving on to the next node.
 */
public class ExportAllPrecessionActor extends RecordActorPrototype {

    private final NodeConfigurationService nodeConfigurationService;
    private final ExecutorExportClient exportClient;
    private final FileStorageService fileStorageService;
    private final MqPersistence persistence;

    @Inject
    public ExportAllPrecessionActor(Gson gson,
                                    NodeConfigurationService nodeConfigurationService,
                                    ExecutorExportClient exportClient,
                                    FileStorageService fileStorageService,
                                    MqPersistence persistence)
    {
        super(gson);

        this.nodeConfigurationService = nodeConfigurationService;
        this.exportClient = exportClient;
        this.fileStorageService = fileStorageService;
        this.persistence = persistence;
    }

    /** The kind of export to run on each node. */
    public enum ExportTask {
        FEEDS,
        ATAGS,
        TFREQ
    }

    /** Entry state: selects the first node of the precession for the given task. */
    public record Initial(ExportTask task) implements ActorStep {}

    /**
     * Export state for one node. A negative {@code msgId} means the export has not
     * been requested yet; otherwise it is the id of the tracking message to wait on.
     */
    public record Export(int nodeId, ExportTask task, long msgId) implements ActorStep {
        public Export(int nodeId, ExportTask task) {
            this(nodeId, task, -1);
        }
    }

    @Override
    public ActorStep transition(ActorStep self) throws Exception {
        return switch (self) {
            case Initial(ExportTask task) -> beginPrecession(task);
            case Export(int nodeId, ExportTask task, long msgId) when msgId < 0 -> requestExport(nodeId, task);
            case Export(int nodeId, ExportTask task, long msgId) -> awaitExport(nodeId, task, msgId);
            default -> new Error("Unknown state");
        };
    }

    /** Picks the lowest-numbered participating node, or fails if there is none. */
    private ActorStep beginPrecession(ExportTask task) {
        Optional<Integer> firstNode = nextNodeId(-1);

        if (firstNode.isPresent()) {
            return new Export(firstNode.get(), task);
        }
        return new Error("No nodes included in precession");
    }

    /** Starts the export on the node's first active crawl data storage and records the tracking message id. */
    private ActorStep requestExport(int nodeId, ExportTask task) throws Exception {
        var crawlStorages = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_DATA);
        if (crawlStorages.isEmpty()) {
            return new Error("Node " + nodeId + " has no active file storage");
        }

        var crawlStorageId = crawlStorages.getFirst();

        long trackingMsgId = switch (task) {
            case FEEDS -> exportClient.exportRssFeeds(nodeId, crawlStorageId);
            case ATAGS -> exportClient.exportAtags(nodeId, crawlStorageId);
            case TFREQ -> exportClient.exportTermFrequencies(nodeId, crawlStorageId);
        };

        return new Export(nodeId, task, trackingMsgId);
    }

    /**
     * Polls the tracking message every 30 seconds until its state is terminal, then
     * either advances to the next node, ends the precession, or reports the failure.
     */
    private ActorStep awaitExport(int nodeId, ExportTask task, long msgId) throws Exception {
        var trackingMsg = persistence.getMessage(msgId);
        while (!trackingMsg.state().isTerminal()) {
            Thread.sleep(Duration.ofSeconds(30));
            trackingMsg = persistence.getMessage(msgId);
        }

        if (trackingMsg.state() != MqMessageState.OK) {
            return new Error("Export failed for node " + nodeId);
        }

        Optional<Integer> followingNode = nextNodeId(nodeId);
        if (followingNode.isPresent()) {
            return new Export(followingNode.get(), task);
        }
        return new End();
    }

    /**
     * Finds the smallest node id greater than {@code currentNodeId} among the nodes
     * that are included in the precession.
     */
    private Optional<Integer> nextNodeId(int currentNodeId) {
        return nodeConfigurationService.getAll()
                .stream()
                .filter(NodeConfiguration::includeInPrecession)
                .map(NodeConfiguration::node)
                .filter(candidate -> candidate > currentNodeId)
                .min(Comparator.naturalOrder());
    }

    @Override
    public String describe() {
        return "Runs an export job on each index node included in the precession";
    }
}

View File

@ -7,6 +7,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessService;
@ -25,42 +26,51 @@ public class ExportAtagsActor extends RecordActorPrototype {
private final ActorProcessWatcher processWatcher; private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox; private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final MqPersistence persistence;
public record Export(FileStorageId crawlId) implements ActorStep {} public record Export(long responseMsgId, FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { public record Run(long responseMsgId,FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) { public Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1); this(responseMsgId, crawlId, destId, -1);
} }
} }
public record Fail(long responseMsgId, String message) implements ActorStep {}
@Override @Override
public ActorStep transition(ActorStep self) throws Exception { public ActorStep transition(ActorStep self) throws Exception {
return switch(self) { return switch(self) {
case Export(FileStorageId crawlId) -> { case Export(long responseMsgId, FileStorageId crawlId) -> {
persistence.updateMessageState(responseMsgId, MqMessageState.ACK);
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atags-export", "Atags " + LocalDateTime.now()); var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atags-export", "Atags " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id"); if (storage == null) yield new Fail(responseMsgId, "Bad storage id");
yield new Run(crawlId, storage.id());
yield new Run(responseMsgId, crawlId, storage.id());
} }
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW); storageService.setFileStorageState(destId, FileStorageState.NEW);
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId)); long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId));
yield new Run(crawlId, destId, newMsgId); yield new Run(responseMsgId, crawlId, destId, newMsgId);
} }
case Run(_, FileStorageId destId, long msgId) -> { case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId); storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed"); yield new Fail(responseMsgId, "Exporter failed");
} }
else { else {
storageService.setFileStorageState(destId, FileStorageState.UNSET); storageService.setFileStorageState(destId, FileStorageState.UNSET);
persistence.updateMessageState(responseMsgId, MqMessageState.OK);
yield new End(); yield new End();
} }
} }
case Fail(long responseMsgId, String message) -> {
persistence.updateMessageState(responseMsgId, MqMessageState.ERR);
yield new Error(message);
}
default -> new Error(); default -> new Error();
}; };
} }
@ -74,11 +84,13 @@ public class ExportAtagsActor extends RecordActorPrototype {
public ExportAtagsActor(Gson gson, public ExportAtagsActor(Gson gson,
FileStorageService storageService, FileStorageService storageService,
ProcessOutboxes processOutboxes, ProcessOutboxes processOutboxes,
MqPersistence persistence,
ActorProcessWatcher processWatcher) ActorProcessWatcher processWatcher)
{ {
super(gson); super(gson);
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
this.storageService = storageService; this.storageService = storageService;
this.persistence = persistence;
this.processWatcher = processWatcher; this.processWatcher = processWatcher;
} }

View File

@ -7,6 +7,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessService;
@ -24,43 +25,51 @@ public class ExportFeedsActor extends RecordActorPrototype {
private final FileStorageService storageService; private final FileStorageService storageService;
private final ActorProcessWatcher processWatcher; private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox; private final MqOutbox exportTasksOutbox;
private final MqPersistence persistence;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
public record Export(FileStorageId crawlId) implements ActorStep {} public record Export(long responseMsgId, FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { public record Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) { public Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1); this(responseMsgId, crawlId, destId, -1);
} }
} }
public record Fail(long responseMsgId, String message) implements ActorStep {}
@Override @Override
public ActorStep transition(ActorStep self) throws Exception { public ActorStep transition(ActorStep self) throws Exception {
return switch(self) { return switch(self) {
case Export(FileStorageId crawlId) -> { case Export(long responseMsgId, FileStorageId crawlId) -> {
persistence.updateMessageState(responseMsgId, MqMessageState.ACK);
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "feed-export", "Feeds " + LocalDateTime.now()); var storage = storageService.allocateStorage(FileStorageType.EXPORT, "feed-export", "Feeds " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id"); if (storage == null) yield new Fail(responseMsgId, "Bad storage id");
yield new Run(crawlId, storage.id()); yield new Run(responseMsgId, crawlId, storage.id());
} }
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW); storageService.setFileStorageState(destId, FileStorageState.NEW);
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.feeds(crawlId, destId)); long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.feeds(crawlId, destId));
yield new Run(crawlId, destId, newMsgId); yield new Run(responseMsgId, crawlId, destId, newMsgId);
} }
case Run(_, FileStorageId destId, long msgId) -> { case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId); storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed"); yield new Fail(responseMsgId, "Exporter failed");
} }
else { else {
storageService.setFileStorageState(destId, FileStorageState.UNSET); storageService.setFileStorageState(destId, FileStorageState.UNSET);
persistence.updateMessageState(responseMsgId, MqMessageState.OK);
yield new End(); yield new End();
} }
} }
case Fail(long responseMsgId, String message) -> {
persistence.updateMessageState(responseMsgId, MqMessageState.ERR);
yield new Error(message);
}
default -> new Error(); default -> new Error();
}; };
} }
@ -75,12 +84,13 @@ public class ExportFeedsActor extends RecordActorPrototype {
public ExportFeedsActor(Gson gson, public ExportFeedsActor(Gson gson,
FileStorageService storageService, FileStorageService storageService,
ActorProcessWatcher processWatcher, ActorProcessWatcher processWatcher,
ProcessOutboxes outboxes) ProcessOutboxes outboxes, MqPersistence persistence)
{ {
super(gson); super(gson);
this.storageService = storageService; this.storageService = storageService;
this.processWatcher = processWatcher; this.processWatcher = processWatcher;
this.exportTasksOutbox = outboxes.getExportTasksOutbox(); this.exportTasksOutbox = outboxes.getExportTasksOutbox();
this.persistence = persistence;
} }
} }

View File

@ -7,6 +7,7 @@ import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.tasks.ExportTaskRequest; import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService; import nu.marginalia.process.ProcessService;
@ -25,41 +26,48 @@ public class ExportTermFreqActor extends RecordActorPrototype {
private final ActorProcessWatcher processWatcher; private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox; private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final MqPersistence persistence;
public record Export(FileStorageId crawlId) implements ActorStep {} public record Export(long responseMsgId, FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { public record Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) { public Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1); this(responseMsgId, crawlId, destId, -1);
} }
} }
public record Fail(long responseMsgId, String message) implements ActorStep {}
@Override @Override
public ActorStep transition(ActorStep self) throws Exception { public ActorStep transition(ActorStep self) throws Exception {
return switch(self) { return switch(self) {
case Export(FileStorageId crawlId) -> { case Export(long responseMsgId, FileStorageId crawlId) -> {
persistence.updateMessageState(responseMsgId, MqMessageState.ACK);
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq", "Term Frequencies " + LocalDateTime.now()); var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq", "Term Frequencies " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id"); if (storage == null) yield new Fail(responseMsgId, "Bad storage id");
yield new Run(crawlId, storage.id()); yield new Run(responseMsgId, crawlId, storage.id());
} }
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { case Run(long responseMsgId, FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW); storageService.setFileStorageState(destId, FileStorageState.NEW);
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.termFreq(crawlId, destId)); long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.termFreq(crawlId, destId));
yield new Run(crawlId, destId, newMsgId); yield new Run(responseMsgId, crawlId, destId, newMsgId);
} }
case Run(_, FileStorageId destId, long msgId) -> { case Run(long responseMsgId, _, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId); storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed"); yield new Fail(responseMsgId, "Exporter failed");
} }
else { else {
storageService.setFileStorageState(destId, FileStorageState.UNSET); storageService.setFileStorageState(destId, FileStorageState.UNSET);
persistence.updateMessageState(responseMsgId, MqMessageState.OK);
yield new End(); yield new End();
} }
} }
case Fail(long responseMsgId, String message) -> {
persistence.updateMessageState(responseMsgId, MqMessageState.ERR);
yield new Error(message);
}
default -> new Error(); default -> new Error();
}; };
@ -75,10 +83,12 @@ public class ExportTermFreqActor extends RecordActorPrototype {
public ExportTermFreqActor(Gson gson, public ExportTermFreqActor(Gson gson,
FileStorageService storageService, FileStorageService storageService,
ProcessOutboxes processOutboxes, ProcessOutboxes processOutboxes,
MqPersistence persistence,
ActorProcessWatcher processWatcher) ActorProcessWatcher processWatcher)
{ {
super(gson); super(gson);
this.storageService = storageService; this.storageService = storageService;
this.persistence = persistence;
this.processWatcher = processWatcher; this.processWatcher = processWatcher;
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
} }

View File

@ -5,8 +5,10 @@ import com.google.inject.Singleton;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.precession.ExportAllPrecessionActor;
import nu.marginalia.actor.task.*; import nu.marginalia.actor.task.*;
import nu.marginalia.functions.execution.api.*; import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.server.DiscoverableService; import nu.marginalia.service.server.DiscoverableService;
import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageId;
@ -16,17 +18,21 @@ public class ExecutorExportGrpcService
implements DiscoverableService implements DiscoverableService
{ {
private final ExecutorActorControlService actorControlService; private final ExecutorActorControlService actorControlService;
private final ServiceConfiguration serviceConfiguration;
@Inject @Inject
public ExecutorExportGrpcService(ExecutorActorControlService actorControlService) { public ExecutorExportGrpcService(ExecutorActorControlService actorControlService, ServiceConfiguration serviceConfiguration) {
this.actorControlService = actorControlService; this.actorControlService = actorControlService;
this.serviceConfiguration = serviceConfiguration;
} }
@Override @Override
public void exportAtags(RpcFileStorageId request, StreamObserver<Empty> responseObserver) { public void exportAtags(RpcExportRequest request, StreamObserver<Empty> responseObserver) {
try { try {
actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS, actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS,
new ExportAtagsActor.Export(FileStorageId.of(request.getFileStorageId())) new ExportAtagsActor.Export(
request.getMsgId(),
FileStorageId.of(request.getFileStorageId()))
); );
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
@ -55,10 +61,12 @@ public class ExecutorExportGrpcService
} }
@Override @Override
public void exportRssFeeds(RpcFileStorageId request, StreamObserver<Empty> responseObserver) { public void exportRssFeeds(RpcExportRequest request, StreamObserver<Empty> responseObserver) {
try { try {
actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS, actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS,
new ExportFeedsActor.Export(FileStorageId.of(request.getFileStorageId())) new ExportFeedsActor.Export(
request.getMsgId(),
FileStorageId.of(request.getFileStorageId()))
); );
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
@ -69,10 +77,10 @@ public class ExecutorExportGrpcService
} }
@Override @Override
public void exportTermFrequencies(RpcFileStorageId request, StreamObserver<Empty> responseObserver) { public void exportTermFrequencies(RpcExportRequest request, StreamObserver<Empty> responseObserver) {
try { try {
actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES, actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES,
new ExportTermFreqActor.Export(FileStorageId.of(request.getFileStorageId())) new ExportTermFreqActor.Export(request.getMsgId(), FileStorageId.of(request.getFileStorageId()))
); );
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
@ -109,4 +117,48 @@ public class ExecutorExportGrpcService
} }
} }
/**
 * Starts the export-all precession actor with the anchor tag export task.
 *
 * <p>The request is rejected with an {@link IllegalArgumentException} when this
 * service is not running on node 1. Otherwise an empty response is sent and the
 * call is completed once {@code startFrom} returns; any exception thrown while
 * starting the actor is forwarded to the caller through {@code onError}.
 *
 * @param request          unused empty request message
 * @param responseObserver observer that receives the empty reply or the error
 */
@Override
public void exportAllAtags(Empty request, StreamObserver<Empty> responseObserver) {
    if (serviceConfiguration.node() != 1) {
        responseObserver.onError(new IllegalArgumentException("Export all atags is only available on node 1"));
        // onError terminates the call: do not start the actor or touch the observer again.
        return;
    }

    try {
        actorControlService.startFrom(ExecutorActor.PREC_EXPORT_ALL,
                new ExportAllPrecessionActor.Initial(ExportAllPrecessionActor.ExportTask.ATAGS)
        );
        responseObserver.onNext(Empty.getDefaultInstance());
        responseObserver.onCompleted();
    }
    catch (Exception e) {
        responseObserver.onError(e);
    }
}
/**
 * Starts the export-all precession actor with the feeds export task.
 *
 * <p>An empty response is sent and the call is completed once {@code startFrom}
 * returns; any exception thrown along the way is forwarded through {@code onError}.
 *
 * @param request          unused empty request message
 * @param responseObserver observer that receives the empty reply or the error
 */
@Override
public void exportAllFeeds(Empty request, StreamObserver<Empty> responseObserver) {
    try {
        ExportAllPrecessionActor.Initial initialState =
                new ExportAllPrecessionActor.Initial(ExportAllPrecessionActor.ExportTask.FEEDS);
        actorControlService.startFrom(ExecutorActor.PREC_EXPORT_ALL, initialState);

        Empty reply = Empty.getDefaultInstance();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
    catch (Exception startFailure) {
        responseObserver.onError(startFailure);
    }
}
/**
 * Starts the export-all precession actor with the term frequency export task.
 *
 * <p>An empty response is sent and the call is completed once {@code startFrom}
 * returns; any exception thrown along the way is forwarded through {@code onError}.
 *
 * @param request          unused empty request message
 * @param responseObserver observer that receives the empty reply or the error
 */
@Override
public void exportAllTfreqs(Empty request, StreamObserver<Empty> responseObserver) {
    try {
        ExportAllPrecessionActor.Initial initialState =
                new ExportAllPrecessionActor.Initial(ExportAllPrecessionActor.ExportTask.TFREQ);
        actorControlService.startFrom(ExecutorActor.PREC_EXPORT_ALL, initialState);

        Empty reply = Empty.getDefaultInstance();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
    catch (Exception startFailure) {
        responseObserver.onError(startFailure);
    }
}
} }

View File

@ -14,6 +14,7 @@ import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule; import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule; import nu.marginalia.service.module.ServiceDiscoveryModule;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -28,6 +29,7 @@ public class ExportTasksMain extends ProcessMainClass {
private final SampleDataExporter sampleDataExporter; private final SampleDataExporter sampleDataExporter;
private final TermFrequencyExporter termFrequencyExporter; private final TermFrequencyExporter termFrequencyExporter;
private final WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator; private final WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator;
private final ProcessHeartbeat heartbeat;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -58,7 +60,8 @@ public class ExportTasksMain extends ProcessMainClass {
FeedExporter feedExporter, FeedExporter feedExporter,
SampleDataExporter sampleDataExporter, SampleDataExporter sampleDataExporter,
TermFrequencyExporter termFrequencyExporter, TermFrequencyExporter termFrequencyExporter,
Gson gson, WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator) Gson gson,
WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator, ProcessHeartbeat heartbeat)
{ {
super(messageQueueFactory, config, gson, ProcessInboxNames.EXPORT_TASK_INBOX); super(messageQueueFactory, config, gson, ProcessInboxNames.EXPORT_TASK_INBOX);
this.atagExporter = atagExporter; this.atagExporter = atagExporter;
@ -66,15 +69,37 @@ public class ExportTasksMain extends ProcessMainClass {
this.sampleDataExporter = sampleDataExporter; this.sampleDataExporter = sampleDataExporter;
this.termFrequencyExporter = termFrequencyExporter; this.termFrequencyExporter = termFrequencyExporter;
this.websiteAdjacenciesCalculator = websiteAdjacenciesCalculator; this.websiteAdjacenciesCalculator = websiteAdjacenciesCalculator;
this.heartbeat = heartbeat;
}
enum ProcessSteps {
RUN,
END
} }
private void run(ExportTaskRequest request) throws Exception { private void run(ExportTaskRequest request) throws Exception {
switch (request.task) { try (var hb = heartbeat.createProcessTaskHeartbeat(ProcessSteps.class, request.task.toString())) {
case ATAGS: atagExporter.export(request.crawlId, request.destId); break; hb.progress(ProcessSteps.RUN);
case FEEDS: feedExporter.export(request.crawlId, request.destId); break;
case TERM_FREQ: termFrequencyExporter.export(request.crawlId, request.destId); break; switch (request.task) {
case SAMPLE_DATA: sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name); break; case ATAGS:
case ADJACENCIES: websiteAdjacenciesCalculator.export(); break; atagExporter.export(request.crawlId, request.destId);
break;
case FEEDS:
feedExporter.export(request.crawlId, request.destId);
break;
case TERM_FREQ:
termFrequencyExporter.export(request.crawlId, request.destId);
break;
case SAMPLE_DATA:
sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name);
break;
case ADJACENCIES:
websiteAdjacenciesCalculator.export();
break;
}
hb.progress(ProcessSteps.RUN);
} }
} }

View File

@ -304,7 +304,7 @@ public class ControlNodeActionsService {
return ""; return "";
} }
private Object exportFromCrawlData(Request req, Response rsp) { private Object exportFromCrawlData(Request req, Response rsp) throws Exception {
String exportType = req.queryParams("exportType"); String exportType = req.queryParams("exportType");
FileStorageId source = parseSourceFileStorageId(req.queryParams("source")); FileStorageId source = parseSourceFileStorageId(req.queryParams("source"));

View File

@ -7,6 +7,7 @@ import nu.marginalia.control.actor.ControlActor;
import nu.marginalia.control.actor.ControlActorService; import nu.marginalia.control.actor.ControlActorService;
import nu.marginalia.db.DomainTypes; import nu.marginalia.db.DomainTypes;
import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.executor.client.ExecutorExportClient;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.NodeConfigurationService;
@ -29,6 +30,7 @@ public class ControlSysActionsService {
private final NodeConfigurationService nodeConfigurationService; private final NodeConfigurationService nodeConfigurationService;
private final FileStorageService fileStorageService; private final FileStorageService fileStorageService;
private final ExecutorClient executorClient; private final ExecutorClient executorClient;
private final ExecutorExportClient exportClient;
@Inject @Inject
public ControlSysActionsService(MessageQueueFactory mqFactory, public ControlSysActionsService(MessageQueueFactory mqFactory,
@ -38,7 +40,7 @@ public class ControlSysActionsService {
ControlActorService controlActorService, ControlActorService controlActorService,
NodeConfigurationService nodeConfigurationService, NodeConfigurationService nodeConfigurationService,
FileStorageService fileStorageService, FileStorageService fileStorageService,
ExecutorClient executorClient) ExecutorClient executorClient, ExecutorExportClient exportClient)
{ {
this.apiOutbox = createApiOutbox(mqFactory); this.apiOutbox = createApiOutbox(mqFactory);
this.eventLog = eventLog; this.eventLog = eventLog;
@ -48,6 +50,7 @@ public class ControlSysActionsService {
this.nodeConfigurationService = nodeConfigurationService; this.nodeConfigurationService = nodeConfigurationService;
this.fileStorageService = fileStorageService; this.fileStorageService = fileStorageService;
this.executorClient = executorClient; this.executorClient = executorClient;
this.exportClient = exportClient;
} }
/** This is a hack to get around the fact that the API service is not a core service /** This is a hack to get around the fact that the API service is not a core service
@ -65,6 +68,7 @@ public class ControlSysActionsService {
Spark.get("/actions", this::actionsModel, actionsView::render); Spark.get("/actions", this::actionsModel, actionsView::render);
Spark.post("/actions/recalculate-adjacencies-graph", this::calculateAdjacencies, Redirects.redirectToOverview); Spark.post("/actions/recalculate-adjacencies-graph", this::calculateAdjacencies, Redirects.redirectToOverview);
Spark.post("/actions/export-all", this::exportAll, Redirects.redirectToOverview);
Spark.post("/actions/reindex-all", this::reindexAll, Redirects.redirectToOverview); Spark.post("/actions/reindex-all", this::reindexAll, Redirects.redirectToOverview);
Spark.post("/actions/reprocess-all", this::reprocessAll, Redirects.redirectToOverview); Spark.post("/actions/reprocess-all", this::reprocessAll, Redirects.redirectToOverview);
Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview); Spark.post("/actions/recrawl-all", this::recrawlAll, Redirects.redirectToOverview);
@ -76,6 +80,23 @@ public class ControlSysActionsService {
} }
} }
/**
 * Handles the "export all" action by triggering the export selected by the
 * {@code exportType} request parameter through the export client.
 *
 * @param request  the incoming request; must carry an {@code exportType}
 *                 parameter of {@code "atags"} or {@code "feeds"}
 * @param response the outgoing response (unused)
 * @return an empty string
 * @throws IllegalArgumentException if {@code exportType} is missing or not recognized
 */
private Object exportAll(Request request, Response response) {
    String exportType = request.queryParams("exportType");

    // The form can be submitted without a selection, in which case the parameter is
    // absent; switching on a null String would throw an uninformative NullPointerException.
    if (exportType == null) {
        throw new IllegalArgumentException("Missing export type");
    }

    switch (exportType) {
        case "atags":
            exportClient.exportAllAtags();
            break;
        case "feeds":
            exportClient.exportAllFeeds();
            break;
        default:
            throw new IllegalArgumentException("Unknown export type: " + exportType);
    }

    return "";
}
private Object actionsModel(Request request, Response response) { private Object actionsModel(Request request, Response response) {
try { try {
List<Map<String, Object>> eligibleNodes = new ArrayList<>(); List<Map<String, Object>> eligibleNodes = new ArrayList<>();

View File

@ -53,6 +53,43 @@
</div> </div>
</div> </div>
<!-- Accordion item: form that posts the chosen export type to actions/export-all -->
<div class="accordion-item">
    <h2 class="accordion-header">
        <button class="accordion-button collapsed"
                type="button"
                data-bs-toggle="collapse"
                data-bs-target="#collapseCalculateExportAll"
                aria-expanded="false"
                aria-controls="collapseCalculateExportAll">
            Export From All Nodes
        </button>
    </h2>
    <div id="collapseCalculateExportAll" class="accordion-collapse collapse p-3" data-bs-parent="#accordionActions">
        <div class="mb-3">
            This will trigger an export job from all nodes in succession
        </div>
        <form method="post" action="actions/export-all">
            <!-- "required" on the radio group stops the browser from submitting without an exportType -->
            <div class="form-check mb-3">
                <input class="form-check-input" type="radio" name="exportType" id="exportFeeds" value="feeds" required>
                <label class="form-check-label" for="exportFeeds">
                    RSS Feeds
                </label>
            </div>
            <div class="form-check mb-3">
                <input class="form-check-input" type="radio" name="exportType" id="exportAtags" value="atags" required>
                <label class="form-check-label" for="exportAtags">
                    Anchor Tags
                </label>
            </div>
            <button
                    class="btn btn-primary me-md-2"
                    onclick="return confirm('Confirm export');"
                    type="submit">
                Export</button>
        </form>
    </div>
</div>
<!-- <!--
<div class="accordion-item"> <div class="accordion-item">
<h2 class="accordion-header"> <h2 class="accordion-header">