(control) Message Queue GUI

This commit is contained in:
Viktor Lofgren 2023-08-04 17:54:18 +02:00
parent 624b78ec3a
commit 912129311d
9 changed files with 267 additions and 62 deletions

View File

@ -5,7 +5,7 @@ import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors; import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.model.Actor; import nu.marginalia.control.model.Actor;
import nu.marginalia.control.model.DomainComplaintModel; import nu.marginalia.control.model.DomainComplaintModel;
import nu.marginalia.control.model.ProcessHeartbeat; import nu.marginalia.control.model.MessageQueueEntry;
import nu.marginalia.control.svc.*; import nu.marginalia.control.svc.*;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.db.storage.model.FileStorageType;
@ -27,6 +27,7 @@ import java.sql.SQLException;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class ControlService extends Service { public class ControlService extends Service {
@ -80,8 +81,11 @@ public class ControlService extends Service {
var apiKeysRenderer = rendererFactory.renderer("control/api-keys"); var apiKeysRenderer = rendererFactory.renderer("control/api-keys");
var domainComplaintsRenderer = rendererFactory.renderer("control/domain-complaints"); var domainComplaintsRenderer = rendererFactory.renderer("control/domain-complaints");
var messageQueueRenderer = rendererFactory.renderer("control/message-queue");
var storageDetailsRenderer = rendererFactory.renderer("control/storage-details"); var storageDetailsRenderer = rendererFactory.renderer("control/storage-details");
var updateMessageStateRenderer = rendererFactory.renderer("control/dialog-update-message-state"); var updateMessageStateRenderer = rendererFactory.renderer("control/dialog-update-message-state");
var newMessageRenderer = rendererFactory.renderer("control/new-message");
this.controlActorService = controlActorService; this.controlActorService = controlActorService;
@ -98,7 +102,7 @@ public class ControlService extends Service {
Spark.get("/public/services", this::servicesModel, servicesRenderer::render); Spark.get("/public/services", this::servicesModel, servicesRenderer::render);
Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render); Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render);
Spark.get("/public/messages/:id", this::messageModel, gson::toJson); Spark.get("/public/messages/:id", this::existingMessageModel, gson::toJson);
Spark.get("/public/actors", this::processesModel, actorsRenderer::render); Spark.get("/public/actors", this::processesModel, actorsRenderer::render);
Spark.get("/public/actors/:fsm", this::actorDetailsModel, actorDetailsRenderer::render); Spark.get("/public/actors/:fsm", this::actorDetailsModel, actorDetailsRenderer::render);
Spark.get("/public/storage", this::storageModel, storageRenderer::render); Spark.get("/public/storage", this::storageModel, storageRenderer::render);
@ -114,9 +118,38 @@ public class ControlService extends Service {
final HtmlRedirect redirectToApiKeys = new HtmlRedirect("/api-keys"); final HtmlRedirect redirectToApiKeys = new HtmlRedirect("/api-keys");
final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage"); final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage");
final HtmlRedirect redirectToComplaints = new HtmlRedirect("/complaints"); final HtmlRedirect redirectToComplaints = new HtmlRedirect("/complaints");
final HtmlRedirect redirectToMessageQueue = new HtmlRedirect("/message-queue");
Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses); Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses);
Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses); Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses);
Spark.get("/public/message-queue", this::messageQueueModel, messageQueueRenderer::render);
Spark.post("/public/message-queue/", (rq, rsp) -> {
String recipient = rq.queryParams("recipientInbox");
String sender = rq.queryParams("senderInbox");
String relatedMessage = rq.queryParams("relatedId");
String function = rq.queryParams("function");
String payload = rq.queryParams("payload");
persistence.sendNewMessage(recipient,
sender,
relatedMessage == null ? null : Long.parseLong(relatedMessage),
function,
payload,
null);
return "";
}, redirectToMessageQueue);
Spark.get("/public/message-queue/new", this::newMessageModel, newMessageRenderer::render);
Spark.get("/public/message-queue/:id/reply", this::replyMessageModel, newMessageRenderer::render);
Spark.get("/public/message-queue/:id/edit", (rq, rsp) -> persistence.getMessage(Long.parseLong(rq.params("id"))), updateMessageStateRenderer::render);
Spark.post("/public/message-queue/:id/edit", (rq, rsp) -> {
MqMessageState state = MqMessageState.valueOf(rq.queryParams("state"));
long id = Long.parseLong(rq.params("id"));
persistence.updateMessageState(id, state);
return "";
}, redirectToMessageQueue);
Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses); Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses);
Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses); Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses);
Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses); Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses);
@ -134,19 +167,42 @@ public class ControlService extends Service {
Spark.get("/public/complaints", this::complaintsModel, domainComplaintsRenderer::render); Spark.get("/public/complaints", this::complaintsModel, domainComplaintsRenderer::render);
Spark.post("/public/complaints/:domain", this::reviewComplaint, redirectToComplaints); Spark.post("/public/complaints/:domain", this::reviewComplaint, redirectToComplaints);
Spark.get("/public/message/:id/state", (rq, rsp) -> persistence.getMessage(Long.parseLong(rq.params("id"))), updateMessageStateRenderer::render);
Spark.post("/public/message/:id/state", (rq, rsp) -> {
MqMessageState state = MqMessageState.valueOf(rq.queryParams("state"));
long id = Long.parseLong(rq.params("id"));
persistence.updateMessageState(id, state);
return "";
}, redirectToProcesses);
Spark.get("/public/:resource", this::serveStatic); Spark.get("/public/:resource", this::serveStatic);
monitors.subscribe(this::logMonitorStateChange); monitors.subscribe(this::logMonitorStateChange);
} }
private Object messageQueueModel(Request request, Response response) {
String inboxParam = request.queryParams("inbox");
String instanceParam = request.queryParams("instance");
String afterParam = request.queryParams("after");
long afterId = Optional.ofNullable(afterParam).map(Long::parseLong).orElse(Long.MAX_VALUE);
List<MessageQueueEntry> entries;
if (inboxParam != null) {
entries = messageQueueViewService.getEntriesForInbox(inboxParam, afterId, 20);
}
else if (instanceParam != null) {
entries = messageQueueViewService.getEntriesForInstance(instanceParam, afterId, 20);
}
else {
entries = messageQueueViewService.getEntries(afterId, 20);
}
Object next;
if (entries.size() == 20)
next = entries.stream().mapToLong(MessageQueueEntry::id).min().getAsLong();
else
next = "";
Object prev = afterParam == null ? "" : afterParam;
return Map.of("messages", entries, "next", next, "prev", prev);
}
private Object complaintsModel(Request request, Response response) { private Object complaintsModel(Request request, Response response) {
Map<Boolean, List<DomainComplaintModel>> complaintsByReviewed = Map<Boolean, List<DomainComplaintModel>> complaintsByReviewed =
domainComplaintService.getComplaints().stream().collect(Collectors.partitioningBy(DomainComplaintModel::reviewed)); domainComplaintService.getComplaints().stream().collect(Collectors.partitioningBy(DomainComplaintModel::reviewed));
@ -224,7 +280,7 @@ public class ControlService extends Service {
} }
private Object messageModel(Request request, Response response) { private Object existingMessageModel(Request request, Response response) {
var message = messageQueueViewService.getMessage(Long.parseLong(request.params("id"))); var message = messageQueueViewService.getMessage(Long.parseLong(request.params("id")));
if (message != null) { if (message != null) {
response.type("application/json"); response.type("application/json");
@ -236,11 +292,34 @@ public class ControlService extends Service {
} }
} }
private Object newMessageModel(Request request, Response response) {
String idParam = request.queryParams("id");
if (null == idParam)
return Map.of("relatedId", "-1");
var message = messageQueueViewService.getMessage(Long.parseLong(idParam));
if (message != null)
return message;
return Map.of("relatedId", "-1");
}
private Object replyMessageModel(Request request, Response response) {
String idParam = request.params("id");
var message = messageQueueViewService.getMessage(Long.parseLong(idParam));
return Map.of("relatedId", message.id(),
"recipientInbox", message.senderInbox(),
"function", "REPLY");
}
private Object serviceModel(Request request, Response response) { private Object serviceModel(Request request, Response response) {
String serviceName = request.params("id"); String serviceName = request.params("id");
return Map.of( return Map.of(
"id", serviceName, "id", serviceName,
"messages", messageQueueViewService.getEntriesForInbox(serviceName, Long.MAX_VALUE, 20),
"events", eventLogService.getLastEntriesForService(serviceName, 20)); "events", eventLogService.getLastEntriesForService(serviceName, 20));
} }

View File

@ -7,6 +7,7 @@ import nu.marginalia.control.model.Actor;
import nu.marginalia.control.model.MessageQueueEntry; import nu.marginalia.control.model.MessageQueueEntry;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.AbstractStateGraph;
import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -34,20 +35,7 @@ public class MessageQueueViewService {
List<MessageQueueEntry> entries = new ArrayList<>(n); List<MessageQueueEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery(); var rs = query.executeQuery();
while (rs.next()) { while (rs.next()) {
entries.add(new MessageQueueEntry( entries.add(newEntry(rs));
rs.getLong("ID"),
rs.getLong("RELATED_ID"),
rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"),
rs.getString("PAYLOAD"),
rs.getString("OWNER_INSTANCE"),
rs.getLong("OWNER_TICK"),
rs.getString("STATE"),
rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getInt("TTL")
));
} }
return entries; return entries;
} }
@ -68,20 +56,7 @@ public class MessageQueueViewService {
var rs = query.executeQuery(); var rs = query.executeQuery();
if (rs.next()) { if (rs.next()) {
return new MessageQueueEntry( return newEntry(rs);
rs.getLong("ID"),
rs.getLong("RELATED_ID"),
rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"),
rs.getString("PAYLOAD"),
rs.getString("OWNER_INSTANCE"),
rs.getLong("OWNER_TICK"),
rs.getString("STATE"),
rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getInt("TTL")
);
} }
} }
catch (SQLException ex) { catch (SQLException ex) {
@ -105,7 +80,94 @@ public class MessageQueueViewService {
List<MessageQueueEntry> entries = new ArrayList<>(n); List<MessageQueueEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery(); var rs = query.executeQuery();
while (rs.next()) { while (rs.next()) {
entries.add(new MessageQueueEntry( entries.add(newEntry(rs));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
public List<MessageQueueEntry> getEntriesForInbox(String inbox, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND (RECIPIENT_INBOX = ? OR SENDER_INBOX = ?)
ORDER BY ID DESC
LIMIT ?
""")) {
query.setLong(1, afterId);
query.setString(2, inbox);
query.setString(3, inbox);
query.setInt(4, n);
List<MessageQueueEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery();
while (rs.next()) {
entries.add(newEntry(rs));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
public List<MessageQueueEntry> getEntriesForInstance(String instance, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND OWNER_INSTANCE = ?
ORDER BY ID DESC
LIMIT ?
""")) {
query.setLong(1, afterId);
query.setString(2, instance);
query.setInt(3, n);
List<MessageQueueEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery();
while (rs.next()) {
entries.add(newEntry(rs));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
public List<MessageQueueEntry> getEntries(long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ?
ORDER BY ID DESC
LIMIT ?
""")) {
query.setLong(1, afterId);
query.setInt(2, n);
List<MessageQueueEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery();
while (rs.next()) {
entries.add(newEntry(rs));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
private MessageQueueEntry newEntry(ResultSet rs) throws SQLException {
return new MessageQueueEntry(
rs.getLong("ID"), rs.getLong("ID"),
rs.getLong("RELATED_ID"), rs.getLong("RELATED_ID"),
rs.getString("SENDER_INBOX"), rs.getString("SENDER_INBOX"),
@ -117,13 +179,6 @@ public class MessageQueueViewService {
rs.getString("STATE"), rs.getString("STATE"),
rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(), rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(), rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getInt("TTL") rs.getInt("TTL"));
));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
} }
} }

View File

@ -9,7 +9,7 @@
<p>Update the of a message in the message queue. This may be useful to prevent an actor <p>Update the of a message in the message queue. This may be useful to prevent an actor
from resuming an action when this is not desirable. Setting an old message to 'NEW' will from resuming an action when this is not desirable. Setting an old message to 'NEW' will
erase information about its owner, and inboxes will consider the message new again.</p> erase information about its owner, and inboxes will consider the message new again.</p>
<form method="post" action="/message/{{msgId}}/state"> <form method="post" action="/message-queue/{{msgId}}/edit">
<label for="msgId">msgId</label><br> <label for="msgId">msgId</label><br>
<input type="text" disabled id="msgId" name="msgId" value="{{msgId}}"> <input type="text" disabled id="msgId" name="msgId" value="{{msgId}}">
<br> <br>

View File

@ -0,0 +1,20 @@
<!DOCTYPE html>
<html>
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="/style.css" />
</head>
<body>
{{> control/partials/nav}}
<section>
{{> control/partials/message-queue-table }}
</section>
</body>
<script src="/refresh.js"></script>
<script>
window.setInterval(() => {
refresh(["queue"]);
}, 5000);
</script>
</html>

View File

@ -0,0 +1,31 @@
<!doctype html>
<html>
<link rel="stylesheet" href="/style.css" />
<head><title>Update ID</title></head>
<body>
{{> control/partials/nav}}
<section>
<h1>Create Message</h1>
<form method="post" action="/message-queue/">
<label for="recipientInbox">recipientInbox</label><br>
<input type="text" id="recipientInbox" name="recipientInbox" value="{{recipientInbox}}">
<br>
<label for="senderInbox">senderInbox</label><br>
<input type="text" id="senderInbox" name="senderInbox" value="{{senderInbox}}">
<br>
<label for="relatedId">relatedId</label><br>
<input type="text" id="relatedId" name="relatedId" value="{{relatedId}}">
<br>
<label for="function">function</label><br>
<input type="text" id="function" name="function" value="{{function}}">
<br>
<label for="payload">payload</label><br>
<textarea rows="6" cols="40" id="payload" name="payload">{{payload}}</textarea>
<br>
<br>
<input type="submit" value="Create Message">
</form>
</section>
</body>
</html>

View File

@ -1,7 +1,9 @@
<h1>Message Queue</h1> <h1>Message Queue</h1>
<table id="queue" class="table-rh-2"> <table id="queue" class="table-rh-2">
<thead>
<tr> <tr>
<th>Action</th>
<th>State<br>TTL</th> <th>State<br>TTL</th>
<th>Msg ID<br>Related ID</th> <th>Msg ID<br>Related ID</th>
<th>Recipient<br>Sender</th> <th>Recipient<br>Sender</th>
@ -9,25 +11,42 @@
<th>Owner Instance<br>Owner Tick</th> <th>Owner Instance<br>Owner Tick</th>
<th>Created<br>Updated</th> <th>Created<br>Updated</th>
</tr> </tr>
<tr></tr> <tr>
<td colspan="7" style="padding: 0.5ch">
<a style="float:right" href="/message-queue/new">[Add Message]</a>
</td>
</tr>
</thead>
{{#each messages}} {{#each messages}}
<tr> <tr>
<td>{{stateCode}}&nbsp;<a onClick="updateMsgState({{id}})" href="/message/{{id}}/state">{{state}}</a></td> <td><a href="/message-queue/{{id}}/edit">[Edit]</a></td>
<td>{{stateCode}}&nbsp;{{state}}</td>
<td>{{id}}</td> <td>{{id}}</td>
<td>{{recipientInbox}}</td> <td><a href="/message-queue?inbox={{recipientInbox}}">{{recipientInbox}}</a></td>
<td>{{function}}</td> <td>{{function}}</td>
<td title="{{ownerInstanceFull}}"> <td title="{{ownerInstanceFull}}">
<span style="background-color: {{ownerInstanceColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{ownerInstanceColor2}}" class="uuidPip">&nbsp;</span>&nbsp;{{ownerInstance}} <span style="background-color: {{ownerInstanceColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{ownerInstanceColor2}}" class="uuidPip">&nbsp;</span>&nbsp;<a href="/message-queue?instance={{ownerInstanceFull}}">{{ownerInstance}}</a>
</td> </td>
<td>{{createdTime}}</td> <td>{{createdTime}}</td>
</tr> </tr>
<tr> <tr>
<td>
{{#if senderInbox}}<a href="/message-queue/{{id}}/reply">[Reply]</a>{{/if}}
</td>
<td>{{ttl}}</td> <td>{{ttl}}</td>
<td>{{relatedId}}</td> <td>{{relatedId}}</td>
<td>{{senderInbox}}</td> <td><a href="/message-queue?inbox={{senderInbox}}">{{senderInbox}}</a></td>
<td>{{payload}}</td> <td>{{payload}}</td>
<td>{{ownerTick}}</td> <td>{{ownerTick}}</td>
<td>{{updatedTime}}</td> <td>{{updatedTime}}</td>
</tr> </tr>
{{/each}} {{/each}}
<tfoot>
<tr>
<td colspan="7" style="padding: 0.5ch">
{{#if prev}}<a href="?after={{prev}}">Prev</a>{{/if}}
{{#if next}}<a href="?after={{next}}" style="float:right">Next</a>{{/if}}
</td>
</tr>
</tfoot>
</table> </table>

View File

@ -3,6 +3,7 @@
<li><a href="/">Overview</a></li> <li><a href="/">Overview</a></li>
<li><a href="/services">Services</a></li> <li><a href="/services">Services</a></li>
<li><a href="/actors">Actors</a></li> <li><a href="/actors">Actors</a></li>
<li><a href="/message-queue">Message Queue</a></li>
<li><a href="/storage">Storage</a></li> <li><a href="/storage">Storage</a></li>
<li><a href="/api-keys">API Keys</a></li> <li><a href="/api-keys">API Keys</a></li>
<li><a href="/blacklist">Blacklist</a></li> <li><a href="/blacklist">Blacklist</a></li>

View File

@ -11,10 +11,9 @@
</tr> </tr>
{{#each processes}} {{#each processes}}
<tr class="{{#if isMissing}}missing{{/if}}"> <tr class="{{#if isMissing}}missing{{/if}}">
<td>{{displayName}}</td> <td><a href="/message-queue?inbox={{processId}}">{{displayName}}</a></td>
<td title="{{uuidFull}}"> <td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span> <span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>{{uuid}}
{{uuid}}
</td> </td>
<td>{{status}}</td> <td>{{status}}</td>
<td style="{{progressStyle}}">{{#if progress}}{{progress}}%{{/if}}</td> <td style="{{progressStyle}}">{{#if progress}}{{progress}}%{{/if}}</td>

View File

@ -10,6 +10,7 @@
<section> <section>
<h1>Services/{{id}}</h1> <h1>Services/{{id}}</h1>
{{> control/partials/events-table }} {{> control/partials/events-table }}
{{> control/partials/message-queue-table }}
</section> </section>
</body> </body>
<script src="/refresh.js"></script> <script src="/refresh.js"></script>