diff --git a/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java b/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java index a954b6bb..7ed94a46 100644 --- a/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java +++ b/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java @@ -11,6 +11,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.attribute.PosixFilePermissions; import java.sql.SQLException; +import java.util.Optional; /** Manages file storage for processes and services */ @@ -23,6 +24,21 @@ public class FileStorageService { this.dataSource = dataSource; } + public Optional findFileStorageToDelete() { + try (var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement(""" + SELECT ID FROM FILE_STORAGE WHERE DO_PURGE LIMIT 1 + """)) { + var rs = stmt.executeQuery(); + if (rs.next()) { + return Optional.of(getStorage(new FileStorageId(rs.getLong(1)))); + } + } catch (SQLException e) { + return Optional.empty(); + } + return Optional.empty(); + } + /** @return the storage base with the given id, or null if it does not exist */ public FileStorageBase getStorageBase(FileStorageBaseId type) throws SQLException { try (var conn = dataSource.getConnection(); @@ -278,4 +294,13 @@ public class FileStorageService { } } + public void removeFileStorage(FileStorageId id) throws SQLException { + try (var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement(""" + DELETE FROM FILE_STORAGE WHERE ID = ? + """)) { + stmt.setLong(1, id.id()); + stmt.executeUpdate(); + } + } } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java index 791a195c..85f7e2f5 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java @@ -1,5 +1,6 @@ package nu.marginalia.mq.inbox; +import lombok.SneakyThrows; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.persistence.MqPersistence; @@ -7,6 +8,7 @@ import java.sql.SQLException; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; /** A single-shot inbox that can be used to wait for a single message * to arrive in an inbox, and then reply to that message @@ -26,6 +28,12 @@ public class MqSingleShotInbox { this.persistence = persistence; } + /** Wait for a message to arrive in the specified inbox, up to the specified timeout. + * + * @param timeout The timeout + * @param unit The time unit + * @return The message, or empty if no message arrived before the timeout + */ public Optional waitForMessage(long timeout, TimeUnit unit) throws InterruptedException, SQLException { final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); @@ -44,6 +52,25 @@ public class MqSingleShotInbox { } } + + /** Steal a message from the inbox, and change the owner to this instance. This is useful + * for resuming an aborted process. + * + * @param predicate A predicate that must be true for the message to be stolen + * @return The stolen message, or empty if no message was stolen + */ + @SneakyThrows + public Optional stealMessage(Predicate predicate) { + for (var message : persistence.eavesdrop(inboxName, 5)) { + if (predicate.test(message)) { + persistence.changeOwner(message.msgId(), instanceUUID, -1); + return Optional.of(message); + } + } + + return Optional.empty(); + } + public void sendResponse(MqMessage originalMessage, MqInboxResponse response) { try { persistence.sendResponse(originalMessage.msgId(), response.state(), response.message()); @@ -51,4 +78,5 @@ public class MqSingleShotInbox { throw new RuntimeException(e); } } + } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java index af0b5197..09749209 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java @@ -152,7 +152,7 @@ public class MqSynchronousInbox implements MqInboxIf { currentTask.get(); } catch (Exception ex) { - logger.error("Inbox task was aborted", ex); + logger.error("Inbox task was aborted"); } finally { currentTask = null; diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index 5d6511f4..dce9d402 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -362,4 +362,20 @@ public class MqPersistence { } } + + public void changeOwner(long id, String instanceUUID, int tick) { + try (var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement(""" + UPDATE MESSAGE_QUEUE SET OWNER_INSTANCE=?, OWNER_TICK=? + WHERE ID=? + """)) { + stmt.setString(1, instanceUUID); + stmt.setInt(2, tick); + stmt.setLong(3, id); + stmt.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index b07060cb..a42f5b67 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -22,7 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.SQLException; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -174,14 +176,8 @@ public class ConverterMain { var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID()); - var msgOpt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (msgOpt.isEmpty()) - throw new RuntimeException("No instruction received in inbox"); - var msg = msgOpt.get(); - - if (!nu.marginalia.converting.mqapi.ConvertRequest.class.getSimpleName().equals(msg.function())) { - throw new RuntimeException("Unexpected message in inbox: " + msg); - } + var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.ConvertRequest.class.getSimpleName()); + var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); var request = gson.fromJson(msg.payload(), nu.marginalia.converting.mqapi.ConvertRequest.class); @@ -195,6 +191,21 @@ public class ConverterMain { return new ConvertRequest(plan, msg, inbox); } + private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { + var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); + if (opt.isPresent()) { + if (!opt.get().function().equals(expectedFunction)) { + throw new RuntimeException("Unexpected function: " + opt.get().function()); + } + return opt; + } + else { + var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); + stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); + return stolenMessage; + } + } + record ProcessingInstructions(String id, List instructions) {} diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index 7250889d..08649808 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.nio.file.Path; import java.sql.SQLException; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -214,7 +215,7 @@ public class LoaderMain { var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, UUID.randomUUID()); - var msgOpt = inbox.waitForMessage(30, TimeUnit.SECONDS); + var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.LoadRequest.class.getSimpleName()); if (msgOpt.isEmpty()) throw new RuntimeException("No instruction received in inbox"); var msg = msgOpt.get(); @@ -232,4 +233,19 @@ public class LoaderMain { return new LoadRequest(plan, msg, inbox); } + private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { + var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); + if (opt.isPresent()) { + if (!opt.get().function().equals(expectedFunction)) { + throw new RuntimeException("Unexpected function: " + opt.get().function()); + } + return opt; + } + else { + var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); + stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); + return stolenMessage; + } + } + } diff --git a/code/services-satellite/control-service/build.gradle b/code/services-satellite/control-service/build.gradle index d90d926a..72c0552e 100644 --- a/code/services-satellite/control-service/build.gradle +++ b/code/services-satellite/control-service/build.gradle @@ -46,6 +46,7 @@ dependencies { implementation libs.trove implementation libs.spark implementation libs.fastutil + implementation libs.commons.io implementation libs.bundles.gson implementation libs.bundles.mariadb diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index e2e53017..34e600f5 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -4,14 +4,12 @@ import com.google.gson.Gson; import com.google.inject.Inject; import nu.marginalia.client.ServiceMonitors; import nu.marginalia.control.model.ControlProcess; -import nu.marginalia.control.process.ControlProcesses; +import nu.marginalia.control.fsm.ControlFSMs; import nu.marginalia.control.svc.*; import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.model.gson.GsonFactory; -import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.RendererFactory; -import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,9 +18,7 @@ import spark.Response; import spark.Spark; import java.io.IOException; -import java.nio.file.Path; import java.util.Map; -import java.util.concurrent.TimeUnit; public class ControlService extends Service { @@ -43,7 +39,7 @@ public class ControlService extends Service { HeartbeatService heartbeatService, EventLogService eventLogService, RendererFactory rendererFactory, - ControlProcesses controlProcesses, + ControlFSMs controlFSMs, StaticResources staticResources, MessageQueueViewService messageQueueViewService, ControlFileStorageService controlFileStorageService @@ -73,7 +69,7 @@ public class ControlService extends Service { Spark.get("/public/processes", (req, rsp) -> Map.of("processes", heartbeatService.getProcessHeartbeats(), - "fsms", controlProcesses.getFsmStates(), + "fsms", controlFSMs.getFsmStates(), "messages", messageQueueViewService.getLastEntries(20)), (map) -> processesRenderer.render((Map) map)); @@ -82,14 +78,14 @@ public class ControlService extends Service { (map) -> storageRenderer.render((Map) map)); Spark.post("/public/fsms/:fsm/start", (req, rsp) -> { - controlProcesses.start(ControlProcess.valueOf(req.params("fsm").toUpperCase())); + controlFSMs.start(ControlProcess.valueOf(req.params("fsm").toUpperCase())); return """ """; }); Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> { - controlProcesses.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase())); + controlFSMs.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase())); return """ @@ -98,7 +94,7 @@ public class ControlService extends Service { // TODO: This should be a POST Spark.get("/public/repartition", (req, rsp) -> { - controlProcesses.start(ControlProcess.REPARTITION_REINDEX); + controlFSMs.start(ControlProcess.REPARTITION_REINDEX); return """ @@ -106,8 +102,8 @@ public class ControlService extends Service { }); // TODO: This should be a POST - Spark.get("/public/reconvert", (req, rsp) -> { - controlProcesses.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(11)); + Spark.get("/public/reconvert/:fid", (req, rsp) -> { + controlFSMs.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(Integer.parseInt(req.params("fid")))); return """ diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/ControlFSMs.java similarity index 67% rename from code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java rename to code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/ControlFSMs.java index eb5eaef7..0c756114 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/ControlFSMs.java @@ -1,4 +1,4 @@ -package nu.marginalia.control.process; +package nu.marginalia.control.fsm; import com.google.gson.Gson; import com.google.inject.Inject; @@ -6,6 +6,11 @@ import com.google.inject.Singleton; import lombok.SneakyThrows; import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.model.ControlProcessState; +import nu.marginalia.control.fsm.monitor.*; +import nu.marginalia.control.fsm.monitor.ConverterMonitorFSM; +import nu.marginalia.control.fsm.monitor.LoaderMonitorFSM; +import nu.marginalia.control.fsm.task.ReconvertAndLoadFSM; +import nu.marginalia.control.fsm.task.RepartitionReindexFSM; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mqsm.StateMachine; @@ -20,29 +25,35 @@ import java.util.Map; import java.util.UUID; @Singleton -public class ControlProcesses { +public class ControlFSMs { private final ServiceEventLog eventLog; private final Gson gson; private final MessageQueueFactory messageQueueFactory; public Map stateMachines = new HashMap<>(); @Inject - public ControlProcesses(MessageQueueFactory messageQueueFactory, - GsonFactory gsonFactory, - BaseServiceParams baseServiceParams, - RepartitionReindexProcess repartitionReindexProcess, - ReconvertAndLoadProcess reconvertAndLoadProcess, - ConverterMonitorProcess converterMonitorProcess, - LoaderMonitorProcess loaderMonitorProcess + public ControlFSMs(MessageQueueFactory messageQueueFactory, + GsonFactory gsonFactory, + BaseServiceParams baseServiceParams, + RepartitionReindexFSM repartitionReindexFSM, + ReconvertAndLoadFSM reconvertAndLoadFSM, + ConverterMonitorFSM converterMonitorFSM, + LoaderMonitorFSM loaderMonitor, + MessageQueueMonitorFSM messageQueueMonitor, + ProcessLivenessMonitorFSM processMonitorFSM, + FileStorageMonitorFSM fileStorageMonitorFSM ) { this.messageQueueFactory = messageQueueFactory; this.eventLog = baseServiceParams.eventLog; this.gson = gsonFactory.get(); - register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess); - register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadProcess); - register(ControlProcess.CONVERTER_MONITOR, converterMonitorProcess); - register(ControlProcess.LOADER_MONITOR, loaderMonitorProcess); + register(ControlProcess.REPARTITION_REINDEX, repartitionReindexFSM); + register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadFSM); + register(ControlProcess.CONVERTER_MONITOR, converterMonitorFSM); + register(ControlProcess.LOADER_MONITOR, loaderMonitor); + register(ControlProcess.MESSAGE_QUEUE_MONITOR, messageQueueMonitor); + register(ControlProcess.PROCESS_LIVENESS_MONITOR, processMonitorFSM); + register(ControlProcess.FILE_STORAGE_MONITOR, fileStorageMonitorFSM); } private void register(ControlProcess process, AbstractStateGraph graph) { diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ConverterMonitorProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/AbstractProcessSpawnerFSM.java similarity index 63% rename from code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ConverterMonitorProcess.java rename to code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/AbstractProcessSpawnerFSM.java index a1c0258f..75944553 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ConverterMonitorProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/AbstractProcessSpawnerFSM.java @@ -1,9 +1,8 @@ -package nu.marginalia.control.process; +package nu.marginalia.control.fsm.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.control.svc.ProcessService; -import nu.marginalia.converting.mqapi.ConverterInboxNames; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqsm.StateFactory; import nu.marginalia.mqsm.graph.AbstractStateGraph; @@ -14,35 +13,39 @@ import java.sql.SQLException; import java.util.concurrent.TimeUnit; @Singleton -public class ConverterMonitorProcess extends AbstractStateGraph { +public class AbstractProcessSpawnerFSM extends AbstractStateGraph { private final MqPersistence persistence; private final ProcessService processService; public static final String INITIAL = "INITIAL"; - public static final String CHECK = "CHECK"; + public static final String MONITOR = "MONITOR"; public static final String RUN = "RUN"; public static final String END = "END"; public static final int MAX_ATTEMPTS = 3; - public static final String inboxName = ConverterInboxNames.CONVERTER_INBOX; - public static final ProcessService.ProcessId processId = ProcessService.ProcessId.CONVERTER; + private final String inboxName; + private final ProcessService.ProcessId processId; @Inject - public ConverterMonitorProcess(StateFactory stateFactory, - MqPersistence persistence, - ProcessService processService) { + public AbstractProcessSpawnerFSM(StateFactory stateFactory, + MqPersistence persistence, + ProcessService processService, + String inboxName, + ProcessService.ProcessId processId) { super(stateFactory); this.persistence = persistence; this.processService = processService; + this.inboxName = inboxName; + this.processId = processId; } - @GraphState(name = INITIAL, next = CHECK) + @GraphState(name = INITIAL, next = MONITOR) public void init() { } - @GraphState(name = CHECK, resume = ResumeBehavior.RETRY) - public void check() throws SQLException, InterruptedException { + @GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) + public void monitor() throws SQLException, InterruptedException { for (;;) { var messages = persistence.eavesdrop(inboxName, 1); @@ -67,7 +70,7 @@ public class ConverterMonitorProcess extends AbstractStateGraph { else throw e; } - transition(CHECK); + transition(MONITOR); } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ConverterMonitorFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ConverterMonitorFSM.java new file mode 100644 index 00000000..d5dd3908 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ConverterMonitorFSM.java @@ -0,0 +1,22 @@ +package nu.marginalia.control.fsm.monitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.converting.mqapi.ConverterInboxNames; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqsm.StateFactory; + +@Singleton +public class ConverterMonitorFSM extends AbstractProcessSpawnerFSM { + + + @Inject + public ConverterMonitorFSM(StateFactory stateFactory, + MqPersistence persistence, + ProcessService processService) { + super(stateFactory, persistence, processService, ConverterInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER); + } + + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/FileStorageMonitorFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/FileStorageMonitorFSM.java new file mode 100644 index 00000000..5d760dfc --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/FileStorageMonitorFSM.java @@ -0,0 +1,72 @@ +package nu.marginalia.control.fsm.monitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.db.storage.FileStorageService; +import nu.marginalia.db.storage.model.FileStorage; +import nu.marginalia.db.storage.model.FileStorageId; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +@Singleton +public class FileStorageMonitorFSM extends AbstractStateGraph { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + // STATES + + private static final String INITIAL = "INITIAL"; + private static final String MONITOR = "MONITOR"; + private static final String PURGE = "PURGE"; + private static final String END = "END"; + private final FileStorageService fileStorageService; + + + @Inject + public FileStorageMonitorFSM(StateFactory stateFactory, + FileStorageService fileStorageService) { + super(stateFactory); + this.fileStorageService = fileStorageService; + } + + @GraphState(name = INITIAL, next = MONITOR) + public void init() { + } + + @GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) + public void monitor() throws Exception { + + for (;;) { + Optional toDeleteOpt = fileStorageService.findFileStorageToDelete(); + + if (toDeleteOpt.isEmpty()) { + TimeUnit.SECONDS.sleep(10); + } + else { + transition(PURGE, toDeleteOpt.get().id()); + } + } + } + + @GraphState(name = PURGE, next = MONITOR, resume = ResumeBehavior.RETRY) + public void purge(FileStorageId id) throws Exception { + var storage = fileStorageService.getStorage(id); + logger.info("Deleting {} ", storage.path()); + Path path = storage.asPath(); + + if (Files.exists(path)) { + FileUtils.deleteDirectory(path.toFile()); + } + + fileStorageService.removeFileStorage(storage.id()); + } +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/LoaderMonitorFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/LoaderMonitorFSM.java new file mode 100644 index 00000000..ff81433e --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/LoaderMonitorFSM.java @@ -0,0 +1,24 @@ +package nu.marginalia.control.fsm.monitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.converting.mqapi.ConverterInboxNames; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqsm.StateFactory; + +@Singleton +public class LoaderMonitorFSM extends AbstractProcessSpawnerFSM { + + + @Inject + public LoaderMonitorFSM(StateFactory stateFactory, + MqPersistence persistence, + ProcessService processService) { + + super(stateFactory, persistence, processService, + ConverterInboxNames.LOADER_INBOX, + ProcessService.ProcessId.LOADER); + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/MessageQueueMonitorFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/MessageQueueMonitorFSM.java new file mode 100644 index 00000000..d6c5ff82 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/MessageQueueMonitorFSM.java @@ -0,0 +1,45 @@ +package nu.marginalia.control.fsm.monitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; + +import java.util.concurrent.TimeUnit; + +@Singleton +public class MessageQueueMonitorFSM extends AbstractStateGraph { + + // STATES + + private static final String INITIAL = "INITIAL"; + private static final String MONITOR = "MONITOR"; + private static final String END = "END"; + private final MqPersistence persistence; + + + @Inject + public MessageQueueMonitorFSM(StateFactory stateFactory, + MqPersistence persistence) { + super(stateFactory); + this.persistence = persistence; + } + + @GraphState(name = INITIAL, next = MONITOR) + public void init() { + } + + @GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) + public void monitor() throws Exception { + + for (;;) { + persistence.reapDeadMessages(); + persistence.cleanOldMessages(); + TimeUnit.SECONDS.sleep(60); + } + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ProcessLivenessMonitorFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ProcessLivenessMonitorFSM.java new file mode 100644 index 00000000..f6afa68f --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ProcessLivenessMonitorFSM.java @@ -0,0 +1,55 @@ +package nu.marginalia.control.fsm.monitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.model.ProcessHeartbeat; +import nu.marginalia.control.svc.HeartbeatService; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; + +import java.util.concurrent.TimeUnit; + +@Singleton +public class ProcessLivenessMonitorFSM extends AbstractStateGraph { + + // STATES + + private static final String INITIAL = "INITIAL"; + private static final String MONITOR = "MONITOR"; + private static final String END = "END"; + private final ProcessService processService; + private final HeartbeatService heartbeatService; + + + @Inject + public ProcessLivenessMonitorFSM(StateFactory stateFactory, + ProcessService processService, + HeartbeatService heartbeatService) { + super(stateFactory); + this.processService = processService; + this.heartbeatService = heartbeatService; + } + + @GraphState(name = INITIAL, next = MONITOR) + public void init() { + } + + @GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) + public void monitor() throws Exception { + + for (;;) { + var processHeartbeats = heartbeatService.getProcessHeartbeats(); + + processHeartbeats.stream() + .filter(ProcessHeartbeat::isRunning) + .filter(p -> !processService.isRunning(p.getProcessId())) + .forEach(heartbeatService::flagProcessAsStopped); + + TimeUnit.SECONDS.sleep(60); + } + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java similarity index 90% rename from code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java rename to code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java index 1c1439f9..19881851 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java @@ -1,4 +1,4 @@ -package nu.marginalia.control.process; +package nu.marginalia.control.fsm.task; import com.google.gson.Gson; import com.google.inject.Inject; @@ -15,7 +15,6 @@ import nu.marginalia.db.storage.model.FileStorageBaseType; import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.index.client.IndexClient; -import nu.marginalia.index.client.IndexMqEndpoints; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; @@ -24,17 +23,12 @@ import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.search.client.SearchClient; -import nu.marginalia.search.client.SearchMqEndpoints; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @Singleton -public class ReconvertAndLoadProcess extends AbstractStateGraph { +public class ReconvertAndLoadFSM extends AbstractStateGraph { // STATES @@ -66,13 +60,13 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph { }; @Inject - public ReconvertAndLoadProcess(StateFactory stateFactory, - ProcessService processService, - IndexClient indexClient, - ProcessOutboxFactory processOutboxFactory, - SearchClient searchClient, - FileStorageService storageService, - Gson gson + public ReconvertAndLoadFSM(StateFactory stateFactory, + ProcessService processService, + IndexClient indexClient, + ProcessOutboxFactory processOutboxFactory, + SearchClient searchClient, + FileStorageService storageService, + Gson gson ) { super(stateFactory); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/RepartitionReindexFSM.java similarity index 90% rename from code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java rename to code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/RepartitionReindexFSM.java index c668d230..ed3aad0a 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/RepartitionReindexFSM.java @@ -1,4 +1,4 @@ -package nu.marginalia.control.process; +package nu.marginalia.control.fsm.task; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -12,7 +12,7 @@ import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.mqsm.graph.ResumeBehavior; @Singleton -public class RepartitionReindexProcess extends AbstractStateGraph { +public class RepartitionReindexFSM extends AbstractStateGraph { private final MqOutbox indexOutbox; @@ -27,8 +27,8 @@ public class RepartitionReindexProcess extends AbstractStateGraph { @Inject - public RepartitionReindexProcess(StateFactory stateFactory, - IndexClient indexClient) { + public RepartitionReindexFSM(StateFactory stateFactory, + IndexClient indexClient) { super(stateFactory); indexOutbox = indexClient.outbox(); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java index 6cdc219a..a09ee9e9 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java @@ -4,9 +4,13 @@ public enum ControlProcess { REPARTITION_REINDEX, RECONVERT_LOAD, CONVERTER_MONITOR, - LOADER_MONITOR + LOADER_MONITOR, + MESSAGE_QUEUE_MONITOR, + PROCESS_LIVENESS_MONITOR, + FILE_STORAGE_MONITOR ; + public String id() { return "fsm:" + name().toLowerCase(); } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java index 4fbdcde9..e92a2a1a 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java @@ -1,5 +1,7 @@ package nu.marginalia.control.model; +import nu.marginalia.control.svc.ProcessService; + public record ProcessHeartbeat( String processId, String processBase, @@ -23,6 +25,9 @@ public record ProcessHeartbeat( public boolean isStopped() { return "STOPPED".equals(status); } + public boolean isRunning() { + return "RUNNING".equals(status); + } public String progressStyle() { if ("RUNNING".equals(status) && progress != null) { return """ @@ -31,4 +36,13 @@ public record ProcessHeartbeat( } return ""; } + + public ProcessService.ProcessId getProcessId() { + return switch (processBase) { + case "converter" -> ProcessService.ProcessId.CONVERTER; + case "crawler" -> ProcessService.ProcessId.CRAWLER; + case "loader" -> ProcessService.ProcessId.LOADER; + default -> throw new RuntimeException("Unknown process base: " + processBase); + }; + } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/LoaderMonitorProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/LoaderMonitorProcess.java deleted file mode 100644 index 813c7da7..00000000 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/LoaderMonitorProcess.java +++ /dev/null @@ -1,73 +0,0 @@ -package nu.marginalia.control.process; - -import com.google.inject.Inject; -import com.google.inject.Singleton; -import nu.marginalia.control.svc.ProcessService; -import nu.marginalia.converting.mqapi.ConverterInboxNames; -import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; - -import java.sql.SQLException; -import java.util.concurrent.TimeUnit; - -@Singleton -public class LoaderMonitorProcess extends AbstractStateGraph { - - private final MqPersistence persistence; - private final ProcessService processService; - public static final String INITIAL = "INITIAL"; - public static final String CHECK = "CHECK"; - public static final String RUN = "RUN"; - public static final String END = "END"; - - public static final int MAX_ATTEMPTS = 1; - public static final String inboxName = ConverterInboxNames.LOADER_INBOX; - public static final ProcessService.ProcessId processId = ProcessService.ProcessId.LOADER; - - @Inject - public LoaderMonitorProcess(StateFactory stateFactory, - MqPersistence persistence, - ProcessService processService) { - super(stateFactory); - this.persistence = persistence; - this.processService = processService; - } - - @GraphState(name = INITIAL, next = CHECK) - public void init() { - - } - - @GraphState(name = CHECK, resume = ResumeBehavior.RETRY) - public void check() throws SQLException, InterruptedException { - - for (;;) { - var messages = persistence.eavesdrop(inboxName, 1); - - if (messages.isEmpty() && !processService.isRunning(processId)) { - TimeUnit.SECONDS.sleep(5); - } else { - transition(RUN, 0); - } - } - } - - @GraphState(name = RUN) - public void run(Integer attempts) throws Exception { - try { - processService.trigger(processId); - } - catch (Exception e) { - if (attempts < MAX_ATTEMPTS) { - transition(RUN, attempts + 1); - } - else throw e; - } - - transition(CHECK); - } - -} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java index 5f8b28f3..8a8a693e 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java @@ -5,6 +5,7 @@ import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; import nu.marginalia.control.model.ProcessHeartbeat; import nu.marginalia.control.model.ServiceHeartbeat; +import nu.marginalia.service.control.ServiceEventLog; import java.sql.SQLException; import java.util.ArrayList; @@ -13,10 +14,13 @@ import java.util.List; @Singleton public class HeartbeatService { private final HikariDataSource dataSource; + private final ServiceEventLog eventLogService; @Inject - public HeartbeatService(HikariDataSource dataSource) { + public HeartbeatService(HikariDataSource dataSource, + ServiceEventLog eventLogService) { this.dataSource = dataSource; + this.eventLogService = eventLogService; } public List getServiceHeartbeats() { @@ -77,4 +81,23 @@ public class HeartbeatService { return heartbeats; } + public void flagProcessAsStopped(ProcessHeartbeat processHeartbeat) { + eventLogService.logEvent("PROCESS-MISSING", "Marking stale process heartbeat " + + processHeartbeat.processId() + " / " + processHeartbeat.uuidFull() + " as stopped"); + + try (var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement(""" + UPDATE PROCESS_HEARTBEAT + SET STATUS = 'STOPPED' + WHERE INSTANCE = ? + """)) { + + stmt.setString(1, processHeartbeat.uuidFull()); + stmt.executeUpdate(); + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java deleted file mode 100644 index 4ba2585c..00000000 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java +++ /dev/null @@ -1,62 +0,0 @@ -package nu.marginalia.control.svc; - -import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.service.control.ServiceEventLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import javax.inject.Singleton; -import java.sql.SQLException; -import java.util.concurrent.TimeUnit; - -@Singleton -public class MessageQueueMonitorService { - private final Logger logger = LoggerFactory.getLogger(MessageQueueMonitorService.class); - private final MqPersistence persistence; - private final ServiceEventLog eventLog; - - @Inject - public MessageQueueMonitorService(ServiceEventLog eventLog, MqPersistence persistence) { - this.eventLog = eventLog; - this.persistence = persistence; - - Thread reaperThread = new Thread(this::run, "message-queue-reaper"); - reaperThread.setDaemon(true); - reaperThread.start(); - } - - - private void run() { - - for (;;) { - try { - TimeUnit.MINUTES.sleep(10); - - reapMessages(); - } - catch (InterruptedException ex) { - logger.info("Message queue reaper interrupted"); - break; - } - catch (Exception ex) { - logger.error("Message queue reaper failed", ex); - } - } - } - - private void reapMessages() throws SQLException { - int outcome = persistence.reapDeadMessages(); - if (outcome > 0) { - eventLog.logEvent("MESSAGE-QUEUE-REAPED", Integer.toString(outcome)); - logger.info("Reaped {} dead messages from message queue", outcome); - } - - outcome = persistence.cleanOldMessages(); - if (outcome > 0) { - eventLog.logEvent("MESSAGE-QUEUE-CLEANED", Integer.toString(outcome)); - logger.info("Cleaned {} stale messages from message queue", outcome); - } - } - -}