diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java new file mode 100644 index 00000000..07299853 --- /dev/null +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java @@ -0,0 +1,89 @@ +package nu.marginalia.executor.client; + +import com.google.inject.Inject; +import jakarta.inject.Singleton; +import nu.marginalia.model.gson.GsonFactory; +import com.google.gson.Gson; +import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.storage.model.FileStorageId; + +import java.util.concurrent.TimeUnit; + +/** This class permits direct interaction with executor actors. This is inherently a pretty flimsy abstraction, + * and care must be taken to not leak too many implementation details through this interface. + */ +@Singleton +public class ExecutorRemoteActorFactory { + private final MqPersistence persistence; + + @Inject + public ExecutorRemoteActorFactory(MqPersistence persistence) { + this.persistence = persistence; + } + + /** Create a remote actor for the RecrawlActor */ + public ExecutorRemoteActorIf createCrawlRemote(int node) { + return new ExecutorRemoteActor<>(persistence, "fsm:recrawl:" + node, "INITIAL"); + } + + /** Create a remote actor for the ConvertAndLoadActor */ + public ExecutorRemoteActorIf createConvertAndLoadRemote(int node) { + return new ExecutorRemoteActor<>(persistence, "fsm:convert_and_load:" + node, "INITIAL"); + } + + public interface ExecutorRemoteActorIf { + boolean trigger(T object) throws Exception; + String getState(); + } + + public record CrawlData(FileStorageId storageId) {} + public record ConvertAndLoadData(FileStorageId fid) {} +} + + +class ExecutorRemoteActor implements ExecutorRemoteActorFactory.ExecutorRemoteActorIf { + private final MqPersistence persistence; + private final String inboxName; + private final String triggerFunction; + private static final Gson gson = GsonFactory.get(); + + ExecutorRemoteActor(MqPersistence persistence, + String inboxName, + String triggerFunction + ) { + this.persistence = persistence; + this.inboxName = inboxName; + this.triggerFunction = triggerFunction; + } + + public boolean trigger(T object) throws Exception { + return trigger(gson.toJson(object)); + } + + public boolean trigger(String payload) throws Exception { + long id = persistence.sendNewMessage(inboxName, null, null, triggerFunction, payload, null); + + // Wait for the remote actor to respond to the message + + for (int i = 0; i < 120; i++) { + var msg = persistence.getMessage(id); + if (msg.state() == MqMessageState.ACK || msg.state() == MqMessageState.OK) + return true; + if (msg.state() == MqMessageState.ERR || msg.state() == MqMessageState.DEAD) + return false; + + TimeUnit.SECONDS.sleep(1); + } + + return false; // Timeout + } + + public String getState() { + return persistence + .getHeadMessage(inboxName) + .map(MqMessage::function) + .orElse("INITIAL"); + } +} \ No newline at end of file diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index 12c4c987..8b4e9ac5 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -495,4 +495,35 @@ public class MqPersistence { public Gson getGson() { return gson; } + + /** Returns the last message sent to this inbox with a state of 'OK' */ + public Optional getHeadMessage(String inboxName) { + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX + FROM MESSAGE_QUEUE + WHERE RECIPIENT_INBOX = ? AND STATE='OK' + ORDER BY ID DESC LIMIT 1 + """)) + { + query.setString(1, inboxName); + var rs = query.executeQuery(); + if (rs.next()) { + long msgId = rs.getLong(1); + long relatedId = rs.getLong(2); + + String function = rs.getString(3); + String payload = rs.getString(4); + + MqMessageState state = MqMessageState.valueOf(rs.getString(5)); + boolean expectsResponse = rs.getBoolean(6); + + return Optional.of(new MqMessage(msgId, relatedId, function, payload, state, expectsResponse)); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return Optional.empty(); + } } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java index 8b179b44..dfa038e2 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java @@ -4,6 +4,7 @@ public enum ControlActor { MONITOR_MESSAGE_QUEUE, REINDEX_ALL, + REPROCESS_ALL, REBALANCE; public String id() { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java index 6dc854ec..cd829901 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java @@ -9,6 +9,7 @@ import nu.marginalia.actor.prototype.ActorPrototype; import nu.marginalia.actor.state.ActorStateInstance; import nu.marginalia.control.actor.monitor.MessageQueueMonitorActor; import nu.marginalia.control.actor.precession.ReindexAllActor; +import nu.marginalia.control.actor.precession.ReprocessAllActor; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.service.control.ServiceEventLog; @@ -33,7 +34,8 @@ public class ControlActorService { public ControlActorService(MessageQueueFactory messageQueueFactory, BaseServiceParams baseServiceParams, MessageQueueMonitorActor messageQueueMonitor, - ReindexAllActor reindexAllActor + ReindexAllActor reindexAllActor, + ReprocessAllActor reprocessAllActor ) { this.messageQueueFactory = messageQueueFactory; this.eventLog = baseServiceParams.eventLog; @@ -44,6 +46,7 @@ public class ControlActorService { register(ControlActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor); register(ControlActor.REINDEX_ALL, reindexAllActor); + register(ControlActor.REPROCESS_ALL, reprocessAllActor); } private void register(ControlActor process, ActorPrototype graph) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java index bde12eac..dcf5241e 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java @@ -25,6 +25,7 @@ public class ReindexAllActor extends RecordActorPrototype { public record Initial() implements ActorStep {} + @Resume(behavior=ActorResumeBehavior.RETRY) public record ReindexNode(int node, long msgId) implements ActorStep { public ReindexNode(int node) { this(node, -1L); } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReprocessAllActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReprocessAllActor.java new file mode 100644 index 00000000..7a1bb1be --- /dev/null +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReprocessAllActor.java @@ -0,0 +1,132 @@ +package nu.marginalia.control.actor.precession; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.actor.state.Resume; +import nu.marginalia.executor.client.ExecutorRemoteActorFactory; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.nodecfg.model.NodeConfiguration; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageType; + +import java.sql.SQLException; +import java.util.OptionalInt; +import java.util.concurrent.TimeUnit; + +@Singleton +public class ReprocessAllActor extends RecordActorPrototype { + private final ExecutorRemoteActorFactory remoteActorFactory; + + private final FileStorageService fileStorageService; + private final NodeConfigurationService nodeConfigurationService; + + + public record Initial() implements ActorStep {} + + public record WaitFinished(int node) implements ActorStep {} + @Resume(behavior=ActorResumeBehavior.RETRY) + public record Trigger(int node) implements ActorStep {} + public record AdvanceNode(int node) implements ActorStep {} + + + @Override + public ActorStep transition(ActorStep self) throws Exception { + PrecessionNodes precessionNodes = new PrecessionNodes(); + + return switch (self) { + case Initial i -> { + var id = precessionNodes.first(); + if (id.isPresent()) { + yield new Trigger(id.getAsInt()); + } + else { + yield new End(); + } + } + case Trigger(int node) -> { + var activeFileStorage = fileStorageService.getActiveFileStorages(node, FileStorageType.CRAWL_DATA); + if (activeFileStorage.size() != 1) { + yield new AdvanceNode(node); + } + + var data = new ExecutorRemoteActorFactory.ConvertAndLoadData(activeFileStorage.get(0)); + + if (remoteActorFactory.createConvertAndLoadRemote(node).trigger(data)) { + yield new WaitFinished(node); + } + else { + yield new AdvanceNode(node); + } + } + case WaitFinished(int node) -> { + var remoteActor = remoteActorFactory.createConvertAndLoadRemote(node); + for (;;) { + var state = remoteActor.getState(); + if ("END".equals(state) || "ERROR".equals(state)) + break; + TimeUnit.SECONDS.sleep(10); + } + yield new AdvanceNode(node); + } + case AdvanceNode(int node) -> { + var id = precessionNodes.next(node); + + if (id.isPresent()) + yield new Trigger(id.getAsInt()); + else + yield new End(); + } + default -> new Error(); + }; + } + + @Inject + public ReprocessAllActor(Gson gson, + ExecutorRemoteActorFactory remoteActorFactory, + FileStorageService fileStorageService, + NodeConfigurationService nodeConfigurationService) + { + super(gson); + this.remoteActorFactory = remoteActorFactory; + this.fileStorageService = fileStorageService; + this.nodeConfigurationService = nodeConfigurationService; + } + + @Override + public String describe() { + return "Triggers a cascade of reindex instructions across each node included in the precession"; + } + + private class PrecessionNodes { + private final int[] nodes; + + private PrecessionNodes() throws SQLException { + nodes = nodeConfigurationService.getAll().stream() + .filter(NodeConfiguration::includeInPrecession) + .mapToInt(NodeConfiguration::node) + .sorted() + .toArray(); + } + + public OptionalInt first() { + if (nodes.length == 0) + return OptionalInt.empty(); + else + return OptionalInt.of(nodes[0]); + } + + public OptionalInt next(int current) { + for (int i = 0; i < nodes.length - 1 && nodes[i] <= current; i++) { + if (nodes[i] == current) { + return OptionalInt.of(nodes[i + 1]); + } + } + + return OptionalInt.empty(); + } + } +} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java index bbec71d5..d0cb85f7 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java @@ -40,18 +40,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype { // STATES - public static final String INITIAL = "INITIAL"; - public static final String RECONVERT = "RECONVERT"; - public static final String RECONVERT_WAIT = "RECONVERT-WAIT"; - public static final String LOAD = "LOAD"; - public static final String BACKUP = "BACKUP"; public static final String REPARTITION = "REPARTITION"; - public static final String REINDEX_FWD = "REINDEX_FWD"; - public static final String REINDEX_FULL = "REINDEX_FULL"; - public static final String REINDEX_PRIO = "REINDEX_PRIO"; - public static final String SWITCH_OVER = "SWITCH-OVER"; - - public static final String END = "END"; private final ActorProcessWatcher processWatcher; private final MqOutbox mqConverterOutbox; private final MqOutbox mqLoaderOutbox;