From c4dd9a0547b5c443b20b6a46c5050ce1bd12ea6f Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Sun, 16 Jul 2023 11:58:47 +0200 Subject: [PATCH] (control) Use MQFSMs to monitor and spawn processes when messages are sent to them --- .../mq/persistence/MqPersistence.java | 63 ++++++++++++++-- .../control/model/ControlProcess.java | 5 +- .../control/process/ControlProcesses.java | 7 +- .../process/ConverterMonitorProcess.java | 73 +++++++++++++++++++ .../control/process/LoaderMonitorProcess.java | 73 +++++++++++++++++++ .../process/ReconvertAndLoadProcess.java | 42 +++++------ 6 files changed, 230 insertions(+), 33 deletions(-) create mode 100644 code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ConverterMonitorProcess.java create mode 100644 code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/LoaderMonitorProcess.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index 4f2cc564..5d6511f4 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -191,6 +191,48 @@ public class MqPersistence { } } + /** Return up to n unprocessed messages from the specified inbox that are in states 'NEW' or 'ACK' */ + public Collection eavesdrop(String inboxName, int n) throws SQLException { + try (var conn = dataSource.getConnection(); + var queryStmt = conn.prepareStatement(""" + SELECT + ID, + RELATED_ID, + FUNCTION, + PAYLOAD, + STATE, + SENDER_INBOX IS NOT NULL AS EXPECTS_RESPONSE + FROM MESSAGE_QUEUE + WHERE STATE IN ('NEW', 'ACK') + AND RECIPIENT_INBOX=? + LIMIT ? + """) + ) { + queryStmt.setString(1, inboxName); + queryStmt.setInt(2, n); + var rs = queryStmt.executeQuery(); + + List messages = new ArrayList<>(n); + + while (rs.next()) { + long msgId = rs.getLong("ID"); + long relatedId = rs.getLong("RELATED_ID"); + + String function = rs.getString("FUNCTION"); + String payload = rs.getString("PAYLOAD"); + + MqMessageState state = MqMessageState.valueOf(rs.getString("STATE")); + boolean expectsResponse = rs.getBoolean("EXPECTS_RESPONSE"); + + var msg = new MqMessage(msgId, relatedId, function, payload, state, expectsResponse); + + messages.add(msg); + } + + return messages; + } + +} /** Marks unclaimed messages addressed to this inbox with instanceUUID and tick, * then returns these messages. */ @@ -205,7 +247,14 @@ public class MqPersistence { // Then fetch the messages that were marked try (var conn = dataSource.getConnection(); var queryStmt = conn.prepareStatement(""" - SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM MESSAGE_QUEUE + SELECT + ID, + RELATED_ID, + FUNCTION, + PAYLOAD, + STATE, + SENDER_INBOX IS NOT NULL AS EXPECTS_RESPONSE + FROM MESSAGE_QUEUE WHERE OWNER_INSTANCE=? AND OWNER_TICK=? """) ) { @@ -216,14 +265,14 @@ public class MqPersistence { List messages = new ArrayList<>(expected); while (rs.next()) { - long msgId = rs.getLong(1); - long relatedId = rs.getLong(2); + long msgId = rs.getLong("ID"); + long relatedId = rs.getLong("RELATED_ID"); - String function = rs.getString(3); - String payload = rs.getString(4); + String function = rs.getString("FUNCTION"); + String payload = rs.getString("PAYLOAD"); - MqMessageState state = MqMessageState.valueOf(rs.getString(5)); - boolean expectsResponse = rs.getBoolean(6); + MqMessageState state = MqMessageState.valueOf(rs.getString("STATE")); + boolean expectsResponse = rs.getBoolean("EXPECTS_RESPONSE"); var msg = new MqMessage(msgId, relatedId, function, payload, state, expectsResponse); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java index b7db26db..6cdc219a 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java @@ -2,7 +2,10 @@ package nu.marginalia.control.model; public enum ControlProcess { REPARTITION_REINDEX, - RECONVERT_LOAD; + RECONVERT_LOAD, + CONVERTER_MONITOR, + LOADER_MONITOR + ; public String id() { return "fsm:" + name().toLowerCase(); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java index 404bd273..eb5eaef7 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java @@ -31,13 +31,18 @@ public class ControlProcesses { GsonFactory gsonFactory, BaseServiceParams baseServiceParams, RepartitionReindexProcess repartitionReindexProcess, - ReconvertAndLoadProcess reconvertAndLoadProcess + ReconvertAndLoadProcess reconvertAndLoadProcess, + ConverterMonitorProcess converterMonitorProcess, + LoaderMonitorProcess loaderMonitorProcess ) { this.messageQueueFactory = messageQueueFactory; this.eventLog = baseServiceParams.eventLog; this.gson = gsonFactory.get(); + register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess); register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadProcess); + register(ControlProcess.CONVERTER_MONITOR, converterMonitorProcess); + register(ControlProcess.LOADER_MONITOR, loaderMonitorProcess); } private void register(ControlProcess process, AbstractStateGraph graph) { diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ConverterMonitorProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ConverterMonitorProcess.java new file mode 100644 index 00000000..a1c0258f --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ConverterMonitorProcess.java @@ -0,0 +1,73 @@ +package nu.marginalia.control.process; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.converting.mqapi.ConverterInboxNames; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; + +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +@Singleton +public class ConverterMonitorProcess extends AbstractStateGraph { + + private final MqPersistence persistence; + private final ProcessService processService; + public static final String INITIAL = "INITIAL"; + public static final String CHECK = "CHECK"; + public static final String RUN = "RUN"; + public static final String END = "END"; + + public static final int MAX_ATTEMPTS = 3; + public static final String inboxName = ConverterInboxNames.CONVERTER_INBOX; + public static final ProcessService.ProcessId processId = ProcessService.ProcessId.CONVERTER; + + @Inject + public ConverterMonitorProcess(StateFactory stateFactory, + MqPersistence persistence, + ProcessService processService) { + super(stateFactory); + this.persistence = persistence; + this.processService = processService; + } + + @GraphState(name = INITIAL, next = CHECK) + public void init() { + + } + + @GraphState(name = CHECK, resume = ResumeBehavior.RETRY) + public void check() throws SQLException, InterruptedException { + + for (;;) { + var messages = persistence.eavesdrop(inboxName, 1); + + if (messages.isEmpty() && !processService.isRunning(processId)) { + TimeUnit.SECONDS.sleep(5); + } else { + transition(RUN, 0); + } + } + } + + @GraphState(name = RUN) + public void run(Integer attempts) throws Exception { + try { + processService.trigger(processId); + } + catch (Exception e) { + if (attempts < MAX_ATTEMPTS) { + transition(RUN, attempts + 1); + } + else throw e; + } + + transition(CHECK); + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/LoaderMonitorProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/LoaderMonitorProcess.java new file mode 100644 index 00000000..813c7da7 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/LoaderMonitorProcess.java @@ -0,0 +1,73 @@ +package nu.marginalia.control.process; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.converting.mqapi.ConverterInboxNames; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; + +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +@Singleton +public class LoaderMonitorProcess extends AbstractStateGraph { + + private final MqPersistence persistence; + private final ProcessService processService; + public static final String INITIAL = "INITIAL"; + public static final String CHECK = "CHECK"; + public static final String RUN = "RUN"; + public static final String END = "END"; + + public static final int MAX_ATTEMPTS = 1; + public static final String inboxName = ConverterInboxNames.LOADER_INBOX; + public static final ProcessService.ProcessId processId = ProcessService.ProcessId.LOADER; + + @Inject + public LoaderMonitorProcess(StateFactory stateFactory, + MqPersistence persistence, + ProcessService processService) { + super(stateFactory); + this.persistence = persistence; + this.processService = processService; + } + + @GraphState(name = INITIAL, next = CHECK) + public void init() { + + } + + @GraphState(name = CHECK, resume = ResumeBehavior.RETRY) + public void check() throws SQLException, InterruptedException { + + for (;;) { + var messages = persistence.eavesdrop(inboxName, 1); + + if (messages.isEmpty() && !processService.isRunning(processId)) { + TimeUnit.SECONDS.sleep(5); + } else { + transition(RUN, 0); + } + } + } + + @GraphState(name = RUN) + public void run(Integer attempts) throws Exception { + try { + processService.trigger(processId); + } + catch (Exception e) { + if (attempts < MAX_ATTEMPTS) { + transition(RUN, attempts + 1); + } + else throw e; + } + + transition(CHECK); + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java index b72876a9..1c1439f9 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java @@ -105,14 +105,6 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph { var request = new ConvertRequest(message.crawlStorageId, processedArea.id()); long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); - Executors.defaultThreadFactory().newThread(() -> { - try { - processService.trigger(ProcessService.ProcessId.CONVERTER); - } catch (Exception e) { - throw new RuntimeException(e); - } - }).start(); - return message .withProcessedStorageId(processedArea.id()) .withConverterMsgId(id); @@ -134,14 +126,6 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph { var request = new LoadRequest(message.processedStorageId); long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request)); - Executors.defaultThreadFactory().newThread(() -> { - try { - processService.trigger(ProcessService.ProcessId.LOADER); - } catch (Exception e) { - throw new RuntimeException(e); - } - }).start(); - return message.withLoaderMsgId(id); } @@ -155,23 +139,33 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph { } public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception { - + if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { + error("Process " + processId + " did not launch"); + } for (;;) { try { return outbox.waitResponse(id, 1, TimeUnit.SECONDS); } catch (TimeoutException ex) { - if (!processService.isRunning(processId)) { - try { - return outbox.waitResponse(id, 10, TimeUnit.SECONDS); - } - catch (TimeoutException ex2) { - error("Process " + processId + " is not running"); - } + if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { + error("Process " + processId + " died and did not re-launch"); } } } + } + public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException { + + // Wait for process to start + long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30); + while (System.currentTimeMillis() < deadline) { + if (processService.isRunning(processId)) + return true; + + TimeUnit.SECONDS.sleep(1); + } + + return false; } // @GraphState(name = MOVE_INDEX_FILES, next = RELOAD_LEXICON, resume = ResumeBehavior.ERROR)