(control) Clean up the ControlService, move mq-related endpoints to MessageQueueService.

This commit is contained in:
Viktor Lofgren 2023-08-09 12:42:01 +02:00
parent afad4f5ebb
commit 71dfe9f33e
22 changed files with 145 additions and 144 deletions

View File

@ -3,16 +3,13 @@ package nu.marginalia.control;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.actor.Actor;
import nu.marginalia.control.model.DomainComplaintModel;
import nu.marginalia.control.model.MessageQueueEntry;
import nu.marginalia.control.svc.*;
import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.renderer.RendererFactory;
import nu.marginalia.service.server.*;
import org.eclipse.jetty.util.StringUtil;
@ -27,7 +24,6 @@ import java.sql.SQLException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class ControlService extends Service {
@ -42,7 +38,7 @@ public class ControlService extends Service {
private final DomainComplaintService domainComplaintService;
private final ControlActorService controlActorService;
private final StaticResources staticResources;
private final MessageQueueViewService messageQueueViewService;
private final MessageQueueService messageQueueService;
private final ControlFileStorageService controlFileStorageService;
@ -54,12 +50,11 @@ public class ControlService extends Service {
RendererFactory rendererFactory,
ControlActorService controlActorService,
StaticResources staticResources,
MessageQueueViewService messageQueueViewService,
MessageQueueService messageQueueService,
ControlFileStorageService controlFileStorageService,
ApiKeyService apiKeyService,
DomainComplaintService domainComplaintService,
ControlActionsService controlActionsService,
MqPersistence persistence
ControlActionsService controlActionsService
) throws IOException {
super(params);
@ -94,7 +89,7 @@ public class ControlService extends Service {
this.controlActorService = controlActorService;
this.staticResources = staticResources;
this.messageQueueViewService = messageQueueViewService;
this.messageQueueService = messageQueueService;
this.controlFileStorageService = controlFileStorageService;
Spark.get("/public/heartbeats", (req, res) -> {
@ -107,7 +102,6 @@ public class ControlService extends Service {
Spark.get("/public/actions", (rq,rsp) -> new Object() , actionsViewRenderer::render);
Spark.get("/public/services", this::servicesModel, servicesRenderer::render);
Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render);
Spark.get("/public/messages/:id", this::existingMessageModel, gson::toJson);
Spark.get("/public/actors", this::processesModel, actorsRenderer::render);
Spark.get("/public/actors/:fsm", this::actorDetailsModel, actorDetailsRenderer::render);
@ -125,37 +119,13 @@ public class ControlService extends Service {
// Message Queue
Spark.get("/public/message-queue", this::messageQueueModel, messageQueueRenderer::render);
Spark.post("/public/message-queue/", (rq, rsp) -> {
String recipient = rq.queryParams("recipientInbox");
String sender = rq.queryParams("senderInbox");
String relatedMessage = rq.queryParams("relatedId");
String function = rq.queryParams("function");
String payload = rq.queryParams("payload");
persistence.sendNewMessage(recipient,
sender.isBlank() ? null : sender,
relatedMessage == null ? null : Long.parseLong(relatedMessage),
function,
payload,
null);
return "";
}, redirectToMessageQueue);
Spark.get("/public/message-queue/new", this::newMessageModel, newMessageRenderer::render);
Spark.get("/public/message-queue/:id",
(rq, rsp) -> Map.of("message", messageQueueViewService.getMessage(Long.parseLong(rq.params("id"))),
"relatedMessages", messageQueueViewService.getRelatedMessages(Long.parseLong(rq.params("id"))))
, viewMessageRenderer::render);
Spark.get("/public/message-queue/:id/reply", this::replyMessageModel, newMessageRenderer::render);
Spark.get("/public/message-queue/:id/edit", (rq, rsp) -> persistence.getMessage(Long.parseLong(rq.params("id"))), updateMessageStateRenderer::render);
Spark.post("/public/message-queue/:id/edit", (rq, rsp) -> {
MqMessageState state = MqMessageState.valueOf(rq.queryParams("state"));
long id = Long.parseLong(rq.params("id"));
persistence.updateMessageState(id, state);
return "";
}, redirectToMessageQueue);
Spark.get("/public/message-queue", messageQueueService::listMessageQueueModel, messageQueueRenderer::render);
Spark.post("/public/message-queue/", messageQueueService::createMessage, redirectToMessageQueue);
Spark.get("/public/message-queue/new", messageQueueService::newMessageModel, newMessageRenderer::render);
Spark.get("/public/message-queue/:id", messageQueueService::viewMessageModel, viewMessageRenderer::render);
Spark.get("/public/message-queue/:id/reply", messageQueueService::replyMessageModel, newMessageRenderer::render);
Spark.get("/public/message-queue/:id/edit", messageQueueService::viewMessageForEditStateModel, updateMessageStateRenderer::render);
Spark.post("/public/message-queue/:id/edit", messageQueueService::editMessageState, redirectToMessageQueue);
// Storage
Spark.get("/public/storage", this::storageModel, storageRenderer::render);
@ -211,42 +181,6 @@ public class ControlService extends Service {
);
}
private Object messageQueueModel(Request request, Response response) {
String inboxParam = request.queryParams("inbox");
String instanceParam = request.queryParams("instance");
String afterParam = request.queryParams("after");
long afterId = Optional.ofNullable(afterParam).map(Long::parseLong).orElse(Long.MAX_VALUE);
List<MessageQueueEntry> entries;
String mqFilter = "filter=none";
if (inboxParam != null) {
mqFilter = "inbox=" + inboxParam;
entries = messageQueueViewService.getEntriesForInbox(inboxParam, afterId, 20);
}
else if (instanceParam != null) {
mqFilter = "instance=" + instanceParam;
entries = messageQueueViewService.getEntriesForInstance(instanceParam, afterId, 20);
}
else {
entries = messageQueueViewService.getEntries(afterId, 20);
}
Object next;
if (entries.size() == 20)
next = entries.stream().mapToLong(MessageQueueEntry::id).min().getAsLong();
else
next = "";
Object prev = afterParam == null ? "" : afterParam;
return Map.of("messages", entries,
"next", next,
"prev", prev,
"mqFilter", mqFilter);
}
private Object complaintsModel(Request request, Response response) {
Map<Boolean, List<DomainComplaintModel>> complaintsByReviewed =
@ -325,46 +259,12 @@ public class ControlService extends Service {
}
private Object existingMessageModel(Request request, Response response) {
var message = messageQueueViewService.getMessage(Long.parseLong(request.params("id")));
if (message != null) {
response.type("application/json");
return message;
}
else {
response.status(404);
return "";
}
}
private Object newMessageModel(Request request, Response response) {
String idParam = request.queryParams("id");
if (null == idParam)
return Map.of("relatedId", "-1");
var message = messageQueueViewService.getMessage(Long.parseLong(idParam));
if (message != null)
return message;
return Map.of("relatedId", "-1");
}
private Object replyMessageModel(Request request, Response response) {
String idParam = request.params("id");
var message = messageQueueViewService.getMessage(Long.parseLong(idParam));
return Map.of("relatedId", message.id(),
"recipientInbox", message.senderInbox(),
"function", "REPLY");
}
private Object serviceModel(Request request, Response response) {
String serviceName = request.params("id");
return Map.of(
"id", serviceName,
"messages", messageQueueViewService.getEntriesForInbox(serviceName, Long.MAX_VALUE, 20),
"messages", messageQueueService.getEntriesForInbox(serviceName, Long.MAX_VALUE, 20),
"events", eventLogService.getLastEntriesForService(serviceName, 20));
}
@ -396,7 +296,7 @@ public class ControlService extends Service {
return Map.of("processes", processes,
"jobs", jobs,
"actors", controlActorService.getActorStates(),
"messages", messageQueueViewService.getLastEntries(20));
"messages", messageQueueService.getLastEntries(20));
}
private Object actorDetailsModel(Request request, Response response) {
final Actor actor = Actor.valueOf(request.params("fsm").toUpperCase());
@ -405,7 +305,7 @@ public class ControlService extends Service {
return Map.of(
"actor", actor,
"state-graph", controlActorService.getActorStateGraph(actor),
"messages", messageQueueViewService.getLastEntriesForInbox(inbox, 20));
"messages", messageQueueService.getLastEntriesForInbox(inbox, 20));
}
private Object serveStatic(Request request, Response response) {
String resource = request.params("resource");

View File

@ -5,6 +5,9 @@ import spark.ResponseTransformer;
public class HtmlRedirect implements ResponseTransformer {
private final String html;
/** Because Spark doesn't have a redirect method that works with relative URLs
* (without explicitly providing the external address),we use HTML and let the
* browser resolve the relative redirect instead */
public HtmlRedirect(String destination) {
this.html = """
<?doctype html>

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.model;
package nu.marginalia.control.actor;
public enum Actor {
CRAWL,

View File

@ -5,7 +5,6 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.control.actor.task.*;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.actor.monitor.*;
import nu.marginalia.control.actor.monitor.ConverterMonitorActor;
import nu.marginalia.control.actor.monitor.LoaderMonitorActor;

View File

@ -2,7 +2,7 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;

View File

@ -2,7 +2,7 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;

View File

@ -2,7 +2,7 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqsm.StateFactory;

View File

@ -2,7 +2,7 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;

View File

@ -2,10 +2,9 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.model.ProcessHeartbeat;
import nu.marginalia.control.model.ServiceHeartbeat;
import nu.marginalia.control.svc.HeartbeatService;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;

View File

@ -2,7 +2,7 @@ package nu.marginalia.control.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.graph.ControlFlowException;

View File

@ -6,8 +6,8 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.control.svc.ProcessOutboxes;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessOutboxes;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageId;

View File

@ -6,8 +6,8 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.control.svc.ProcessOutboxes;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessOutboxes;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageId;

View File

@ -3,7 +3,7 @@ package nu.marginalia.control.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ControlFileStorageService;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageBaseType;

View File

@ -6,8 +6,8 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.control.svc.ProcessOutboxes;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessOutboxes;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mqapi.converting.ConvertAction;

View File

@ -6,8 +6,8 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.control.svc.ProcessOutboxes;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessOutboxes;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageId;

View File

@ -2,7 +2,7 @@ package nu.marginalia.control.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;

View File

@ -1,6 +1,6 @@
package nu.marginalia.control.model;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.control.process.ProcessService;
public record ProcessHeartbeat(
String processId,

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.svc;
package nu.marginalia.control.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.svc;
package nu.marginalia.control.process;
import com.google.inject.name.Named;
import nu.marginalia.service.control.ServiceEventLog;

View File

@ -3,7 +3,7 @@ package nu.marginalia.control.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.actor.ControlActors;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.actor.Actor;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mq.MessageQueueFactory;

View File

@ -6,7 +6,7 @@ import nu.marginalia.control.actor.ControlActors;
import nu.marginalia.control.actor.task.CrawlJobExtractorActor;
import nu.marginalia.control.actor.task.ReconvertAndLoadActor;
import nu.marginalia.control.actor.task.RecrawlActor;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.actor.Actor;
import nu.marginalia.control.model.ActorRunState;
import nu.marginalia.control.model.ActorStateGraph;
import nu.marginalia.db.storage.model.FileStorageId;

View File

@ -3,23 +3,123 @@ package nu.marginalia.control.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.model.MessageQueueEntry;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import spark.Request;
import spark.Response;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Singleton
public class MessageQueueViewService {
public class MessageQueueService {
private final HikariDataSource dataSource;
private final MqPersistence persistence;
@Inject
public MessageQueueViewService(HikariDataSource dataSource) {
public MessageQueueService(HikariDataSource dataSource, MqPersistence persistence) {
this.dataSource = dataSource;
this.persistence = persistence;
}
public Object viewMessageModel(Request request, Response response) {
return Map.of("message", getMessage(Long.parseLong(request.params("id"))),
"relatedMessages", getRelatedMessages(Long.parseLong(request.params("id"))));
}
public Object listMessageQueueModel(Request request, Response response) {
String inboxParam = request.queryParams("inbox");
String instanceParam = request.queryParams("instance");
String afterParam = request.queryParams("after");
long afterId = Optional.ofNullable(afterParam).map(Long::parseLong).orElse(Long.MAX_VALUE);
List<MessageQueueEntry> entries;
String mqFilter = "filter=none";
if (inboxParam != null) {
mqFilter = "inbox=" + inboxParam;
entries = getEntriesForInbox(inboxParam, afterId, 20);
}
else if (instanceParam != null) {
mqFilter = "instance=" + instanceParam;
entries = getEntriesForInstance(instanceParam, afterId, 20);
}
else {
entries = getEntries(afterId, 20);
}
Object next;
if (entries.size() == 20)
next = entries.stream().mapToLong(MessageQueueEntry::id).min().getAsLong();
else
next = "";
Object prev = afterParam == null ? "" : afterParam;
return Map.of("messages", entries,
"next", next,
"prev", prev,
"mqFilter", mqFilter);
}
public Object newMessageModel(Request request, Response response) {
String idParam = request.queryParams("id");
if (null == idParam)
return Map.of("relatedId", "-1");
var message = getMessage(Long.parseLong(idParam));
if (message != null)
return message;
return Map.of("relatedId", "-1");
}
public Object replyMessageModel(Request request, Response response) {
String idParam = request.params("id");
var message = getMessage(Long.parseLong(idParam));
return Map.of("relatedId", message.id(),
"recipientInbox", message.senderInbox(),
"function", "REPLY");
}
public Object createMessage(Request request, Response response) throws Exception {
String recipient = request.queryParams("recipientInbox");
String sender = request.queryParams("senderInbox");
String relatedMessage = request.queryParams("relatedId");
String function = request.queryParams("function");
String payload = request.queryParams("payload");
persistence.sendNewMessage(recipient,
sender.isBlank() ? null : sender,
relatedMessage == null ? null : Long.parseLong(relatedMessage),
function,
payload,
null);
return "";
}
public Object viewMessageForEditStateModel(Request request, Response response) throws SQLException {
return persistence.getMessage(Long.parseLong(request.params("id")));
}
public Object editMessageState(Request request, Response response) throws SQLException {
MqMessageState state = MqMessageState.valueOf(request.queryParams("state"));
long id = Long.parseLong(request.params("id"));
persistence.updateMessageState(id, state);
return "";
}
public List<MessageQueueEntry> getLastEntries(int n) {
@ -43,7 +143,6 @@ public class MessageQueueViewService {
throw new RuntimeException(ex);
}
}
public MessageQueueEntry getMessage(long id) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
@ -115,6 +214,7 @@ public class MessageQueueViewService {
throw new RuntimeException(ex);
}
}
public List<MessageQueueEntry> getEntriesForInstance(String instance, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""