(control) Clean up the number of GUI views, abortable FSM tasks

This commit is contained in:
Viktor Lofgren 2023-07-13 17:24:21 +02:00
parent 0960e18f8e
commit 948d4d5f08
16 changed files with 229 additions and 150 deletions

View File

@ -28,7 +28,11 @@ public class StateMachine {
private final MqInboxIf smInbox; private final MqInboxIf smInbox;
private final MqOutbox smOutbox; private final MqOutbox smOutbox;
private final String queueName; private final String queueName;
private MachineState state;
private volatile MachineState state;
private volatile ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated();
private final MachineState errorState = new StateFactory.ErrorState(); private final MachineState errorState = new StateFactory.ErrorState();
private final MachineState finalState = new StateFactory.FinalState(); private final MachineState finalState = new StateFactory.FinalState();
@ -37,7 +41,6 @@ public class StateMachine {
private final List<BiConsumer<String, String>> stateChangeListeners = new ArrayList<>(); private final List<BiConsumer<String, String>> stateChangeListeners = new ArrayList<>();
private final Map<String, MachineState> allStates = new HashMap<>(); private final Map<String, MachineState> allStates = new HashMap<>();
private ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated();
public StateMachine(MessageQueueFactory messageQueueFactory, public StateMachine(MessageQueueFactory messageQueueFactory,
String queueName, String queueName,
@ -237,8 +240,13 @@ public class StateMachine {
logger.info("Transitining from state {}", state.name()); logger.info("Transitining from state {}", state.name());
var transition = state.next(msg.payload()); var transition = state.next(msg.payload());
expectedMessage = ExpectedMessage.responseTo(msg); if (!expectedMessage.isExpected(msg)) {
smOutbox.notify(expectedMessage.id, transition.state(), transition.message()); logger.warn("Expected message changed during execution, skipping state transition to {}", transition.state());
}
else {
expectedMessage = ExpectedMessage.responseTo(msg);
smOutbox.notify(expectedMessage.id, transition.state(), transition.message());
}
} }
else { else {
// On terminal transition, we expect any message // On terminal transition, we expect any message
@ -258,6 +266,33 @@ public class StateMachine {
} }
} }
public MachineState getState() {
return state;
}
public void abortExecution() throws Exception {
// Create a fake message to abort the execution
// This helps make sense of the queue when debugging
// and also permits the real termination message to have an
// unique expected ID
long abortMsgId = smOutbox.notify(expectedMessage.id, "ABORT", "Aborting execution");
// Set it as dead to clean up the queue from mystery ACK messages
smOutbox.flagAsDead(abortMsgId);
// Set the expected message to the abort message,
// technically there's a slight chance of a race condition here,
// which will cause this message to be ERR'd and the process to
// continue, but it's very unlikely and the worst that can happen
// is you have to abort twice.
expectedMessage = ExpectedMessage.expectId(abortMsgId);
// Add a state transition to the final state
smOutbox.notify(abortMsgId, finalState.name(), "");
}
private class StateEventSubscription implements MqSubscription { private class StateEventSubscription implements MqSubscription {
@Override @Override
@ -308,6 +343,10 @@ class ExpectedMessage {
return new ExpectedMessage(-1); return new ExpectedMessage(-1);
} }
public static ExpectedMessage expectId(long id) {
return new ExpectedMessage(id);
}
public boolean isExpected(MqMessage message) { public boolean isExpected(MqMessage message) {
if (id < 0) if (id < 0)
return true; return true;

View File

@ -34,6 +34,7 @@ public class ControlService extends Service {
private final MustacheRenderer<Map<?,?>> processesRenderer; private final MustacheRenderer<Map<?,?>> processesRenderer;
private final MustacheRenderer<Map<?,?>> eventsRenderer; private final MustacheRenderer<Map<?,?>> eventsRenderer;
private final MustacheRenderer<Map<?,?>> messageQueueRenderer; private final MustacheRenderer<Map<?,?>> messageQueueRenderer;
private final MustacheRenderer<Map<?,?>> fsmStateRenderer;
private final MqPersistence messageQueuePersistence; private final MqPersistence messageQueuePersistence;
private final StaticResources staticResources; private final StaticResources staticResources;
private final MessageQueueMonitorService messageQueueMonitorService; private final MessageQueueMonitorService messageQueueMonitorService;
@ -61,6 +62,7 @@ public class ControlService extends Service {
processesRenderer = rendererFactory.renderer("control/processes"); processesRenderer = rendererFactory.renderer("control/processes");
eventsRenderer = rendererFactory.renderer("control/events"); eventsRenderer = rendererFactory.renderer("control/events");
messageQueueRenderer = rendererFactory.renderer("control/message-queue"); messageQueueRenderer = rendererFactory.renderer("control/message-queue");
fsmStateRenderer = rendererFactory.renderer("control/fsm-states");
this.messageQueuePersistence = messageQueuePersistence; this.messageQueuePersistence = messageQueuePersistence;
this.staticResources = staticResources; this.staticResources = staticResources;
@ -73,16 +75,34 @@ public class ControlService extends Service {
Spark.get("/public/", (req, rsp) -> indexRenderer.render(Map.of())); Spark.get("/public/", (req, rsp) -> indexRenderer.render(Map.of()));
Spark.get("/public/services", (req, rsp) -> servicesRenderer.render(Map.of("heartbeats", heartbeatService.getServiceHeartbeats()))); Spark.get("/public/services",
Spark.get("/public/processes", (req, rsp) -> processesRenderer.render(Map.of("heartbeats", heartbeatService.getProcessHeartbeats()))); (req, rsp) -> Map.of("services", heartbeatService.getServiceHeartbeats(),
Spark.get("/public/events", (req, rsp) -> eventsRenderer.render(Map.of("events", eventLogService.getLastEntries(20)))); "events", eventLogService.getLastEntries(20)),
Spark.get("/public/message-queue", (req, rsp) -> messageQueueRenderer.render(Map.of("messages", messageQueueViewService.getLastEntries(20)))); (map) -> servicesRenderer.render((Map<?, ?>) map));
Spark.get("/public/processes",
(req, rsp) -> Map.of("processes", heartbeatService.getProcessHeartbeats(),
"fsms", controlProcesses.getFsmStates(),
"messages", messageQueueViewService.getLastEntries(20)),
(map) -> processesRenderer.render((Map<?, ?>) map));
Spark.post("/public/fsms/:fsm/start", (req, rsp) -> {
controlProcesses.start(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
rsp.redirect("/processes");
return "";
});
Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> {
controlProcesses.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
rsp.redirect("/processes");
return "";
});
// TODO: This should be a POST // TODO: This should be a POST
Spark.get("/public/repartition", (req, rsp) -> { Spark.get("/public/repartition", (req, rsp) -> {
controlProcesses.start(ControlProcess.REPARTITION_REINDEX); controlProcesses.start(ControlProcess.REPARTITION_REINDEX);
return "OK"; return "OK";
}); });
// TODO: This should be a POST // TODO: This should be a POST
Spark.get("/public/reconvert", (req, rsp) -> { Spark.get("/public/reconvert", (req, rsp) -> {
controlProcesses.start(ControlProcess.RECONVERT_LOAD, "/samples/crawl-blogs/plan.yaml"); controlProcesses.start(ControlProcess.RECONVERT_LOAD, "/samples/crawl-blogs/plan.yaml");

View File

@ -0,0 +1,12 @@
package nu.marginalia.control.model;
public record ControlProcessState(String name, String state, boolean terminal) {
public String stateIcon() {
if (terminal) {
return "\uD83D\uDE34";
}
else {
return "\uD83C\uDFC3";
}
}
}

View File

@ -6,6 +6,7 @@ public record MessageQueueEntry (
String senderInbox, String senderInbox,
String recipientInbox, String recipientInbox,
String function, String function,
String payload,
String ownerInstanceFull, String ownerInstanceFull,
long ownerTick, long ownerTick,
String state, String state,

View File

@ -3,15 +3,19 @@ package nu.marginalia.control.process;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.model.ControlProcessState;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqsm.StateMachine; import nu.marginalia.mqsm.StateMachine;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.state.MachineState;
import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.BaseServiceParams;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -60,4 +64,21 @@ public class ControlProcesses {
stateMachines.get(process).init(gson.toJson(arg)); stateMachines.get(process).init(gson.toJson(arg));
} }
public List<ControlProcessState> getFsmStates() {
return stateMachines.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> {
final MachineState state = e.getValue().getState();
final String machineName = e.getKey().name();
final String stateName = state.name();
final boolean terminal = state.isFinal();
return new ControlProcessState(machineName, stateName, terminal);
}).toList();
}
@SneakyThrows
public void stop(ControlProcess fsm) {
stateMachines.get(fsm).abortExecution();
}
} }

View File

@ -22,7 +22,7 @@ public class MessageQueueViewService {
public List<MessageQueueEntry> getLastEntries(int n) { public List<MessageQueueEntry> getLastEntries(int n) {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var query = conn.prepareStatement(""" var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE FROM MESSAGE_QUEUE
ORDER BY ID DESC ORDER BY ID DESC
LIMIT ? LIMIT ?
@ -38,6 +38,7 @@ public class MessageQueueViewService {
rs.getString("SENDER_INBOX"), rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"), rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"), rs.getString("FUNCTION"),
rs.getString("PAYLOAD"),
rs.getString("OWNER_INSTANCE"), rs.getString("OWNER_INSTANCE"),
rs.getLong("OWNER_TICK"), rs.getLong("OWNER_TICK"),
rs.getString("STATE"), rs.getString("STATE"),

View File

@ -1,43 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
</head>
<body>
{{> control/partials/nav}}
<section>
<h1>Events</h1>
<table id="events">
<tr>
<th>Service Name</th>
<th>Instance</th>
<th>Event Time</th>
<th>Type</th>
<th>Message</th>
</tr>
{{#each events}}
<tr>
<td>{{serviceName}}</td>
<td title="{{instanceFull}}">
<span style="background-color: {{instanceColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{instanceColor2}}" class="uuidPip">&nbsp;</span>
{{instance}}
</td>
<td>{{eventTime}}</td>
<td>{{eventType}}</td>
<td>{{eventMessage}}</td>
</tr>
{{/each}}
</table>
</section>
</body>
<script src="/refresh.js"></script>
<script>
window.setInterval(() => {
refresh(["heartbeats"]);
}, 5000);
</script>
</html>

View File

@ -1,52 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
</head>
<body>
{{> control/partials/nav}}
<section>
<h1>Message Queue</h1>
<table id="queue">
<tr>
<th>State<br>TTL</th>
<th>Msg ID<br>Related ID</th>
<th>Recipient<br>Sender</th>
<th>Function</th>
<th>Owner Instance<br>Owner Tick</th>
<th>Created<br>Updated</th>
</tr>
{{#each messages}}
<tr>
<td>{{stateCode}}&nbsp;{{state}}</td>
<td>{{id}}</td>
<td>{{recipientInbox}}</td>
<td>{{function}}</td>
<td title="{{ownerInstanceFull}}">
<span style="background-color: {{ownerInstanceColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{ownerInstanceColor2}}" class="uuidPip">&nbsp;</span>&nbsp;{{ownerInstance}}
</td>
<td>{{createdTime}}</td>
</tr>
<tr>
<td>{{ttl}}</td>
<td>{{relatedId}}</td>
<td>{{senderInbox}}</td>
<td></td>
<td>{{ownerTick}}</td>
<td>{{updatedTime}}</td>
</tr>
{{/each}}
</table>
</section>
</body>
<script src="/refresh.js"></script>
<script>
window.setInterval(() => {
refresh(["queue"]);
}, 5000);
</script>
</html>

View File

@ -0,0 +1,23 @@
<h1>Events</h1>
<table id="events">
<tr>
<th>Service Name</th>
<th>Instance</th>
<th>Event Time</th>
<th>Type</th>
<th>Message</th>
</tr>
{{#each events}}
<tr>
<td>{{serviceName}}</td>
<td title="{{instanceFull}}">
<span style="background-color: {{instanceColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{instanceColor2}}" class="uuidPip">&nbsp;</span>
{{instance}}
</td>
<td>{{eventTime}}</td>
<td>{{eventType}}</td>
<td>{{eventMessage}}</td>
</tr>
{{/each}}
</table>

View File

@ -0,0 +1,23 @@
<h1>FSMs</h1>
<table id="fsms">
<tr>
<th>FSM</th>
<th>State</th>
<th>Action</th>
</tr>
{{#each fsms}}
<tr>
<td>{{name}}</td>
<td>{{stateIcon}}&nbsp;{{state}}</td>
<td>
{{#unless terminal}}
<form action="/fsms/{{name}}/stop" method="post"><input type="submit" value="Stop"></form>
{{/unless}}
{{#if terminal}}
<form action="/fsms/{{name}}/start" method="post"><input type="submit" value="Start"></form>
{{/if}}
</td>
</tr>
{{/each}}
</table>

View File

@ -0,0 +1,32 @@
<h1>Message Queue</h1>
<table id="queue">
<tr>
<th>State<br>TTL</th>
<th>Msg ID<br>Related ID</th>
<th>Recipient<br>Sender</th>
<th>Function<br>Payload</th>
<th>Owner Instance<br>Owner Tick</th>
<th>Created<br>Updated</th>
</tr>
{{#each messages}}
<tr>
<td>{{stateCode}}&nbsp;{{state}}</td>
<td>{{id}}</td>
<td>{{recipientInbox}}</td>
<td>{{function}}</td>
<td title="{{ownerInstanceFull}}">
<span style="background-color: {{ownerInstanceColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{ownerInstanceColor2}}" class="uuidPip">&nbsp;</span>&nbsp;{{ownerInstance}}
</td>
<td>{{createdTime}}</td>
</tr>
<tr>
<td>{{ttl}}</td>
<td>{{relatedId}}</td>
<td>{{senderInbox}}</td>
<td>{{payload}}</td>
<td>{{ownerTick}}</td>
<td>{{updatedTime}}</td>
</tr>
{{/each}}
</table>

View File

@ -3,7 +3,5 @@
<li><a href="/">Overview</a></li> <li><a href="/">Overview</a></li>
<li><a href="services">Services</a></li> <li><a href="services">Services</a></li>
<li><a href="processes">Processes</a></li> <li><a href="processes">Processes</a></li>
<li><a href="events">Events</a></li>
<li><a href="message-queue">Message Queue</a></li>
</ul> </ul>
</nav> </nav>

View File

@ -0,0 +1,23 @@
<h1>Processes</h1>
<table id="processes">
<tr>
<th>Process ID</th>
<th>UUID</th>
<th>Status</th>
<th>Progress</th>
<th>Last Seen (ms)</th>
</tr>
{{#each processes}}
<tr class="{{#if isMissing}}missing{{/if}}" style="{{progressStyle}}">
<td>{{processId}}</td>
<td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>
{{uuid}}
</td>
<td>{{status}}</td>
<td>{{#if progress}}{{progress}}%{{/if}}</td>
<td>{{#unless isStopped}}{{lastSeenMillis}}{{/unless}}</td>
</tr>
{{/each}}
</table>

View File

@ -0,0 +1,18 @@
<h1>Services</h1>
<table id="services">
<tr>
<th>Service ID</th>
<th>UUID</th>
<th>Last Seen (ms)</th>
</tr>
{{#each services}}
<tr class="{{#if isMissing}}missing{{/if}} {{#unless alive}}terminated{{/unless}}">
<td>{{serviceId}}</td>
<td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>
{{uuid}}
</td>
<td>{{lastSeenMillis}}</td>
</tr>
{{/each}}
</table>

View File

@ -7,36 +7,16 @@
</head> </head>
<body> <body>
{{> control/partials/nav}} {{> control/partials/nav}}
<section> <section>
<h1>Processes</h1> {{> control/partials/processes-table}}
<table id="heartbeats"> {{> control/partials/fsm-table}}
<tr> {{> control/partials/message-queue-table}}
<th>Process ID</th>
<th>UUID</th>
<th>Status</th>
<th>Progress</th>
<th>Last Seen (ms)</th>
</tr>
{{#each heartbeats}}
<tr class="{{#if isMissing}}missing{{/if}}" style="{{progressStyle}}">
<td>{{processId}}</td>
<td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>
{{uuid}}
</td>
<td>{{status}}</td>
<td>{{#if progress}}{{progress}}%{{/if}}</td>
<td>{{#unless isStopped}}{{lastSeenMillis}}{{/unless}}</td>
</tr>
{{/each}}
</table>
</section> </section>
</body> </body>
<script src="/refresh.js"></script> <script src="/refresh.js"></script>
<script> <script>
window.setInterval(() => { window.setInterval(() => {
refresh(["heartbeats"]); refresh(["processes", "fsms", "queue"]);
}, 2000); }, 2000);
</script> </script>
</html> </html>

View File

@ -7,32 +7,15 @@
</head> </head>
<body> <body>
{{> control/partials/nav}} {{> control/partials/nav}}
<section> <section>
<h1>Services</h1> {{> control/partials/services-table }}
<table id="heartbeats"> {{> control/partials/events-table }}
<tr>
<th>Service ID</th>
<th>UUID</th>
<th>Last Seen (ms)</th>
</tr>
{{#each heartbeats}}
<tr class="{{#if isMissing}}missing{{/if}} {{#unless alive}}terminated{{/unless}}">
<td>{{serviceId}}</td>
<td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>
{{uuid}}
</td>
<td>{{lastSeenMillis}}</td>
</tr>
{{/each}}
</table>
</section> </section>
</body> </body>
<script src="/refresh.js"></script> <script src="/refresh.js"></script>
<script> <script>
window.setInterval(() => { window.setInterval(() => {
refresh(["heartbeats"]); refresh(["services", "events"]);
}, 5000); }, 5000);
</script> </script>
</html> </html>