From 4ee3f6ba3fc96627e4a7f36d65044f644301d874 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Tue, 11 Jul 2023 14:51:51 +0200 Subject: [PATCH] (minor) Refactor ControlService --- .../nu/marginalia/control/ControlService.java | 45 +++---------- .../control/{ => svc}/EventLogService.java | 2 +- .../control/{ => svc}/HeartbeatService.java | 2 +- .../svc/MessageQueueMonitorService.java | 63 +++++++++++++++++++ .../{ => svc}/MessageQueueViewService.java | 2 +- 5 files changed, 75 insertions(+), 39 deletions(-) rename code/services-satellite/control-service/src/main/java/nu/marginalia/control/{ => svc}/EventLogService.java (97%) rename code/services-satellite/control-service/src/main/java/nu/marginalia/control/{ => svc}/HeartbeatService.java (98%) create mode 100644 code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java rename code/services-satellite/control-service/src/main/java/nu/marginalia/control/{ => svc}/MessageQueueViewService.java (98%) diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index fcd79ff4..39969083 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -5,6 +5,10 @@ import com.google.inject.Inject; import nu.marginalia.client.ServiceMonitors; import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.process.ControlProcesses; +import nu.marginalia.control.svc.EventLogService; +import nu.marginalia.control.svc.HeartbeatService; +import nu.marginalia.control.svc.MessageQueueMonitorService; +import nu.marginalia.control.svc.MessageQueueViewService; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.renderer.MustacheRenderer; @@ -34,7 +38,7 @@ public class ControlService extends Service { private final MustacheRenderer> messageQueueRenderer; private final MqPersistence messageQueuePersistence; private final StaticResources staticResources; - private final ServiceEventLog eventLog; + private final MessageQueueMonitorService messageQueueMonitorService; @Inject @@ -46,12 +50,12 @@ public class ControlService extends Service { MqPersistence messageQueuePersistence, ControlProcesses controlProcesses, StaticResources staticResources, - MessageQueueViewService messageQueueViewService + MessageQueueViewService messageQueueViewService, + MessageQueueMonitorService messageQueueMonitorService ) throws IOException { super(params); this.monitors = monitors; - this.eventLog = params.eventLog; indexRenderer = rendererFactory.renderer("control/index"); servicesRenderer = rendererFactory.renderer("control/services"); @@ -61,6 +65,7 @@ public class ControlService extends Service { this.messageQueuePersistence = messageQueuePersistence; this.staticResources = staticResources; + this.messageQueueMonitorService = messageQueueMonitorService; Spark.get("/public/heartbeats", (req, res) -> { res.type("application/json"); @@ -74,6 +79,7 @@ public class ControlService extends Service { Spark.get("/public/events", (req, rsp) -> eventsRenderer.render(Map.of("events", eventLogService.getLastEntries(20)))); Spark.get("/public/message-queue", (req, rsp) -> messageQueueRenderer.render(Map.of("messages", messageQueueViewService.getLastEntries(20)))); + // TODO: This should be a POST Spark.get("/public/repartition", (req, rsp) -> { controlProcesses.start(ControlProcess.REPARTITION_REINDEX); return "OK"; @@ -82,10 +88,6 @@ public class ControlService extends Service { Spark.get("/public/:resource", this::serveStatic); monitors.subscribe(this::logMonitorStateChange); - - Thread reaperThread = new Thread(this::reapMessageQueue, "message-queue-reaper"); - reaperThread.setDaemon(true); - reaperThread.start(); } @@ -98,35 +100,6 @@ public class ControlService extends Service { } - private void reapMessageQueue() { - - for (;;) { - try { - TimeUnit.MINUTES.sleep(10); - - int outcome = messageQueuePersistence.reapDeadMessages(); - if (outcome > 0) { - eventLog.logEvent("MESSAGE-QUEUE-REAPED", Integer.toString(outcome)); - logger.info("Reaped {} dead messages from message queue", outcome); - } - - outcome = messageQueuePersistence.cleanOldMessages(); - if (outcome > 0) { - eventLog.logEvent("MESSAGE-QUEUE-CLEANED", Integer.toString(outcome)); - logger.info("Cleaned {} stale messages from message queue", outcome); - } - - } - catch (InterruptedException ex) { - logger.info("Message queue reaper interrupted"); - return; - } - catch (Exception ex) { - logger.error("Message queue reaper failed", ex); - } - } - } - private void logMonitorStateChange() { logger.info("Service state change: {}", monitors.getRunningServices()); } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/EventLogService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/EventLogService.java similarity index 97% rename from code/services-satellite/control-service/src/main/java/nu/marginalia/control/EventLogService.java rename to code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/EventLogService.java index 41a325ec..f54e6996 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/EventLogService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/EventLogService.java @@ -1,4 +1,4 @@ -package nu.marginalia.control; +package nu.marginalia.control.svc; import com.google.inject.Inject; import com.google.inject.Singleton; diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/HeartbeatService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java similarity index 98% rename from code/services-satellite/control-service/src/main/java/nu/marginalia/control/HeartbeatService.java rename to code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java index 823ca045..def90b42 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/HeartbeatService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java @@ -1,4 +1,4 @@ -package nu.marginalia.control; +package nu.marginalia.control.svc; import com.google.inject.Inject; import com.google.inject.Singleton; diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java new file mode 100644 index 00000000..a5200275 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java @@ -0,0 +1,63 @@ +package nu.marginalia.control.svc; + +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.service.control.ServiceEventLog; +import nu.marginalia.service.server.BaseServiceParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +@Singleton +public class MessageQueueMonitorService { + private final Logger logger = LoggerFactory.getLogger(MessageQueueMonitorService.class); + private final MqPersistence persistence; + private final ServiceEventLog eventLog; + + @Inject + public MessageQueueMonitorService(BaseServiceParams params) { + this.persistence = params.messageQueuePersistence; + this.eventLog = params.eventLog; + + Thread reaperThread = new Thread(this::run, "message-queue-reaper"); + reaperThread.setDaemon(true); + reaperThread.start(); + } + + + private void run() { + + for (;;) { + try { + TimeUnit.MINUTES.sleep(10); + + reapMessages(); + } + catch (InterruptedException ex) { + logger.info("Message queue reaper interrupted"); + break; + } + catch (Exception ex) { + logger.error("Message queue reaper failed", ex); + } + } + } + + private void reapMessages() throws SQLException { + int outcome = persistence.reapDeadMessages(); + if (outcome > 0) { + eventLog.logEvent("MESSAGE-QUEUE-REAPED", Integer.toString(outcome)); + logger.info("Reaped {} dead messages from message queue", outcome); + } + + outcome = persistence.cleanOldMessages(); + if (outcome > 0) { + eventLog.logEvent("MESSAGE-QUEUE-CLEANED", Integer.toString(outcome)); + logger.info("Cleaned {} stale messages from message queue", outcome); + } + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/MessageQueueViewService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java similarity index 98% rename from code/services-satellite/control-service/src/main/java/nu/marginalia/control/MessageQueueViewService.java rename to code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java index 6cec667c..c8016c78 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/MessageQueueViewService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueViewService.java @@ -1,4 +1,4 @@ -package nu.marginalia.control; +package nu.marginalia.control.svc; import com.google.inject.Inject; import com.google.inject.Singleton;