diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java index 94118113..94e969a9 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java @@ -28,7 +28,11 @@ public class StateMachine { private final MqInboxIf smInbox; private final MqOutbox smOutbox; private final String queueName; - private MachineState state; + + + private volatile MachineState state; + private volatile ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated(); + private final MachineState errorState = new StateFactory.ErrorState(); private final MachineState finalState = new StateFactory.FinalState(); @@ -37,7 +41,6 @@ public class StateMachine { private final List> stateChangeListeners = new ArrayList<>(); private final Map allStates = new HashMap<>(); - private ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated(); public StateMachine(MessageQueueFactory messageQueueFactory, String queueName, @@ -237,8 +240,13 @@ public class StateMachine { logger.info("Transitining from state {}", state.name()); var transition = state.next(msg.payload()); - expectedMessage = ExpectedMessage.responseTo(msg); - smOutbox.notify(expectedMessage.id, transition.state(), transition.message()); + if (!expectedMessage.isExpected(msg)) { + logger.warn("Expected message changed during execution, skipping state transition to {}", transition.state()); + } + else { + expectedMessage = ExpectedMessage.responseTo(msg); + smOutbox.notify(expectedMessage.id, transition.state(), transition.message()); + } } else { // On terminal transition, we expect any message @@ -258,6 +266,33 @@ public class StateMachine { } } + public MachineState getState() { + return state; + } + + public void abortExecution() throws Exception { + // Create a fake message to abort the execution + // This helps make sense of the queue when debugging + // and also permits the real termination message to have an + // unique expected ID + + long abortMsgId = smOutbox.notify(expectedMessage.id, "ABORT", "Aborting execution"); + + // Set it as dead to clean up the queue from mystery ACK messages + smOutbox.flagAsDead(abortMsgId); + + // Set the expected message to the abort message, + // technically there's a slight chance of a race condition here, + // which will cause this message to be ERR'd and the process to + // continue, but it's very unlikely and the worst that can happen + // is you have to abort twice. + + expectedMessage = ExpectedMessage.expectId(abortMsgId); + + // Add a state transition to the final state + smOutbox.notify(abortMsgId, finalState.name(), ""); + } + private class StateEventSubscription implements MqSubscription { @Override @@ -308,6 +343,10 @@ class ExpectedMessage { return new ExpectedMessage(-1); } + public static ExpectedMessage expectId(long id) { + return new ExpectedMessage(id); + } + public boolean isExpected(MqMessage message) { if (id < 0) return true; diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index 9d660a1e..88f51186 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -34,6 +34,7 @@ public class ControlService extends Service { private final MustacheRenderer> processesRenderer; private final MustacheRenderer> eventsRenderer; private final MustacheRenderer> messageQueueRenderer; + private final MustacheRenderer> fsmStateRenderer; private final MqPersistence messageQueuePersistence; private final StaticResources staticResources; private final MessageQueueMonitorService messageQueueMonitorService; @@ -61,6 +62,7 @@ public class ControlService extends Service { processesRenderer = rendererFactory.renderer("control/processes"); eventsRenderer = rendererFactory.renderer("control/events"); messageQueueRenderer = rendererFactory.renderer("control/message-queue"); + fsmStateRenderer = rendererFactory.renderer("control/fsm-states"); this.messageQueuePersistence = messageQueuePersistence; this.staticResources = staticResources; @@ -73,16 +75,34 @@ public class ControlService extends Service { Spark.get("/public/", (req, rsp) -> indexRenderer.render(Map.of())); - Spark.get("/public/services", (req, rsp) -> servicesRenderer.render(Map.of("heartbeats", heartbeatService.getServiceHeartbeats()))); - Spark.get("/public/processes", (req, rsp) -> processesRenderer.render(Map.of("heartbeats", heartbeatService.getProcessHeartbeats()))); - Spark.get("/public/events", (req, rsp) -> eventsRenderer.render(Map.of("events", eventLogService.getLastEntries(20)))); - Spark.get("/public/message-queue", (req, rsp) -> messageQueueRenderer.render(Map.of("messages", messageQueueViewService.getLastEntries(20)))); + Spark.get("/public/services", + (req, rsp) -> Map.of("services", heartbeatService.getServiceHeartbeats(), + "events", eventLogService.getLastEntries(20)), + (map) -> servicesRenderer.render((Map) map)); + + Spark.get("/public/processes", + (req, rsp) -> Map.of("processes", heartbeatService.getProcessHeartbeats(), + "fsms", controlProcesses.getFsmStates(), + "messages", messageQueueViewService.getLastEntries(20)), + (map) -> processesRenderer.render((Map) map)); + + Spark.post("/public/fsms/:fsm/start", (req, rsp) -> { + controlProcesses.start(ControlProcess.valueOf(req.params("fsm").toUpperCase())); + rsp.redirect("/processes"); + return ""; + }); + Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> { + controlProcesses.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase())); + rsp.redirect("/processes"); + return ""; + }); // TODO: This should be a POST Spark.get("/public/repartition", (req, rsp) -> { controlProcesses.start(ControlProcess.REPARTITION_REINDEX); return "OK"; }); + // TODO: This should be a POST Spark.get("/public/reconvert", (req, rsp) -> { controlProcesses.start(ControlProcess.RECONVERT_LOAD, "/samples/crawl-blogs/plan.yaml"); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcessState.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcessState.java new file mode 100644 index 00000000..39d69ebd --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcessState.java @@ -0,0 +1,12 @@ +package nu.marginalia.control.model; + +public record ControlProcessState(String name, String state, boolean terminal) { + public String stateIcon() { + if (terminal) { + return "\uD83D\uDE34"; + } + else { + return "\uD83C\uDFC3"; + } + } +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/MessageQueueEntry.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/MessageQueueEntry.java index f11591ac..43c5bf07 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/MessageQueueEntry.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/MessageQueueEntry.java @@ -6,6 +6,7 @@ public record MessageQueueEntry ( String senderInbox, String recipientInbox, String function, + String payload, String ownerInstanceFull, long ownerTick, String state, diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java index 35987f14..404bd273 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java @@ -3,15 +3,19 @@ package nu.marginalia.control.process; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; +import lombok.SneakyThrows; import nu.marginalia.control.model.ControlProcess; +import nu.marginalia.control.model.ControlProcessState; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mqsm.StateMachine; import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.state.MachineState; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.BaseServiceParams; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -60,4 +64,21 @@ public class ControlProcesses { stateMachines.get(process).init(gson.toJson(arg)); } + public List getFsmStates() { + return stateMachines.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> { + + final MachineState state = e.getValue().getState(); + + final String machineName = e.getKey().name(); + final String stateName = state.name(); + final boolean terminal = state.isFinal(); + + return new ControlProcessState(machineName, stateName, terminal); + }).toList(); + } + + @SneakyThrows + public void stop(ControlProcess fsm) { + stateMachines.get(fsm).abortExecution(); + } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java index 9531c0b4..439b1c2f 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java @@ -22,7 +22,7 @@ public class MessageQueueViewService { public List getLastEntries(int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE ORDER BY ID DESC LIMIT ? @@ -38,6 +38,7 @@ public class MessageQueueViewService { rs.getString("SENDER_INBOX"), rs.getString("RECIPIENT_INBOX"), rs.getString("FUNCTION"), + rs.getString("PAYLOAD"), rs.getString("OWNER_INSTANCE"), rs.getLong("OWNER_TICK"), rs.getString("STATE"), diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/events.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/events.hdb deleted file mode 100644 index 83d5f449..00000000 --- a/code/services-satellite/control-service/src/main/resources/templates/control/events.hdb +++ /dev/null @@ -1,43 +0,0 @@ - - - - Control Service - - - - - {{> control/partials/nav}} - -
-

Events

- - - - - - - - - - {{#each events}} - - - - - - - - {{/each}} -
Service NameInstanceEvent TimeTypeMessage
{{serviceName}} -    - {{instance}} - {{eventTime}}{{eventType}}{{eventMessage}}
-
- - - - diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/message-queue.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/message-queue.hdb deleted file mode 100644 index c46193b4..00000000 --- a/code/services-satellite/control-service/src/main/resources/templates/control/message-queue.hdb +++ /dev/null @@ -1,52 +0,0 @@ - - - - Control Service - - - - - {{> control/partials/nav}} - -
-

Message Queue

- - - - - - - - - - - {{#each messages}} - - - - - - - - - - - - - - - - - {{/each}} -
State
TTL
Msg ID
Related ID
Recipient
Sender
FunctionOwner Instance
Owner Tick
Created
Updated
{{stateCode}} {{state}}{{id}}{{recipientInbox}}{{function}} -    {{ownerInstance}} - {{createdTime}}
{{ttl}}{{relatedId}}{{senderInbox}}{{ownerTick}}{{updatedTime}}
-
- - - - \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/events-table.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/events-table.hdb new file mode 100644 index 00000000..23324a13 --- /dev/null +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/events-table.hdb @@ -0,0 +1,23 @@ +

Events

+ + + + + + + + + + {{#each events}} + + + + + + + + {{/each}} +
Service NameInstanceEvent TimeTypeMessage
{{serviceName}} +    + {{instance}} + {{eventTime}}{{eventType}}{{eventMessage}}
\ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/fsm-table.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/fsm-table.hdb new file mode 100644 index 00000000..c7b66e9a --- /dev/null +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/fsm-table.hdb @@ -0,0 +1,23 @@ +

FSMs

+ + + + + + + {{#each fsms}} + + + + + + {{/each}} +
FSMStateAction
{{name}}{{stateIcon}} {{state}} + {{#unless terminal}} +
+ {{/unless}} + {{#if terminal}} +
+ {{/if}} + +
\ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb new file mode 100644 index 00000000..cf584ab2 --- /dev/null +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb @@ -0,0 +1,32 @@ +

Message Queue

+ + + + + + + + + + + {{#each messages}} + + + + + + + + + + + + + + + + + {{/each}} +
State
TTL
Msg ID
Related ID
Recipient
Sender
Function
Payload
Owner Instance
Owner Tick
Created
Updated
{{stateCode}} {{state}}{{id}}{{recipientInbox}}{{function}} +    {{ownerInstance}} + {{createdTime}}
{{ttl}}{{relatedId}}{{senderInbox}}{{payload}}{{ownerTick}}{{updatedTime}}
\ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb index 771266f2..9b68f4b2 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb @@ -3,7 +3,5 @@
  • Overview
  • Services
  • Processes
  • -
  • Events
  • -
  • Message Queue
  • \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/processes-table.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/processes-table.hdb new file mode 100644 index 00000000..47d7dc64 --- /dev/null +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/processes-table.hdb @@ -0,0 +1,23 @@ + +

    Processes

    + + + + + + + + + {{#each processes}} + + + + + + + + {{/each}} +
    Process IDUUIDStatusProgressLast Seen (ms)
    {{processId}} +    + {{uuid}} + {{status}}{{#if progress}}{{progress}}%{{/if}}{{#unless isStopped}}{{lastSeenMillis}}{{/unless}}
    \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/services-table.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/services-table.hdb new file mode 100644 index 00000000..2137f1fe --- /dev/null +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/services-table.hdb @@ -0,0 +1,18 @@ +

    Services

    + + + + + + + {{#each services}} + + + + + + {{/each}} +
    Service IDUUIDLast Seen (ms)
    {{serviceId}} +    + {{uuid}} + {{lastSeenMillis}}
    \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb index 53902f39..7d348be1 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb @@ -7,36 +7,16 @@ {{> control/partials/nav}} -
    -

    Processes

    - - - - - - - - - {{#each heartbeats}} - - - - - - - - {{/each}} -
    Process IDUUIDStatusProgressLast Seen (ms)
    {{processId}} -    - {{uuid}} - {{status}}{{#if progress}}{{progress}}%{{/if}}{{#unless isStopped}}{{lastSeenMillis}}{{/unless}}
    + {{> control/partials/processes-table}} + {{> control/partials/fsm-table}} + {{> control/partials/message-queue-table}}
    \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb index a09d5c27..2c0542b9 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb @@ -7,32 +7,15 @@ {{> control/partials/nav}} -
    -

    Services

    - - - - - - - {{#each heartbeats}} - - - - - - {{/each}} -
    Service IDUUIDLast Seen (ms)
    {{serviceId}} -    - {{uuid}} - {{lastSeenMillis}}
    + {{> control/partials/services-table }} + {{> control/partials/events-table }}
    \ No newline at end of file