diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java index 78ec0e5f..eca19900 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -5,7 +5,7 @@ import com.google.inject.Inject; import nu.marginalia.client.ServiceMonitors; import nu.marginalia.control.model.Actor; import nu.marginalia.control.model.DomainComplaintModel; -import nu.marginalia.control.model.ProcessHeartbeat; +import nu.marginalia.control.model.MessageQueueEntry; import nu.marginalia.control.svc.*; import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageType; @@ -27,6 +27,7 @@ import java.sql.SQLException; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; public class ControlService extends Service { @@ -80,8 +81,11 @@ public class ControlService extends Service { var apiKeysRenderer = rendererFactory.renderer("control/api-keys"); var domainComplaintsRenderer = rendererFactory.renderer("control/domain-complaints"); + var messageQueueRenderer = rendererFactory.renderer("control/message-queue"); + var storageDetailsRenderer = rendererFactory.renderer("control/storage-details"); var updateMessageStateRenderer = rendererFactory.renderer("control/dialog-update-message-state"); + var newMessageRenderer = rendererFactory.renderer("control/new-message"); this.controlActorService = controlActorService; @@ -98,7 +102,7 @@ public class ControlService extends Service { Spark.get("/public/services", this::servicesModel, servicesRenderer::render); Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render); - Spark.get("/public/messages/:id", this::messageModel, gson::toJson); + Spark.get("/public/messages/:id", this::existingMessageModel, gson::toJson); Spark.get("/public/actors", this::processesModel, actorsRenderer::render); Spark.get("/public/actors/:fsm", this::actorDetailsModel, actorDetailsRenderer::render); Spark.get("/public/storage", this::storageModel, storageRenderer::render); @@ -114,9 +118,38 @@ public class ControlService extends Service { final HtmlRedirect redirectToApiKeys = new HtmlRedirect("/api-keys"); final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage"); final HtmlRedirect redirectToComplaints = new HtmlRedirect("/complaints"); + final HtmlRedirect redirectToMessageQueue = new HtmlRedirect("/message-queue"); Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses); Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses); + + Spark.get("/public/message-queue", this::messageQueueModel, messageQueueRenderer::render); + Spark.post("/public/message-queue/", (rq, rsp) -> { + String recipient = rq.queryParams("recipientInbox"); + String sender = rq.queryParams("senderInbox"); + String relatedMessage = rq.queryParams("relatedId"); + String function = rq.queryParams("function"); + String payload = rq.queryParams("payload"); + + persistence.sendNewMessage(recipient, + sender, + relatedMessage == null ? null : Long.parseLong(relatedMessage), + function, + payload, + null); + + return ""; + }, redirectToMessageQueue); + Spark.get("/public/message-queue/new", this::newMessageModel, newMessageRenderer::render); + Spark.get("/public/message-queue/:id/reply", this::replyMessageModel, newMessageRenderer::render); + Spark.get("/public/message-queue/:id/edit", (rq, rsp) -> persistence.getMessage(Long.parseLong(rq.params("id"))), updateMessageStateRenderer::render); + Spark.post("/public/message-queue/:id/edit", (rq, rsp) -> { + MqMessageState state = MqMessageState.valueOf(rq.queryParams("state")); + long id = Long.parseLong(rq.params("id")); + persistence.updateMessageState(id, state); + return ""; + }, redirectToMessageQueue); + Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses); Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses); Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses); @@ -134,19 +167,42 @@ public class ControlService extends Service { Spark.get("/public/complaints", this::complaintsModel, domainComplaintsRenderer::render); Spark.post("/public/complaints/:domain", this::reviewComplaint, redirectToComplaints); - Spark.get("/public/message/:id/state", (rq, rsp) -> persistence.getMessage(Long.parseLong(rq.params("id"))), updateMessageStateRenderer::render); - Spark.post("/public/message/:id/state", (rq, rsp) -> { - MqMessageState state = MqMessageState.valueOf(rq.queryParams("state")); - long id = Long.parseLong(rq.params("id")); - persistence.updateMessageState(id, state); - return ""; - }, redirectToProcesses); - Spark.get("/public/:resource", this::serveStatic); monitors.subscribe(this::logMonitorStateChange); } + private Object messageQueueModel(Request request, Response response) { + String inboxParam = request.queryParams("inbox"); + String instanceParam = request.queryParams("instance"); + String afterParam = request.queryParams("after"); + + long afterId = Optional.ofNullable(afterParam).map(Long::parseLong).orElse(Long.MAX_VALUE); + + List entries; + + if (inboxParam != null) { + entries = messageQueueViewService.getEntriesForInbox(inboxParam, afterId, 20); + } + else if (instanceParam != null) { + entries = messageQueueViewService.getEntriesForInstance(instanceParam, afterId, 20); + } + else { + entries = messageQueueViewService.getEntries(afterId, 20); + } + + Object next; + + if (entries.size() == 20) + next = entries.stream().mapToLong(MessageQueueEntry::id).min().getAsLong(); + else + next = ""; + + Object prev = afterParam == null ? "" : afterParam; + + return Map.of("messages", entries, "next", next, "prev", prev); + } + private Object complaintsModel(Request request, Response response) { Map> complaintsByReviewed = domainComplaintService.getComplaints().stream().collect(Collectors.partitioningBy(DomainComplaintModel::reviewed)); @@ -224,7 +280,7 @@ public class ControlService extends Service { } - private Object messageModel(Request request, Response response) { + private Object existingMessageModel(Request request, Response response) { var message = messageQueueViewService.getMessage(Long.parseLong(request.params("id"))); if (message != null) { response.type("application/json"); @@ -236,11 +292,34 @@ public class ControlService extends Service { } } + private Object newMessageModel(Request request, Response response) { + String idParam = request.queryParams("id"); + if (null == idParam) + return Map.of("relatedId", "-1"); + + var message = messageQueueViewService.getMessage(Long.parseLong(idParam)); + if (message != null) + return message; + + return Map.of("relatedId", "-1"); + } + private Object replyMessageModel(Request request, Response response) { + String idParam = request.params("id"); + + var message = messageQueueViewService.getMessage(Long.parseLong(idParam)); + + return Map.of("relatedId", message.id(), + "recipientInbox", message.senderInbox(), + "function", "REPLY"); + } + + private Object serviceModel(Request request, Response response) { String serviceName = request.params("id"); return Map.of( "id", serviceName, + "messages", messageQueueViewService.getEntriesForInbox(serviceName, Long.MAX_VALUE, 20), "events", eventLogService.getLastEntriesForService(serviceName, 20)); } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java index f52ba3a1..8f3a45a7 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java @@ -7,6 +7,7 @@ import nu.marginalia.control.model.Actor; import nu.marginalia.control.model.MessageQueueEntry; import nu.marginalia.mqsm.graph.AbstractStateGraph; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; @@ -34,20 +35,7 @@ public class MessageQueueViewService { List entries = new ArrayList<>(n); var rs = query.executeQuery(); while (rs.next()) { - entries.add(new MessageQueueEntry( - rs.getLong("ID"), - rs.getLong("RELATED_ID"), - rs.getString("SENDER_INBOX"), - rs.getString("RECIPIENT_INBOX"), - rs.getString("FUNCTION"), - rs.getString("PAYLOAD"), - rs.getString("OWNER_INSTANCE"), - rs.getLong("OWNER_TICK"), - rs.getString("STATE"), - rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(), - rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(), - rs.getInt("TTL") - )); + entries.add(newEntry(rs)); } return entries; } @@ -68,20 +56,7 @@ public class MessageQueueViewService { var rs = query.executeQuery(); if (rs.next()) { - return new MessageQueueEntry( - rs.getLong("ID"), - rs.getLong("RELATED_ID"), - rs.getString("SENDER_INBOX"), - rs.getString("RECIPIENT_INBOX"), - rs.getString("FUNCTION"), - rs.getString("PAYLOAD"), - rs.getString("OWNER_INSTANCE"), - rs.getLong("OWNER_TICK"), - rs.getString("STATE"), - rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(), - rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(), - rs.getInt("TTL") - ); + return newEntry(rs); } } catch (SQLException ex) { @@ -105,20 +80,7 @@ public class MessageQueueViewService { List entries = new ArrayList<>(n); var rs = query.executeQuery(); while (rs.next()) { - entries.add(new MessageQueueEntry( - rs.getLong("ID"), - rs.getLong("RELATED_ID"), - rs.getString("SENDER_INBOX"), - rs.getString("RECIPIENT_INBOX"), - rs.getString("FUNCTION"), - rs.getString("PAYLOAD"), - rs.getString("OWNER_INSTANCE"), - rs.getLong("OWNER_TICK"), - rs.getString("STATE"), - rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(), - rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(), - rs.getInt("TTL") - )); + entries.add(newEntry(rs)); } return entries; } @@ -126,4 +88,97 @@ public class MessageQueueViewService { throw new RuntimeException(ex); } } + + public List getEntriesForInbox(String inbox, long afterId, int n) { + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + FROM MESSAGE_QUEUE + WHERE ID < ? AND (RECIPIENT_INBOX = ? OR SENDER_INBOX = ?) + ORDER BY ID DESC + LIMIT ? + """)) { + + query.setLong(1, afterId); + query.setString(2, inbox); + query.setString(3, inbox); + query.setInt(4, n); + + List entries = new ArrayList<>(n); + var rs = query.executeQuery(); + while (rs.next()) { + entries.add(newEntry(rs)); + } + return entries; + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + public List getEntriesForInstance(String instance, long afterId, int n) { + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + FROM MESSAGE_QUEUE + WHERE ID < ? AND OWNER_INSTANCE = ? + ORDER BY ID DESC + LIMIT ? + """)) { + + query.setLong(1, afterId); + query.setString(2, instance); + query.setInt(3, n); + + List entries = new ArrayList<>(n); + var rs = query.executeQuery(); + while (rs.next()) { + entries.add(newEntry(rs)); + } + return entries; + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + public List getEntries(long afterId, int n) { + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + FROM MESSAGE_QUEUE + WHERE ID < ? + ORDER BY ID DESC + LIMIT ? + """)) { + + query.setLong(1, afterId); + query.setInt(2, n); + + List entries = new ArrayList<>(n); + var rs = query.executeQuery(); + while (rs.next()) { + entries.add(newEntry(rs)); + } + return entries; + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + private MessageQueueEntry newEntry(ResultSet rs) throws SQLException { + return new MessageQueueEntry( + rs.getLong("ID"), + rs.getLong("RELATED_ID"), + rs.getString("SENDER_INBOX"), + rs.getString("RECIPIENT_INBOX"), + rs.getString("FUNCTION"), + rs.getString("PAYLOAD"), + rs.getString("OWNER_INSTANCE"), + rs.getLong("OWNER_TICK"), + rs.getString("STATE"), + rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(), + rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(), + rs.getInt("TTL")); + } } diff --git a/code/services-core/control-service/src/main/resources/templates/control/dialog-update-message-state.hdb b/code/services-core/control-service/src/main/resources/templates/control/dialog-update-message-state.hdb index 13d25615..7acae272 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/dialog-update-message-state.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/dialog-update-message-state.hdb @@ -9,7 +9,7 @@

Update the of a message in the message queue. This may be useful to prevent an actor from resuming an action when this is not desirable. Setting an old message to 'NEW' will erase information about its owner, and inboxes will consider the message new again.

-
+

diff --git a/code/services-core/control-service/src/main/resources/templates/control/message-queue.hdb b/code/services-core/control-service/src/main/resources/templates/control/message-queue.hdb new file mode 100644 index 00000000..cc5b5da9 --- /dev/null +++ b/code/services-core/control-service/src/main/resources/templates/control/message-queue.hdb @@ -0,0 +1,20 @@ + + + + Control Service + + + + + {{> control/partials/nav}} +
+ {{> control/partials/message-queue-table }} +
+ + + + \ No newline at end of file diff --git a/code/services-core/control-service/src/main/resources/templates/control/new-message.hdb b/code/services-core/control-service/src/main/resources/templates/control/new-message.hdb new file mode 100644 index 00000000..211d690c --- /dev/null +++ b/code/services-core/control-service/src/main/resources/templates/control/new-message.hdb @@ -0,0 +1,31 @@ + + + +Update ID + +{{> control/partials/nav}} +
+

Create Message

+ +
+ +
+
+ +
+
+ +
+
+ +
+
+ +
+
+ + + +
+ + \ No newline at end of file diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb index b971c928..6aac4572 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb @@ -1,7 +1,9 @@

Message Queue

+ + @@ -9,25 +11,42 @@ - + + + + {{#each messages}} - + + - + + - + {{/each}} + + + + +
Action State
TTL
Msg ID
Related ID
Recipient
Sender
Owner Instance
Owner Tick
Created
Updated
+ [Add Message] +
{{stateCode}} {{state}}[Edit]{{stateCode}} {{state}} {{id}}{{recipientInbox}}{{recipientInbox}} {{function}} -    {{ownerInstance}} +    {{ownerInstance}} {{createdTime}}
+ {{#if senderInbox}}[Reply]{{/if}} + {{ttl}} {{relatedId}}{{senderInbox}}{{senderInbox}} {{payload}} {{ownerTick}} {{updatedTime}}
+ {{#if prev}}Prev{{/if}} + {{#if next}}Next{{/if}} +
diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/nav.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/nav.hdb index 184f28b9..94f3b13a 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/partials/nav.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/partials/nav.hdb @@ -3,6 +3,7 @@
  • Overview
  • Services
  • Actors
  • +
  • Message Queue
  • Storage
  • API Keys
  • Blacklist
  • diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/processes-table.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/processes-table.hdb index d1a4eeea..50ab8d58 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/partials/processes-table.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/partials/processes-table.hdb @@ -11,10 +11,9 @@ {{#each processes}} - {{displayName}} + {{displayName}} -    - {{uuid}} +   {{uuid}} {{status}} {{#if progress}}{{progress}}%{{/if}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/service-by-id.hdb b/code/services-core/control-service/src/main/resources/templates/control/service-by-id.hdb index 5b1fe6b4..f350ac5a 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/service-by-id.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/service-by-id.hdb @@ -10,6 +10,7 @@

    Services/{{id}}

    {{> control/partials/events-table }} + {{> control/partials/message-queue-table }}