(control) Use MQFSMs to monitor and spawn processes when messages are sent to them

This commit is contained in:
Viktor Lofgren 2023-07-16 11:58:47 +02:00
parent 5ec10634d8
commit c4dd9a0547
6 changed files with 230 additions and 33 deletions

View File

@ -191,6 +191,48 @@ public class MqPersistence {
} }
} }
/** Return up to n unprocessed messages from the specified inbox that are in states 'NEW' or 'ACK' */
public Collection<MqMessage> eavesdrop(String inboxName, int n) throws SQLException {
try (var conn = dataSource.getConnection();
var queryStmt = conn.prepareStatement("""
SELECT
ID,
RELATED_ID,
FUNCTION,
PAYLOAD,
STATE,
SENDER_INBOX IS NOT NULL AS EXPECTS_RESPONSE
FROM MESSAGE_QUEUE
WHERE STATE IN ('NEW', 'ACK')
AND RECIPIENT_INBOX=?
LIMIT ?
""")
) {
queryStmt.setString(1, inboxName);
queryStmt.setInt(2, n);
var rs = queryStmt.executeQuery();
List<MqMessage> messages = new ArrayList<>(n);
while (rs.next()) {
long msgId = rs.getLong("ID");
long relatedId = rs.getLong("RELATED_ID");
String function = rs.getString("FUNCTION");
String payload = rs.getString("PAYLOAD");
MqMessageState state = MqMessageState.valueOf(rs.getString("STATE"));
boolean expectsResponse = rs.getBoolean("EXPECTS_RESPONSE");
var msg = new MqMessage(msgId, relatedId, function, payload, state, expectsResponse);
messages.add(msg);
}
return messages;
}
}
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick, /** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
* then returns these messages. * then returns these messages.
*/ */
@ -205,7 +247,14 @@ public class MqPersistence {
// Then fetch the messages that were marked // Then fetch the messages that were marked
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var queryStmt = conn.prepareStatement(""" var queryStmt = conn.prepareStatement("""
SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM MESSAGE_QUEUE SELECT
ID,
RELATED_ID,
FUNCTION,
PAYLOAD,
STATE,
SENDER_INBOX IS NOT NULL AS EXPECTS_RESPONSE
FROM MESSAGE_QUEUE
WHERE OWNER_INSTANCE=? AND OWNER_TICK=? WHERE OWNER_INSTANCE=? AND OWNER_TICK=?
""") """)
) { ) {
@ -216,14 +265,14 @@ public class MqPersistence {
List<MqMessage> messages = new ArrayList<>(expected); List<MqMessage> messages = new ArrayList<>(expected);
while (rs.next()) { while (rs.next()) {
long msgId = rs.getLong(1); long msgId = rs.getLong("ID");
long relatedId = rs.getLong(2); long relatedId = rs.getLong("RELATED_ID");
String function = rs.getString(3); String function = rs.getString("FUNCTION");
String payload = rs.getString(4); String payload = rs.getString("PAYLOAD");
MqMessageState state = MqMessageState.valueOf(rs.getString(5)); MqMessageState state = MqMessageState.valueOf(rs.getString("STATE"));
boolean expectsResponse = rs.getBoolean(6); boolean expectsResponse = rs.getBoolean("EXPECTS_RESPONSE");
var msg = new MqMessage(msgId, relatedId, function, payload, state, expectsResponse); var msg = new MqMessage(msgId, relatedId, function, payload, state, expectsResponse);

View File

@ -2,7 +2,10 @@ package nu.marginalia.control.model;
public enum ControlProcess { public enum ControlProcess {
REPARTITION_REINDEX, REPARTITION_REINDEX,
RECONVERT_LOAD; RECONVERT_LOAD,
CONVERTER_MONITOR,
LOADER_MONITOR
;
public String id() { public String id() {
return "fsm:" + name().toLowerCase(); return "fsm:" + name().toLowerCase();

View File

@ -31,13 +31,18 @@ public class ControlProcesses {
GsonFactory gsonFactory, GsonFactory gsonFactory,
BaseServiceParams baseServiceParams, BaseServiceParams baseServiceParams,
RepartitionReindexProcess repartitionReindexProcess, RepartitionReindexProcess repartitionReindexProcess,
ReconvertAndLoadProcess reconvertAndLoadProcess ReconvertAndLoadProcess reconvertAndLoadProcess,
ConverterMonitorProcess converterMonitorProcess,
LoaderMonitorProcess loaderMonitorProcess
) { ) {
this.messageQueueFactory = messageQueueFactory; this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog; this.eventLog = baseServiceParams.eventLog;
this.gson = gsonFactory.get(); this.gson = gsonFactory.get();
register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess); register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess);
register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadProcess); register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadProcess);
register(ControlProcess.CONVERTER_MONITOR, converterMonitorProcess);
register(ControlProcess.LOADER_MONITOR, loaderMonitorProcess);
} }
private void register(ControlProcess process, AbstractStateGraph graph) { private void register(ControlProcess process, AbstractStateGraph graph) {

View File

@ -0,0 +1,73 @@
package nu.marginalia.control.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.converting.mqapi.ConverterInboxNames;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@Singleton
public class ConverterMonitorProcess extends AbstractStateGraph {
private final MqPersistence persistence;
private final ProcessService processService;
public static final String INITIAL = "INITIAL";
public static final String CHECK = "CHECK";
public static final String RUN = "RUN";
public static final String END = "END";
public static final int MAX_ATTEMPTS = 3;
public static final String inboxName = ConverterInboxNames.CONVERTER_INBOX;
public static final ProcessService.ProcessId processId = ProcessService.ProcessId.CONVERTER;
@Inject
public ConverterMonitorProcess(StateFactory stateFactory,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory);
this.persistence = persistence;
this.processService = processService;
}
@GraphState(name = INITIAL, next = CHECK)
public void init() {
}
@GraphState(name = CHECK, resume = ResumeBehavior.RETRY)
public void check() throws SQLException, InterruptedException {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
TimeUnit.SECONDS.sleep(5);
} else {
transition(RUN, 0);
}
}
}
@GraphState(name = RUN)
public void run(Integer attempts) throws Exception {
try {
processService.trigger(processId);
}
catch (Exception e) {
if (attempts < MAX_ATTEMPTS) {
transition(RUN, attempts + 1);
}
else throw e;
}
transition(CHECK);
}
}

View File

@ -0,0 +1,73 @@
package nu.marginalia.control.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.converting.mqapi.ConverterInboxNames;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@Singleton
public class LoaderMonitorProcess extends AbstractStateGraph {
private final MqPersistence persistence;
private final ProcessService processService;
public static final String INITIAL = "INITIAL";
public static final String CHECK = "CHECK";
public static final String RUN = "RUN";
public static final String END = "END";
public static final int MAX_ATTEMPTS = 1;
public static final String inboxName = ConverterInboxNames.LOADER_INBOX;
public static final ProcessService.ProcessId processId = ProcessService.ProcessId.LOADER;
@Inject
public LoaderMonitorProcess(StateFactory stateFactory,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory);
this.persistence = persistence;
this.processService = processService;
}
@GraphState(name = INITIAL, next = CHECK)
public void init() {
}
@GraphState(name = CHECK, resume = ResumeBehavior.RETRY)
public void check() throws SQLException, InterruptedException {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
TimeUnit.SECONDS.sleep(5);
} else {
transition(RUN, 0);
}
}
}
@GraphState(name = RUN)
public void run(Integer attempts) throws Exception {
try {
processService.trigger(processId);
}
catch (Exception e) {
if (attempts < MAX_ATTEMPTS) {
transition(RUN, attempts + 1);
}
else throw e;
}
transition(CHECK);
}
}

View File

@ -105,14 +105,6 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph {
var request = new ConvertRequest(message.crawlStorageId, processedArea.id()); var request = new ConvertRequest(message.crawlStorageId, processedArea.id());
long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
Executors.defaultThreadFactory().newThread(() -> {
try {
processService.trigger(ProcessService.ProcessId.CONVERTER);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
return message return message
.withProcessedStorageId(processedArea.id()) .withProcessedStorageId(processedArea.id())
.withConverterMsgId(id); .withConverterMsgId(id);
@ -134,14 +126,6 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph {
var request = new LoadRequest(message.processedStorageId); var request = new LoadRequest(message.processedStorageId);
long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request)); long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request));
Executors.defaultThreadFactory().newThread(() -> {
try {
processService.trigger(ProcessService.ProcessId.LOADER);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
return message.withLoaderMsgId(id); return message.withLoaderMsgId(id);
} }
@ -155,23 +139,33 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph {
} }
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception { public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception {
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
error("Process " + processId + " did not launch");
}
for (;;) { for (;;) {
try { try {
return outbox.waitResponse(id, 1, TimeUnit.SECONDS); return outbox.waitResponse(id, 1, TimeUnit.SECONDS);
} }
catch (TimeoutException ex) { catch (TimeoutException ex) {
if (!processService.isRunning(processId)) { if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
try { error("Process " + processId + " died and did not re-launch");
return outbox.waitResponse(id, 10, TimeUnit.SECONDS);
}
catch (TimeoutException ex2) {
error("Process " + processId + " is not running");
}
} }
} }
} }
}
public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
// Wait for process to start
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
while (System.currentTimeMillis() < deadline) {
if (processService.isRunning(processId))
return true;
TimeUnit.SECONDS.sleep(1);
}
return false;
} }
// @GraphState(name = MOVE_INDEX_FILES, next = RELOAD_LEXICON, resume = ResumeBehavior.ERROR) // @GraphState(name = MOVE_INDEX_FILES, next = RELOAD_LEXICON, resume = ResumeBehavior.ERROR)