(control) Reindex-all actor

This commit is contained in:
Viktor Lofgren 2023-11-28 16:41:09 +01:00
parent ff3ceb981e
commit 347fe6b7be
6 changed files with 133 additions and 8 deletions

View File

@ -77,8 +77,8 @@ public class IndexClient extends AbstractDynamicClient {
return super.get(ctx, node, "/is-blocked", Boolean.class);
}
public void triggerRepartition(int node) throws Exception {
messageQueueFactory.sendSingleShotRequest(
public long triggerRepartition(int node) throws Exception {
return messageQueueFactory.sendSingleShotRequest(
ServiceId.Index.withNode(node),
IndexMqEndpoints.INDEX_REPARTITION,
null

View File

@ -45,7 +45,7 @@ public class MessageQueueFactory {
/** Send a request to the specified inbox with a dummy reply inbox,
* do not wait for a response.
*/
public void sendSingleShotRequest(String inboxName, String function, @Nullable String payload) throws Exception {
persistence.sendNewMessage(inboxName, null, null, function, payload, null);
public long sendSingleShotRequest(String inboxName, String function, @Nullable String payload) throws Exception {
return persistence.sendNewMessage(inboxName, null, null, function, payload, null);
}
}

View File

@ -10,5 +10,9 @@ public enum MqMessageState {
/** The message processing has failed */
ERR,
/** The message did not reach a terminal state within the TTL */
DEAD
DEAD;
public boolean isTerminal() {
return this == OK || this == ERR || this == DEAD;
}
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.control.actor;
public enum ControlActor {
MONITOR_MESSAGE_QUEUE,
REINDEX_ALL,
REBALANCE;
public String id() {

View File

@ -8,7 +8,7 @@ import nu.marginalia.actor.ActorStateMachine;
import nu.marginalia.actor.prototype.ActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.control.actor.monitor.MessageQueueMonitorActor;
import nu.marginalia.control.actor.rebalance.RebalanceActor;
import nu.marginalia.control.actor.precession.ReindexAllActor;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.service.control.ServiceEventLog;
@ -23,6 +23,7 @@ import java.util.stream.Collectors;
@Singleton
public class ControlActorService {
private final ServiceEventLog eventLog;
private final ReindexAllActor reindexAllActor;
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<ControlActor, ActorStateMachine> stateMachines = new HashMap<>();
@ -32,16 +33,17 @@ public class ControlActorService {
public ControlActorService(MessageQueueFactory messageQueueFactory,
BaseServiceParams baseServiceParams,
MessageQueueMonitorActor messageQueueMonitor,
RebalanceActor rebalanceActor
ReindexAllActor reindexAllActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.reindexAllActor = reindexAllActor;
this.gson = GsonFactory.get();
this.node = baseServiceParams.configuration.node();
register(ControlActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor);
// register(ControlActor.REBALANCE, rebalanceActor);
register(ControlActor.REINDEX_ALL, reindexAllActor);
}
private void register(ControlActor process, ActorPrototype graph) {

View File

@ -0,0 +1,118 @@
package nu.marginalia.control.actor.precession;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import java.sql.SQLException;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
@Singleton
public class ReindexAllActor extends RecordActorPrototype {
private final MqPersistence persistence;
private final IndexClient indexClient;
private final NodeConfigurationService nodeConfigurationService;
public record Initial() implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RETRY)
public record ReindexNode(int node, long msgId) implements ActorStep {
public ReindexNode(int node) { this(node, -1L); }
}
public record AdvanceNode(int node) implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
PrecessionNodes precessionNodes = new PrecessionNodes();
return switch (self) {
case Initial i -> {
var id = precessionNodes.first();
if (id.isPresent()) {
yield new ReindexNode(id.getAsInt());
}
else {
yield new End();
}
}
case ReindexNode(int node, long msgId) when msgId < 0 -> new ReindexNode(node, indexClient.triggerRepartition(node));
case ReindexNode(int node, long msgId) -> {
while (!isMessageTerminal(msgId)) {
TimeUnit.SECONDS.sleep(10);
}
yield new AdvanceNode(node);
}
case AdvanceNode(int node) -> {
var id = precessionNodes.next(node);
if (id.isPresent())
yield new ReindexNode(id.getAsInt());
else
yield new End();
}
default -> new Error();
};
}
private boolean isMessageTerminal(long msgId) throws SQLException {
return persistence.getMessage(msgId).state().isTerminal();
}
@Inject
public ReindexAllActor(Gson gson,
MqPersistence persistence,
IndexClient indexClient, NodeConfigurationService nodeConfigurationService)
{
super(gson);
this.persistence = persistence;
this.indexClient = indexClient;
this.nodeConfigurationService = nodeConfigurationService;
}
@Override
public String describe() {
return "Triggeres a cascade of reindex instructions across each node included in the precession";
}
private class PrecessionNodes {
private final int[] nodes;
private PrecessionNodes() throws SQLException {
nodes = nodeConfigurationService.getAll().stream()
.filter(NodeConfiguration::includeInPrecession)
.mapToInt(NodeConfiguration::node)
.sorted()
.toArray();
}
public OptionalInt first() {
if (nodes.length == 0)
return OptionalInt.empty();
else
return OptionalInt.of(nodes[0]);
}
public OptionalInt next(int current) {
for (int i = 0; i < nodes.length - 1 && nodes[i] <= current; i++) {
if (nodes[i] == current) {
return OptionalInt.of(nodes[i + 1]);
}
}
return OptionalInt.empty();
}
}
}