diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java index 4e5cb8cb..ffbe168c 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java @@ -34,8 +34,23 @@ public class ExecutorRemoteActorFactory { } public interface ExecutorRemoteActorIf { - boolean trigger(T object) throws Exception; - String getState(); + + /** Trigger the remote actor with the given object. The object will be serialized to JSON and sent to the + * remote actor. If the remote actor does not respond after a time period, a timeout will occur and a negative + * message id will be returned. + * + * @param object The message to send to the remote actot + * @return The message id of the response message, or a negative number if the remote actor did not respond + * within a reasonable timeout seconds. + */ + long trigger(T object) throws Exception; + + /** Get the last finished state of the actor. + *

+ * The message id of the request initiating the actor must be provided to ensure that + * we don't get a state from a previous run. + */ + String getState(long fromMsgId); } public record CrawlData(FileStorageId storageId, boolean cascadeLoad) {} @@ -58,11 +73,11 @@ class ExecutorRemoteActor implements ExecutorRemoteActorFactory.ExecutorRemot this.triggerFunction = triggerFunction; } - public boolean trigger(T object) throws Exception { + public long trigger(T object) throws Exception { return trigger(gson.toJson(object)); } - public boolean trigger(String payload) throws Exception { + public long trigger(String payload) throws Exception { long id = persistence.sendNewMessage(inboxName, null, null, triggerFunction, payload, null); // Wait for the remote actor to respond to the message @@ -70,19 +85,19 @@ class ExecutorRemoteActor implements ExecutorRemoteActorFactory.ExecutorRemot for (int i = 0; i < 120; i++) { var msg = persistence.getMessage(id); if (msg.state() == MqMessageState.ACK || msg.state() == MqMessageState.OK) - return true; + return id; if (msg.state() == MqMessageState.ERR || msg.state() == MqMessageState.DEAD) - return false; + return -id; TimeUnit.SECONDS.sleep(1); } - return false; // Timeout + return -1; // Timeout } - public String getState() { + public String getState(long fromMsgId) { return persistence - .getHeadMessage(inboxName) + .getHeadMessage(inboxName, fromMsgId) .map(MqMessage::function) .orElse("INITIAL"); } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index 8b4e9ac5..732aa70c 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -496,17 +496,21 @@ public class MqPersistence { return gson; } - /** Returns the last message sent to this inbox with a state of 'OK' */ - public Optional getHeadMessage(String inboxName) { + /** Returns the last message sent to this inbox with a state of 'OK' + * with an id greater than or equal to fromMsgId + */ + public Optional getHeadMessage(String inboxName, long fromMsgId) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM MESSAGE_QUEUE - WHERE RECIPIENT_INBOX = ? AND STATE='OK' + WHERE RECIPIENT_INBOX = ? AND STATE='OK' AND ID >= ? ORDER BY ID DESC LIMIT 1 """)) { query.setString(1, inboxName); + query.setLong(2, fromMsgId); + var rs = query.executeQuery(); if (rs.next()) { long msgId = rs.getLong(1); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/RecrawlAllActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/RecrawlAllActor.java index a6733b63..e0268901 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/RecrawlAllActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/RecrawlAllActor.java @@ -25,7 +25,7 @@ public class RecrawlAllActor extends RecordActorPrototype { public record Initial() implements ActorStep {} - public record WaitFinished(int node) implements ActorStep {} + public record WaitFinished(int node, long msgId) implements ActorStep {} @Resume(behavior=ActorResumeBehavior.RETRY) public record Trigger(int node) implements ActorStep {} public record AdvanceNode(int node) implements ActorStep {} @@ -49,17 +49,18 @@ public class RecrawlAllActor extends RecordActorPrototype { var data = new ExecutorRemoteActorFactory.CrawlData(activeFileStorage.get(0), true); - if (remoteActorFactory.createCrawlRemote(node).trigger(data)) { - yield new WaitFinished(node); + long msgId = remoteActorFactory.createCrawlRemote(node).trigger(data); + if (msgId >= 0) { + yield new WaitFinished(node, msgId); } else { yield new AdvanceNode(node); } } - case WaitFinished(int node) -> { + case WaitFinished(int node, long msgId) -> { var remoteActor = remoteActorFactory.createCrawlRemote(node); for (;;) { - var state = remoteActor.getState(); + var state = remoteActor.getState(msgId); if ("END".equals(state) || "ERROR".equals(state)) { break; } @@ -80,8 +81,7 @@ public class RecrawlAllActor extends RecordActorPrototype { public RecrawlAllActor(Gson gson, ExecutorRemoteActorFactory remoteActorFactory, FileStorageService fileStorageService, - PrecessionNodes precessionNodes, - NodeConfigurationService nodeConfigurationService) + PrecessionNodes precessionNodes) { super(gson); this.remoteActorFactory = remoteActorFactory; diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReprocessAllActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReprocessAllActor.java index 416f4d90..d37ecc2a 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReprocessAllActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReprocessAllActor.java @@ -24,7 +24,7 @@ public class ReprocessAllActor extends RecordActorPrototype { public record Initial() implements ActorStep {} - public record WaitFinished(int node) implements ActorStep {} + public record WaitFinished(int node, long msgId) implements ActorStep {} @Resume(behavior=ActorResumeBehavior.RETRY) public record Trigger(int node) implements ActorStep {} public record AdvanceNode(int node) implements ActorStep {} @@ -47,17 +47,18 @@ public class ReprocessAllActor extends RecordActorPrototype { var data = new ExecutorRemoteActorFactory.ConvertAndLoadData(activeFileStorage.get(0)); - if (remoteActorFactory.createConvertAndLoadRemote(node).trigger(data)) { - yield new WaitFinished(node); + long msgId = remoteActorFactory.createConvertAndLoadRemote(node).trigger(data); + if (msgId >= 0) { + yield new WaitFinished(node, msgId); } else { yield new AdvanceNode(node); } } - case WaitFinished(int node) -> { + case WaitFinished(int node, long msgId) -> { var remoteActor = remoteActorFactory.createConvertAndLoadRemote(node); for (;;) { - var state = remoteActor.getState(); + var state = remoteActor.getState(msgId); if ("END".equals(state) || "ERROR".equals(state)) break; TimeUnit.SECONDS.sleep(10);