From 347fe6b7bef6e7ff77c4225dbd3aa643358ebd26 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Tue, 28 Nov 2023 16:41:09 +0100 Subject: [PATCH] (control) Reindex-all actor --- .../marginalia/index/client/IndexClient.java | 4 +- .../nu/marginalia/mq/MessageQueueFactory.java | 4 +- .../java/nu/marginalia/mq/MqMessageState.java | 6 +- .../control/actor/ControlActor.java | 1 + .../control/actor/ControlActorService.java | 8 +- .../actor/precession/ReindexAllActor.java | 118 ++++++++++++++++++ 6 files changed, 133 insertions(+), 8 deletions(-) create mode 100644 code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 82624c0f..d114a0e1 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -77,8 +77,8 @@ public class IndexClient extends AbstractDynamicClient { return super.get(ctx, node, "/is-blocked", Boolean.class); } - public void triggerRepartition(int node) throws Exception { - messageQueueFactory.sendSingleShotRequest( + public long triggerRepartition(int node) throws Exception { + return messageQueueFactory.sendSingleShotRequest( ServiceId.Index.withNode(node), IndexMqEndpoints.INDEX_REPARTITION, null diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java index c0f2ed0e..3f22a7ca 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java @@ -45,7 +45,7 @@ public class MessageQueueFactory { /** Send a request to the specified inbox with a dummy reply inbox, * do not wait for a response. */ - public void sendSingleShotRequest(String inboxName, String function, @Nullable String payload) throws Exception { - persistence.sendNewMessage(inboxName, null, null, function, payload, null); + public long sendSingleShotRequest(String inboxName, String function, @Nullable String payload) throws Exception { + return persistence.sendNewMessage(inboxName, null, null, function, payload, null); } } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java index 94f7411b..da188ffc 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java @@ -10,5 +10,9 @@ public enum MqMessageState { /** The message processing has failed */ ERR, /** The message did not reach a terminal state within the TTL */ - DEAD + DEAD; + + public boolean isTerminal() { + return this == OK || this == ERR || this == DEAD; + } } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java index 2705de60..8b179b44 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java @@ -3,6 +3,7 @@ package nu.marginalia.control.actor; public enum ControlActor { MONITOR_MESSAGE_QUEUE, + REINDEX_ALL, REBALANCE; public String id() { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java index 50864be7..6dc854ec 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java @@ -8,7 +8,7 @@ import nu.marginalia.actor.ActorStateMachine; import nu.marginalia.actor.prototype.ActorPrototype; import nu.marginalia.actor.state.ActorStateInstance; import nu.marginalia.control.actor.monitor.MessageQueueMonitorActor; -import nu.marginalia.control.actor.rebalance.RebalanceActor; +import nu.marginalia.control.actor.precession.ReindexAllActor; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.service.control.ServiceEventLog; @@ -23,6 +23,7 @@ import java.util.stream.Collectors; @Singleton public class ControlActorService { private final ServiceEventLog eventLog; + private final ReindexAllActor reindexAllActor; private final Gson gson; private final MessageQueueFactory messageQueueFactory; public Map stateMachines = new HashMap<>(); @@ -32,16 +33,17 @@ public class ControlActorService { public ControlActorService(MessageQueueFactory messageQueueFactory, BaseServiceParams baseServiceParams, MessageQueueMonitorActor messageQueueMonitor, - RebalanceActor rebalanceActor + ReindexAllActor reindexAllActor ) { this.messageQueueFactory = messageQueueFactory; this.eventLog = baseServiceParams.eventLog; + this.reindexAllActor = reindexAllActor; this.gson = GsonFactory.get(); this.node = baseServiceParams.configuration.node(); register(ControlActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor); -// register(ControlActor.REBALANCE, rebalanceActor); + register(ControlActor.REINDEX_ALL, reindexAllActor); } private void register(ControlActor process, ActorPrototype graph) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java new file mode 100644 index 00000000..bde12eac --- /dev/null +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/precession/ReindexAllActor.java @@ -0,0 +1,118 @@ +package nu.marginalia.control.actor.precession; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.actor.state.Resume; +import nu.marginalia.index.client.IndexClient; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.nodecfg.model.NodeConfiguration; + +import java.sql.SQLException; +import java.util.OptionalInt; +import java.util.concurrent.TimeUnit; + +@Singleton +public class ReindexAllActor extends RecordActorPrototype { + + private final MqPersistence persistence; + private final IndexClient indexClient; + private final NodeConfigurationService nodeConfigurationService; + + + public record Initial() implements ActorStep {} + @Resume(behavior=ActorResumeBehavior.RETRY) + public record ReindexNode(int node, long msgId) implements ActorStep { + public ReindexNode(int node) { this(node, -1L); } + + } + public record AdvanceNode(int node) implements ActorStep {} + + + @Override + public ActorStep transition(ActorStep self) throws Exception { + PrecessionNodes precessionNodes = new PrecessionNodes(); + + return switch (self) { + case Initial i -> { + var id = precessionNodes.first(); + if (id.isPresent()) { + yield new ReindexNode(id.getAsInt()); + } + else { + yield new End(); + } + } + case ReindexNode(int node, long msgId) when msgId < 0 -> new ReindexNode(node, indexClient.triggerRepartition(node)); + case ReindexNode(int node, long msgId) -> { + while (!isMessageTerminal(msgId)) { + TimeUnit.SECONDS.sleep(10); + } + + yield new AdvanceNode(node); + } + case AdvanceNode(int node) -> { + var id = precessionNodes.next(node); + + if (id.isPresent()) + yield new ReindexNode(id.getAsInt()); + else + yield new End(); + } + default -> new Error(); + }; + } + + private boolean isMessageTerminal(long msgId) throws SQLException { + return persistence.getMessage(msgId).state().isTerminal(); + } + + @Inject + public ReindexAllActor(Gson gson, + MqPersistence persistence, + IndexClient indexClient, NodeConfigurationService nodeConfigurationService) + { + super(gson); + this.persistence = persistence; + this.indexClient = indexClient; + this.nodeConfigurationService = nodeConfigurationService; + } + + @Override + public String describe() { + return "Triggeres a cascade of reindex instructions across each node included in the precession"; + } + + private class PrecessionNodes { + private final int[] nodes; + + private PrecessionNodes() throws SQLException { + nodes = nodeConfigurationService.getAll().stream() + .filter(NodeConfiguration::includeInPrecession) + .mapToInt(NodeConfiguration::node) + .sorted() + .toArray(); + } + + public OptionalInt first() { + if (nodes.length == 0) + return OptionalInt.empty(); + else + return OptionalInt.of(nodes[0]); + } + + public OptionalInt next(int current) { + for (int i = 0; i < nodes.length - 1 && nodes[i] <= current; i++) { + if (nodes[i] == current) { + return OptionalInt.of(nodes[i + 1]); + } + } + + return OptionalInt.empty(); + } + } +}