(control) Reprocess-all actor

This commit is contained in:
Viktor Lofgren 2023-11-28 17:58:48 +01:00
parent 347fe6b7be
commit 4155fbe94c
7 changed files with 258 additions and 12 deletions

View File

@ -0,0 +1,89 @@
package nu.marginalia.executor.client;
import com.google.inject.Inject;
import jakarta.inject.Singleton;
import nu.marginalia.model.gson.GsonFactory;
import com.google.gson.Gson;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.storage.model.FileStorageId;
import java.util.concurrent.TimeUnit;
/**
 * Factory for handles that interact directly with actors running on an executor node.
 * <p>
 * This is an inherently flimsy abstraction; take care not to leak too many
 * implementation details through this interface.
 */
@Singleton
public class ExecutorRemoteActorFactory {
    private final MqPersistence persistence;

    @Inject
    public ExecutorRemoteActorFactory(MqPersistence persistence) {
        this.persistence = persistence;
    }

    /** Create a remote actor for the RecrawlActor on the given node */
    public ExecutorRemoteActorIf<CrawlData> createCrawlRemote(int node) {
        return remoteFor("fsm:recrawl:", node);
    }

    /** Create a remote actor for the ConvertAndLoadActor on the given node */
    public ExecutorRemoteActorIf<ConvertAndLoadData> createConvertAndLoadRemote(int node) {
        return remoteFor("fsm:convert_and_load:", node);
    }

    /** Builds a remote whose inbox is the given prefix followed by the node id, triggered via "INITIAL". */
    private <T> ExecutorRemoteActorIf<T> remoteFor(String inboxPrefix, int node) {
        String inboxName = inboxPrefix + node;
        return new ExecutorRemoteActor<>(persistence, inboxName, "INITIAL");
    }

    /**
     * Handle to an actor living on an executor node.
     *
     * @param <T> type of the object passed to the actor when it is triggered
     */
    public interface ExecutorRemoteActorIf<T> {
        /** Trigger the remote actor with the given object; the boolean result tells whether that succeeded. */
        boolean trigger(T object) throws Exception;

        /** Name of the state the remote actor is considered to be in. */
        String getState();
    }

    /** Trigger argument used with {@link #createCrawlRemote(int)}. */
    public record CrawlData(FileStorageId storageId) {}

    /** Trigger argument used with {@link #createConvertAndLoadRemote(int)}. */
    public record ConvertAndLoadData(FileStorageId fid) {}
}
class ExecutorRemoteActor<T> implements ExecutorRemoteActorFactory.ExecutorRemoteActorIf<T> {
private final MqPersistence persistence;
private final String inboxName;
private final String triggerFunction;
private static final Gson gson = GsonFactory.get();
ExecutorRemoteActor(MqPersistence persistence,
String inboxName,
String triggerFunction
) {
this.persistence = persistence;
this.inboxName = inboxName;
this.triggerFunction = triggerFunction;
}
public boolean trigger(T object) throws Exception {
return trigger(gson.toJson(object));
}
public boolean trigger(String payload) throws Exception {
long id = persistence.sendNewMessage(inboxName, null, null, triggerFunction, payload, null);
// Wait for the remote actor to respond to the message
for (int i = 0; i < 120; i++) {
var msg = persistence.getMessage(id);
if (msg.state() == MqMessageState.ACK || msg.state() == MqMessageState.OK)
return true;
if (msg.state() == MqMessageState.ERR || msg.state() == MqMessageState.DEAD)
return false;
TimeUnit.SECONDS.sleep(1);
}
return false; // Timeout
}
public String getState() {
return persistence
.getHeadMessage(inboxName)
.map(MqMessage::function)
.orElse("INITIAL");
}
}

View File

@ -495,4 +495,35 @@ public class MqPersistence {
/** Returns the Gson instance held in this class's {@code gson} field. */
public Gson getGson() {
return gson;
}
/**
 * Returns the last message sent to this inbox with a state of 'OK',
 * i.e. the row with the highest ID among those matching the inbox and state.
 *
 * @param inboxName value matched against the RECIPIENT_INBOX column
 * @return the newest matching message, or an empty Optional if no row matches
 * @throws RuntimeException wrapping any SQLException raised while querying
 */
public Optional<MqMessage> getHeadMessage(String inboxName) {
    try (var conn = dataSource.getConnection();
         var query = conn.prepareStatement("""
                 SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX
                 FROM MESSAGE_QUEUE
                 WHERE RECIPIENT_INBOX = ? AND STATE='OK'
                 ORDER BY ID DESC LIMIT 1
                 """))
    {
        query.setString(1, inboxName);

        try (var rs = query.executeQuery()) {
            if (!rs.next()) {
                return Optional.empty();
            }

            long msgId = rs.getLong(1);
            long relatedId = rs.getLong(2);
            String function = rs.getString(3);
            String payload = rs.getString(4);
            MqMessageState state = MqMessageState.valueOf(rs.getString(5));

            // SENDER_INBOX is a text column; reading it with getBoolean() gives a
            // driver-defined result for anything other than "0"/"1", so test for
            // presence instead.
            // NOTE(review): assumes a message expects a response exactly when
            // SENDER_INBOX is set - confirm against the other queries in this class.
            boolean expectsResponse = rs.getString(6) != null;

            return Optional.of(new MqMessage(msgId, relatedId, function, payload, state, expectsResponse));
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}
}

View File

@ -4,6 +4,7 @@ public enum ControlActor {
MONITOR_MESSAGE_QUEUE,
REINDEX_ALL,
REPROCESS_ALL,
REBALANCE;
public String id() {

View File

@ -9,6 +9,7 @@ import nu.marginalia.actor.prototype.ActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.control.actor.monitor.MessageQueueMonitorActor;
import nu.marginalia.control.actor.precession.ReindexAllActor;
import nu.marginalia.control.actor.precession.ReprocessAllActor;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.service.control.ServiceEventLog;
@ -33,7 +34,8 @@ public class ControlActorService {
public ControlActorService(MessageQueueFactory messageQueueFactory,
BaseServiceParams baseServiceParams,
MessageQueueMonitorActor messageQueueMonitor,
ReindexAllActor reindexAllActor
ReindexAllActor reindexAllActor,
ReprocessAllActor reprocessAllActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
@ -44,6 +46,7 @@ public class ControlActorService {
register(ControlActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor);
register(ControlActor.REINDEX_ALL, reindexAllActor);
register(ControlActor.REPROCESS_ALL, reprocessAllActor);
}
private void register(ControlActor process, ActorPrototype graph) {

View File

@ -25,6 +25,7 @@ public class ReindexAllActor extends RecordActorPrototype {
public record Initial() implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RETRY)
public record ReindexNode(int node, long msgId) implements ActorStep {
public ReindexNode(int node) { this(node, -1L); }

View File

@ -0,0 +1,132 @@
package nu.marginalia.control.actor.precession;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.executor.client.ExecutorRemoteActorFactory;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageType;
import java.sql.SQLException;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
/**
 * Actor that walks the nodes included in the precession, in ascending node id order,
 * and for each one triggers the executor's convert-and-load actor, waiting for it to
 * reach END or ERROR before moving on to the next node.
 * <p>
 * Steps: {@link Initial} picks the first node; {@link Trigger} starts processing on a
 * node; {@link WaitFinished} polls the remote actor; {@link AdvanceNode} picks the
 * following node or ends the run.
 */
@Singleton
public class ReprocessAllActor extends RecordActorPrototype {

    private final ExecutorRemoteActorFactory remoteActorFactory;
    private final FileStorageService fileStorageService;
    private final NodeConfigurationService nodeConfigurationService;

    /** Entry step: selects the first node of the precession, if any. */
    public record Initial() implements ActorStep {}

    /** Polls the convert-and-load actor on {@code node} until it reports END or ERROR. */
    public record WaitFinished(int node) implements ActorStep {}

    /** Triggers the convert-and-load actor on {@code node}. */
    @Resume(behavior=ActorResumeBehavior.RETRY)
    public record Trigger(int node) implements ActorStep {}

    /** Moves on from {@code node} to the next node of the precession, or ends the run. */
    public record AdvanceNode(int node) implements ActorStep {}

    @Inject
    public ReprocessAllActor(Gson gson,
                             ExecutorRemoteActorFactory remoteActorFactory,
                             FileStorageService fileStorageService,
                             NodeConfigurationService nodeConfigurationService)
    {
        super(gson);
        this.remoteActorFactory = remoteActorFactory;
        this.fileStorageService = fileStorageService;
        this.nodeConfigurationService = nodeConfigurationService;
    }

    /**
     * Performs the work of the given step and returns the step to run next.
     *
     * @param self the step to execute
     * @return the following step; End when there are no more nodes, Error for an unrecognized step
     * @throws Exception if the node configuration, file storage lookup, or remote actor call fails,
     *                   or if the thread is interrupted while waiting
     */
    @Override
    public ActorStep transition(ActorStep self) throws Exception {
        return switch (self) {
            // The node list is only loaded by the steps that pick a node
            case Initial i -> triggerOrEnd(new PrecessionNodes().first());
            case Trigger(int node) -> {
                var activeFileStorage = fileStorageService.getActiveFileStorages(node, FileStorageType.CRAWL_DATA);

                // Nodes without exactly one active set of crawl data are skipped
                if (activeFileStorage.size() != 1) {
                    yield new AdvanceNode(node);
                }

                var data = new ExecutorRemoteActorFactory.ConvertAndLoadData(activeFileStorage.get(0));

                if (remoteActorFactory.createConvertAndLoadRemote(node).trigger(data)) {
                    yield new WaitFinished(node);
                }
                else {
                    // The trigger was not accepted; move on rather than stalling the run
                    yield new AdvanceNode(node);
                }
            }
            case WaitFinished(int node) -> {
                var remoteActor = remoteActorFactory.createConvertAndLoadRemote(node);

                // NOTE(review): this polls indefinitely (no timeout), and relies on
                // getState() not reporting a stale END/ERROR left over from an earlier
                // run of the remote actor - confirm.
                for (;;) {
                    var state = remoteActor.getState();
                    if ("END".equals(state) || "ERROR".equals(state))
                        break;
                    TimeUnit.SECONDS.sleep(10);
                }

                yield new AdvanceNode(node);
            }
            case AdvanceNode(int node) -> triggerOrEnd(new PrecessionNodes().next(node));
            default -> new Error();
        };
    }

    /** Maps a selected node id to a Trigger step, or to End when no node was selected. */
    private ActorStep triggerOrEnd(OptionalInt nodeId) {
        if (nodeId.isPresent()) {
            return new Trigger(nodeId.getAsInt());
        }
        return new End();
    }

    @Override
    public String describe() {
        return "Triggers a cascade of reprocess instructions across each node included in the precession";
    }

    /** Sorted snapshot of the ids of all nodes configured to be included in the precession. */
    private class PrecessionNodes {
        private final int[] nodes;

        private PrecessionNodes() throws SQLException {
            nodes = nodeConfigurationService.getAll().stream()
                    .filter(NodeConfiguration::includeInPrecession)
                    .mapToInt(NodeConfiguration::node)
                    .sorted()
                    .toArray();
        }

        /** Lowest node id in the precession, or empty if no node is included. */
        public OptionalInt first() {
            if (nodes.length == 0)
                return OptionalInt.empty();
            else
                return OptionalInt.of(nodes[0]);
        }

        /**
         * Smallest node id strictly greater than {@code current}, or empty if there is none.
         * <p>
         * The snapshot is rebuilt between steps, so {@code current} may no longer be part
         * of it; searching by value rather than by position lets the run continue with
         * the remaining nodes in that case instead of ending early.
         */
        public OptionalInt next(int current) {
            // nodes is sorted ascending, so the first larger id is the successor
            for (int candidate : nodes) {
                if (candidate > current) {
                    return OptionalInt.of(candidate);
                }
            }
            return OptionalInt.empty();
        }
    }
}

View File

@ -40,18 +40,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String RECONVERT = "RECONVERT";
public static final String RECONVERT_WAIT = "RECONVERT-WAIT";
public static final String LOAD = "LOAD";
public static final String BACKUP = "BACKUP";
public static final String REPARTITION = "REPARTITION";
public static final String REINDEX_FWD = "REINDEX_FWD";
public static final String REINDEX_FULL = "REINDEX_FULL";
public static final String REINDEX_PRIO = "REINDEX_PRIO";
public static final String SWITCH_OVER = "SWITCH-OVER";
public static final String END = "END";
private final ActorProcessWatcher processWatcher;
private final MqOutbox mqConverterOutbox;
private final MqOutbox mqLoaderOutbox;