(mqfsm) Abortable state machine

This commit is contained in:
Viktor Lofgren 2023-07-15 14:11:48 +02:00
parent cdae74d395
commit 5ec10634d8
4 changed files with 46 additions and 10 deletions

View File

@ -26,12 +26,12 @@ public class MessageQueueFactory {
} }
public MqInboxIf createAsynchronousInbox(String inboxName, UUID instanceUUID) public MqAsynchronousInbox createAsynchronousInbox(String inboxName, UUID instanceUUID)
{ {
return new MqAsynchronousInbox(persistence, inboxName, instanceUUID); return new MqAsynchronousInbox(persistence, inboxName, instanceUUID);
} }
public MqInboxIf createSynchronousInbox(String inboxName, UUID instanceUUID) public MqSynchronousInbox createSynchronousInbox(String inboxName, UUID instanceUUID)
{ {
return new MqSynchronousInbox(persistence, inboxName, instanceUUID); return new MqSynchronousInbox(persistence, inboxName, instanceUUID);
} }

View File

@ -7,10 +7,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.*;
import java.util.Collection; import java.util.concurrent.ExecutorService;
import java.util.List; import java.util.concurrent.Executors;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** Message queue inbox that responds to a single message at a time /** Message queue inbox that responds to a single message at a time
@ -29,6 +28,7 @@ public class MqSynchronousInbox implements MqInboxIf {
private final List<MqSubscription> eventSubscribers = new ArrayList<>(); private final List<MqSubscription> eventSubscribers = new ArrayList<>();
private Thread pollDbThread; private Thread pollDbThread;
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public MqSynchronousInbox(MqPersistence persistence, public MqSynchronousInbox(MqPersistence persistence,
String inboxName, String inboxName,
@ -74,6 +74,8 @@ public class MqSynchronousInbox implements MqInboxIf {
run = false; run = false;
pollDbThread.join(); pollDbThread.join();
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
} }
@ -101,7 +103,8 @@ public class MqSynchronousInbox implements MqInboxIf {
try { try {
subscriber.onNotification(msg); subscriber.onNotification(msg);
updateMessageState(msg, MqMessageState.OK); updateMessageState(msg, MqMessageState.OK);
} catch (Exception ex) { }
catch (Exception ex) {
logger.error("Message Queue subscriber threw exception", ex); logger.error("Message Queue subscriber threw exception", ex);
updateMessageState(msg, MqMessageState.ERR); updateMessageState(msg, MqMessageState.ERR);
} }
@ -134,6 +137,7 @@ public class MqSynchronousInbox implements MqInboxIf {
} }
} }
private volatile java.util.concurrent.Future<?> currentTask = null;
public void pollDb() { public void pollDb() {
try { try {
for (long tick = 1; run; tick++) { for (long tick = 1; run; tick++) {
@ -141,7 +145,18 @@ public class MqSynchronousInbox implements MqInboxIf {
var messages = pollInbox(tick); var messages = pollInbox(tick);
for (var msg : messages) { for (var msg : messages) {
handleMessage(msg); // Handle message in a separate thread but wait for that thread, so we can interrupt that thread
// without interrupting the polling thread and shutting down the inbox completely
try {
currentTask = executorService.submit(() -> handleMessage(msg));
currentTask.get();
}
catch (Exception ex) {
logger.error("Inbox task was aborted", ex);
}
finally {
currentTask = null;
}
} }
if (messages.isEmpty()) { if (messages.isEmpty()) {
@ -154,6 +169,16 @@ public class MqSynchronousInbox implements MqInboxIf {
} }
} }
/** Attempt to abort the current task using an interrupt */
public void abortCurrentTask() {
var task = currentTask; // capture the value to avoid race conditions with the
// polling thread between the check and the interrupt
if (task != null) {
task.cancel(true);
}
}
private void handleMessage(MqMessage msg) { private void handleMessage(MqMessage msg) {
logger.info("Notifying subscribers of msg {}", msg.msgId()); logger.info("Notifying subscribers of msg {}", msg.msgId());

View File

@ -6,6 +6,7 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.inbox.MqInboxIf; import nu.marginalia.mq.inbox.MqInboxIf;
import nu.marginalia.mq.inbox.MqInboxResponse; import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSubscription; import nu.marginalia.mq.inbox.MqSubscription;
import nu.marginalia.mq.inbox.MqSynchronousInbox;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.mqsm.graph.ResumeBehavior;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.AbstractStateGraph;
@ -25,7 +26,7 @@ import java.util.function.BiConsumer;
public class StateMachine { public class StateMachine {
private final Logger logger = LoggerFactory.getLogger(StateMachine.class); private final Logger logger = LoggerFactory.getLogger(StateMachine.class);
private final MqInboxIf smInbox; private final MqSynchronousInbox smInbox;
private final MqOutbox smOutbox; private final MqOutbox smOutbox;
private final String queueName; private final String queueName;
@ -291,6 +292,11 @@ public class StateMachine {
// Add a state transition to the final state // Add a state transition to the final state
smOutbox.notify(abortMsgId, finalState.name(), ""); smOutbox.notify(abortMsgId, finalState.name(), "");
// Dislodge the current task with an interrupt.
// It's actually fine if we accidentally interrupt the wrong thread
// (i.e. the abort task), since it shouldn't be doing anything interruptable
smInbox.abortCurrentTask();
} }
private class StateEventSubscription implements MqSubscription { private class StateEventSubscription implements MqSubscription {

View File

@ -124,7 +124,12 @@ public abstract class AbstractStateGraph {
if (ex instanceof ControlFlowException cfe) { if (ex instanceof ControlFlowException cfe) {
return stateFactory.transition(cfe.getState(), cfe.getPayload()); return stateFactory.transition(cfe.getState(), cfe.getPayload());
} else { }
else if (ex instanceof InterruptedException intE) {
logger.error("State execution was interrupted " + state);
return StateTransition.to("ERR", "Execution interrupted");
}
else {
logger.error("Error in state invocation " + state, ex); logger.error("Error in state invocation " + state, ex);
return StateTransition.to("ERROR", return StateTransition.to("ERROR",
"Exception: " + ex.getClass().getSimpleName() + "/" + ex.getMessage()); "Exception: " + ex.getClass().getSimpleName() + "/" + ex.getMessage());