(control) Name change process->fsm, new fsm:s

* FSM for spawning processes when messages appear for them
* FSM for removing data flagged for purging
This commit is contained in:
Viktor Lofgren 2023-07-17 12:27:27 +02:00
parent 6e41e78f36
commit e618aa34e9
22 changed files with 429 additions and 204 deletions

View File

@ -11,6 +11,7 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Optional;
/** Manages file storage for processes and services /** Manages file storage for processes and services
*/ */
@ -23,6 +24,21 @@ public class FileStorageService {
this.dataSource = dataSource; this.dataSource = dataSource;
} }
public Optional<FileStorage> findFileStorageToDelete() {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT ID FROM FILE_STORAGE WHERE DO_PURGE LIMIT 1
""")) {
var rs = stmt.executeQuery();
if (rs.next()) {
return Optional.of(getStorage(new FileStorageId(rs.getLong(1))));
}
} catch (SQLException e) {
return Optional.empty();
}
return Optional.empty();
}
/** @return the storage base with the given id, or null if it does not exist */ /** @return the storage base with the given id, or null if it does not exist */
public FileStorageBase getStorageBase(FileStorageBaseId type) throws SQLException { public FileStorageBase getStorageBase(FileStorageBaseId type) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
@ -278,4 +294,13 @@ public class FileStorageService {
} }
} }
public void removeFileStorage(FileStorageId id) throws SQLException {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
DELETE FROM FILE_STORAGE WHERE ID = ?
""")) {
stmt.setLong(1, id.id());
stmt.executeUpdate();
}
}
} }

View File

@ -1,5 +1,6 @@
package nu.marginalia.mq.inbox; package nu.marginalia.mq.inbox;
import lombok.SneakyThrows;
import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
@ -7,6 +8,7 @@ import java.sql.SQLException;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/** A single-shot inbox that can be used to wait for a single message /** A single-shot inbox that can be used to wait for a single message
* to arrive in an inbox, and then reply to that message * to arrive in an inbox, and then reply to that message
@ -26,6 +28,12 @@ public class MqSingleShotInbox {
this.persistence = persistence; this.persistence = persistence;
} }
/** Wait for a message to arrive in the specified inbox, up to the specified timeout.
*
* @param timeout The timeout
* @param unit The time unit
* @return The message, or empty if no message arrived before the timeout
*/
public Optional<MqMessage> waitForMessage(long timeout, TimeUnit unit) throws InterruptedException, SQLException { public Optional<MqMessage> waitForMessage(long timeout, TimeUnit unit) throws InterruptedException, SQLException {
final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); final long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
@ -44,6 +52,25 @@ public class MqSingleShotInbox {
} }
} }
/** Steal a message from the inbox, and change the owner to this instance. This is useful
* for resuming an aborted process.
*
* @param predicate A predicate that must be true for the message to be stolen
* @return The stolen message, or empty if no message was stolen
*/
@SneakyThrows
public Optional<MqMessage> stealMessage(Predicate<MqMessage> predicate) {
for (var message : persistence.eavesdrop(inboxName, 5)) {
if (predicate.test(message)) {
persistence.changeOwner(message.msgId(), instanceUUID, -1);
return Optional.of(message);
}
}
return Optional.empty();
}
public void sendResponse(MqMessage originalMessage, MqInboxResponse response) { public void sendResponse(MqMessage originalMessage, MqInboxResponse response) {
try { try {
persistence.sendResponse(originalMessage.msgId(), response.state(), response.message()); persistence.sendResponse(originalMessage.msgId(), response.state(), response.message());
@ -51,4 +78,5 @@ public class MqSingleShotInbox {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
} }

View File

@ -152,7 +152,7 @@ public class MqSynchronousInbox implements MqInboxIf {
currentTask.get(); currentTask.get();
} }
catch (Exception ex) { catch (Exception ex) {
logger.error("Inbox task was aborted", ex); logger.error("Inbox task was aborted");
} }
finally { finally {
currentTask = null; currentTask = null;

View File

@ -362,4 +362,20 @@ public class MqPersistence {
} }
} }
public void changeOwner(long id, String instanceUUID, int tick) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
UPDATE MESSAGE_QUEUE SET OWNER_INSTANCE=?, OWNER_TICK=?
WHERE ID=?
""")) {
stmt.setString(1, instanceUUID);
stmt.setInt(2, tick);
stmt.setLong(3, id);
stmt.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
} }

View File

@ -22,7 +22,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -174,14 +176,8 @@ public class ConverterMain {
var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID()); var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID());
var msgOpt = inbox.waitForMessage(30, TimeUnit.SECONDS); var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.ConvertRequest.class.getSimpleName());
if (msgOpt.isEmpty()) var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
throw new RuntimeException("No instruction received in inbox");
var msg = msgOpt.get();
if (!nu.marginalia.converting.mqapi.ConvertRequest.class.getSimpleName().equals(msg.function())) {
throw new RuntimeException("Unexpected message in inbox: " + msg);
}
var request = gson.fromJson(msg.payload(), nu.marginalia.converting.mqapi.ConvertRequest.class); var request = gson.fromJson(msg.payload(), nu.marginalia.converting.mqapi.ConvertRequest.class);
@ -195,6 +191,21 @@ public class ConverterMain {
return new ConvertRequest(plan, msg, inbox); return new ConvertRequest(plan, msg, inbox);
} }
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
record ProcessingInstructions(String id, List<Instruction> instructions) {} record ProcessingInstructions(String id, List<Instruction> instructions) {}

View File

@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path; import java.nio.file.Path;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -214,7 +215,7 @@ public class LoaderMain {
var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, UUID.randomUUID()); var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, UUID.randomUUID());
var msgOpt = inbox.waitForMessage(30, TimeUnit.SECONDS); var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.LoadRequest.class.getSimpleName());
if (msgOpt.isEmpty()) if (msgOpt.isEmpty())
throw new RuntimeException("No instruction received in inbox"); throw new RuntimeException("No instruction received in inbox");
var msg = msgOpt.get(); var msg = msgOpt.get();
@ -232,4 +233,19 @@ public class LoaderMain {
return new LoadRequest(plan, msg, inbox); return new LoadRequest(plan, msg, inbox);
} }
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
} }

View File

@ -46,6 +46,7 @@ dependencies {
implementation libs.trove implementation libs.trove
implementation libs.spark implementation libs.spark
implementation libs.fastutil implementation libs.fastutil
implementation libs.commons.io
implementation libs.bundles.gson implementation libs.bundles.gson
implementation libs.bundles.mariadb implementation libs.bundles.mariadb

View File

@ -4,14 +4,12 @@ import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors; import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.process.ControlProcesses; import nu.marginalia.control.fsm.ControlFSMs;
import nu.marginalia.control.svc.*; import nu.marginalia.control.svc.*;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.MustacheRenderer;
import nu.marginalia.renderer.RendererFactory; import nu.marginalia.renderer.RendererFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.*; import nu.marginalia.service.server.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -20,9 +18,7 @@ import spark.Response;
import spark.Spark; import spark.Spark;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ControlService extends Service { public class ControlService extends Service {
@ -43,7 +39,7 @@ public class ControlService extends Service {
HeartbeatService heartbeatService, HeartbeatService heartbeatService,
EventLogService eventLogService, EventLogService eventLogService,
RendererFactory rendererFactory, RendererFactory rendererFactory,
ControlProcesses controlProcesses, ControlFSMs controlFSMs,
StaticResources staticResources, StaticResources staticResources,
MessageQueueViewService messageQueueViewService, MessageQueueViewService messageQueueViewService,
ControlFileStorageService controlFileStorageService ControlFileStorageService controlFileStorageService
@ -73,7 +69,7 @@ public class ControlService extends Service {
Spark.get("/public/processes", Spark.get("/public/processes",
(req, rsp) -> Map.of("processes", heartbeatService.getProcessHeartbeats(), (req, rsp) -> Map.of("processes", heartbeatService.getProcessHeartbeats(),
"fsms", controlProcesses.getFsmStates(), "fsms", controlFSMs.getFsmStates(),
"messages", messageQueueViewService.getLastEntries(20)), "messages", messageQueueViewService.getLastEntries(20)),
(map) -> processesRenderer.render((Map<?, ?>) map)); (map) -> processesRenderer.render((Map<?, ?>) map));
@ -82,14 +78,14 @@ public class ControlService extends Service {
(map) -> storageRenderer.render((Map<?, ?>) map)); (map) -> storageRenderer.render((Map<?, ?>) map));
Spark.post("/public/fsms/:fsm/start", (req, rsp) -> { Spark.post("/public/fsms/:fsm/start", (req, rsp) -> {
controlProcesses.start(ControlProcess.valueOf(req.params("fsm").toUpperCase())); controlFSMs.start(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
return """ return """
<?doctype html> <?doctype html>
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html> <html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>
"""; """;
}); });
Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> { Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> {
controlProcesses.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase())); controlFSMs.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
return """ return """
<?doctype html> <?doctype html>
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html> <html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>
@ -98,7 +94,7 @@ public class ControlService extends Service {
// TODO: This should be a POST // TODO: This should be a POST
Spark.get("/public/repartition", (req, rsp) -> { Spark.get("/public/repartition", (req, rsp) -> {
controlProcesses.start(ControlProcess.REPARTITION_REINDEX); controlFSMs.start(ControlProcess.REPARTITION_REINDEX);
return """ return """
<?doctype html> <?doctype html>
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html> <html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>
@ -106,8 +102,8 @@ public class ControlService extends Service {
}); });
// TODO: This should be a POST // TODO: This should be a POST
Spark.get("/public/reconvert", (req, rsp) -> { Spark.get("/public/reconvert/:fid", (req, rsp) -> {
controlProcesses.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(11)); controlFSMs.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(Integer.parseInt(req.params("fid"))));
return """ return """
<?doctype html> <?doctype html>
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html> <html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.process; package nu.marginalia.control.fsm;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -6,6 +6,11 @@ import com.google.inject.Singleton;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.model.ControlProcessState; import nu.marginalia.control.model.ControlProcessState;
import nu.marginalia.control.fsm.monitor.*;
import nu.marginalia.control.fsm.monitor.ConverterMonitorFSM;
import nu.marginalia.control.fsm.monitor.LoaderMonitorFSM;
import nu.marginalia.control.fsm.task.ReconvertAndLoadFSM;
import nu.marginalia.control.fsm.task.RepartitionReindexFSM;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqsm.StateMachine; import nu.marginalia.mqsm.StateMachine;
@ -20,29 +25,35 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
@Singleton @Singleton
public class ControlProcesses { public class ControlFSMs {
private final ServiceEventLog eventLog; private final ServiceEventLog eventLog;
private final Gson gson; private final Gson gson;
private final MessageQueueFactory messageQueueFactory; private final MessageQueueFactory messageQueueFactory;
public Map<ControlProcess, StateMachine> stateMachines = new HashMap<>(); public Map<ControlProcess, StateMachine> stateMachines = new HashMap<>();
@Inject @Inject
public ControlProcesses(MessageQueueFactory messageQueueFactory, public ControlFSMs(MessageQueueFactory messageQueueFactory,
GsonFactory gsonFactory, GsonFactory gsonFactory,
BaseServiceParams baseServiceParams, BaseServiceParams baseServiceParams,
RepartitionReindexProcess repartitionReindexProcess, RepartitionReindexFSM repartitionReindexFSM,
ReconvertAndLoadProcess reconvertAndLoadProcess, ReconvertAndLoadFSM reconvertAndLoadFSM,
ConverterMonitorProcess converterMonitorProcess, ConverterMonitorFSM converterMonitorFSM,
LoaderMonitorProcess loaderMonitorProcess LoaderMonitorFSM loaderMonitor,
MessageQueueMonitorFSM messageQueueMonitor,
ProcessLivenessMonitorFSM processMonitorFSM,
FileStorageMonitorFSM fileStorageMonitorFSM
) { ) {
this.messageQueueFactory = messageQueueFactory; this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog; this.eventLog = baseServiceParams.eventLog;
this.gson = gsonFactory.get(); this.gson = gsonFactory.get();
register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess); register(ControlProcess.REPARTITION_REINDEX, repartitionReindexFSM);
register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadProcess); register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadFSM);
register(ControlProcess.CONVERTER_MONITOR, converterMonitorProcess); register(ControlProcess.CONVERTER_MONITOR, converterMonitorFSM);
register(ControlProcess.LOADER_MONITOR, loaderMonitorProcess); register(ControlProcess.LOADER_MONITOR, loaderMonitor);
register(ControlProcess.MESSAGE_QUEUE_MONITOR, messageQueueMonitor);
register(ControlProcess.PROCESS_LIVENESS_MONITOR, processMonitorFSM);
register(ControlProcess.FILE_STORAGE_MONITOR, fileStorageMonitorFSM);
} }
private void register(ControlProcess process, AbstractStateGraph graph) { private void register(ControlProcess process, AbstractStateGraph graph) {

View File

@ -1,9 +1,8 @@
package nu.marginalia.control.process; package nu.marginalia.control.fsm.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService; import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.converting.mqapi.ConverterInboxNames;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.AbstractStateGraph;
@ -14,35 +13,39 @@ import java.sql.SQLException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton @Singleton
public class ConverterMonitorProcess extends AbstractStateGraph { public class AbstractProcessSpawnerFSM extends AbstractStateGraph {
private final MqPersistence persistence; private final MqPersistence persistence;
private final ProcessService processService; private final ProcessService processService;
public static final String INITIAL = "INITIAL"; public static final String INITIAL = "INITIAL";
public static final String CHECK = "CHECK"; public static final String MONITOR = "MONITOR";
public static final String RUN = "RUN"; public static final String RUN = "RUN";
public static final String END = "END"; public static final String END = "END";
public static final int MAX_ATTEMPTS = 3; public static final int MAX_ATTEMPTS = 3;
public static final String inboxName = ConverterInboxNames.CONVERTER_INBOX; private final String inboxName;
public static final ProcessService.ProcessId processId = ProcessService.ProcessId.CONVERTER; private final ProcessService.ProcessId processId;
@Inject @Inject
public ConverterMonitorProcess(StateFactory stateFactory, public AbstractProcessSpawnerFSM(StateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessService processService,
String inboxName,
ProcessService.ProcessId processId) {
super(stateFactory); super(stateFactory);
this.persistence = persistence; this.persistence = persistence;
this.processService = processService; this.processService = processService;
this.inboxName = inboxName;
this.processId = processId;
} }
@GraphState(name = INITIAL, next = CHECK) @GraphState(name = INITIAL, next = MONITOR)
public void init() { public void init() {
} }
@GraphState(name = CHECK, resume = ResumeBehavior.RETRY) @GraphState(name = MONITOR, resume = ResumeBehavior.RETRY)
public void check() throws SQLException, InterruptedException { public void monitor() throws SQLException, InterruptedException {
for (;;) { for (;;) {
var messages = persistence.eavesdrop(inboxName, 1); var messages = persistence.eavesdrop(inboxName, 1);
@ -67,7 +70,7 @@ public class ConverterMonitorProcess extends AbstractStateGraph {
else throw e; else throw e;
} }
transition(CHECK); transition(MONITOR);
} }
} }

View File

@ -0,0 +1,22 @@
package nu.marginalia.control.fsm.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.converting.mqapi.ConverterInboxNames;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
@Singleton
public class ConverterMonitorFSM extends AbstractProcessSpawnerFSM {
@Inject
public ConverterMonitorFSM(StateFactory stateFactory,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory, persistence, processService, ConverterInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER);
}
}

View File

@ -0,0 +1,72 @@
package nu.marginalia.control.fsm.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
public class FileStorageMonitorFSM extends AbstractStateGraph {
private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES
private static final String INITIAL = "INITIAL";
private static final String MONITOR = "MONITOR";
private static final String PURGE = "PURGE";
private static final String END = "END";
private final FileStorageService fileStorageService;
@Inject
public FileStorageMonitorFSM(StateFactory stateFactory,
FileStorageService fileStorageService) {
super(stateFactory);
this.fileStorageService = fileStorageService;
}
@GraphState(name = INITIAL, next = MONITOR)
public void init() {
}
@GraphState(name = MONITOR, resume = ResumeBehavior.RETRY)
public void monitor() throws Exception {
for (;;) {
Optional<FileStorage> toDeleteOpt = fileStorageService.findFileStorageToDelete();
if (toDeleteOpt.isEmpty()) {
TimeUnit.SECONDS.sleep(10);
}
else {
transition(PURGE, toDeleteOpt.get().id());
}
}
}
@GraphState(name = PURGE, next = MONITOR, resume = ResumeBehavior.RETRY)
public void purge(FileStorageId id) throws Exception {
var storage = fileStorageService.getStorage(id);
logger.info("Deleting {} ", storage.path());
Path path = storage.asPath();
if (Files.exists(path)) {
FileUtils.deleteDirectory(path.toFile());
}
fileStorageService.removeFileStorage(storage.id());
}
}

View File

@ -0,0 +1,24 @@
package nu.marginalia.control.fsm.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.converting.mqapi.ConverterInboxNames;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
@Singleton
public class LoaderMonitorFSM extends AbstractProcessSpawnerFSM {
@Inject
public LoaderMonitorFSM(StateFactory stateFactory,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory, persistence, processService,
ConverterInboxNames.LOADER_INBOX,
ProcessService.ProcessId.LOADER);
}
}

View File

@ -0,0 +1,45 @@
package nu.marginalia.control.fsm.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.util.concurrent.TimeUnit;
@Singleton
public class MessageQueueMonitorFSM extends AbstractStateGraph {
// STATES
private static final String INITIAL = "INITIAL";
private static final String MONITOR = "MONITOR";
private static final String END = "END";
private final MqPersistence persistence;
@Inject
public MessageQueueMonitorFSM(StateFactory stateFactory,
MqPersistence persistence) {
super(stateFactory);
this.persistence = persistence;
}
@GraphState(name = INITIAL, next = MONITOR)
public void init() {
}
@GraphState(name = MONITOR, resume = ResumeBehavior.RETRY)
public void monitor() throws Exception {
for (;;) {
persistence.reapDeadMessages();
persistence.cleanOldMessages();
TimeUnit.SECONDS.sleep(60);
}
}
}

View File

@ -0,0 +1,55 @@
package nu.marginalia.control.fsm.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.model.ProcessHeartbeat;
import nu.marginalia.control.svc.HeartbeatService;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.util.concurrent.TimeUnit;
@Singleton
public class ProcessLivenessMonitorFSM extends AbstractStateGraph {
// STATES
private static final String INITIAL = "INITIAL";
private static final String MONITOR = "MONITOR";
private static final String END = "END";
private final ProcessService processService;
private final HeartbeatService heartbeatService;
@Inject
public ProcessLivenessMonitorFSM(StateFactory stateFactory,
ProcessService processService,
HeartbeatService heartbeatService) {
super(stateFactory);
this.processService = processService;
this.heartbeatService = heartbeatService;
}
@GraphState(name = INITIAL, next = MONITOR)
public void init() {
}
@GraphState(name = MONITOR, resume = ResumeBehavior.RETRY)
public void monitor() throws Exception {
for (;;) {
var processHeartbeats = heartbeatService.getProcessHeartbeats();
processHeartbeats.stream()
.filter(ProcessHeartbeat::isRunning)
.filter(p -> !processService.isRunning(p.getProcessId()))
.forEach(heartbeatService::flagProcessAsStopped);
TimeUnit.SECONDS.sleep(60);
}
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.process; package nu.marginalia.control.fsm.task;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -15,7 +15,6 @@ import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.client.IndexClient; import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
@ -24,17 +23,12 @@ import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.mqsm.graph.ResumeBehavior;
import nu.marginalia.search.client.SearchClient; import nu.marginalia.search.client.SearchClient;
import nu.marginalia.search.client.SearchMqEndpoints;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@Singleton @Singleton
public class ReconvertAndLoadProcess extends AbstractStateGraph { public class ReconvertAndLoadFSM extends AbstractStateGraph {
// STATES // STATES
@ -66,13 +60,13 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph {
}; };
@Inject @Inject
public ReconvertAndLoadProcess(StateFactory stateFactory, public ReconvertAndLoadFSM(StateFactory stateFactory,
ProcessService processService, ProcessService processService,
IndexClient indexClient, IndexClient indexClient,
ProcessOutboxFactory processOutboxFactory, ProcessOutboxFactory processOutboxFactory,
SearchClient searchClient, SearchClient searchClient,
FileStorageService storageService, FileStorageService storageService,
Gson gson Gson gson
) )
{ {
super(stateFactory); super(stateFactory);

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.process; package nu.marginalia.control.fsm.task;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -12,7 +12,7 @@ import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.mqsm.graph.ResumeBehavior;
@Singleton @Singleton
public class RepartitionReindexProcess extends AbstractStateGraph { public class RepartitionReindexFSM extends AbstractStateGraph {
private final MqOutbox indexOutbox; private final MqOutbox indexOutbox;
@ -27,8 +27,8 @@ public class RepartitionReindexProcess extends AbstractStateGraph {
@Inject @Inject
public RepartitionReindexProcess(StateFactory stateFactory, public RepartitionReindexFSM(StateFactory stateFactory,
IndexClient indexClient) { IndexClient indexClient) {
super(stateFactory); super(stateFactory);
indexOutbox = indexClient.outbox(); indexOutbox = indexClient.outbox();

View File

@ -4,9 +4,13 @@ public enum ControlProcess {
REPARTITION_REINDEX, REPARTITION_REINDEX,
RECONVERT_LOAD, RECONVERT_LOAD,
CONVERTER_MONITOR, CONVERTER_MONITOR,
LOADER_MONITOR LOADER_MONITOR,
MESSAGE_QUEUE_MONITOR,
PROCESS_LIVENESS_MONITOR,
FILE_STORAGE_MONITOR
; ;
public String id() { public String id() {
return "fsm:" + name().toLowerCase(); return "fsm:" + name().toLowerCase();
} }

View File

@ -1,5 +1,7 @@
package nu.marginalia.control.model; package nu.marginalia.control.model;
import nu.marginalia.control.svc.ProcessService;
public record ProcessHeartbeat( public record ProcessHeartbeat(
String processId, String processId,
String processBase, String processBase,
@ -23,6 +25,9 @@ public record ProcessHeartbeat(
public boolean isStopped() { public boolean isStopped() {
return "STOPPED".equals(status); return "STOPPED".equals(status);
} }
public boolean isRunning() {
return "RUNNING".equals(status);
}
public String progressStyle() { public String progressStyle() {
if ("RUNNING".equals(status) && progress != null) { if ("RUNNING".equals(status) && progress != null) {
return """ return """
@ -31,4 +36,13 @@ public record ProcessHeartbeat(
} }
return ""; return "";
} }
public ProcessService.ProcessId getProcessId() {
return switch (processBase) {
case "converter" -> ProcessService.ProcessId.CONVERTER;
case "crawler" -> ProcessService.ProcessId.CRAWLER;
case "loader" -> ProcessService.ProcessId.LOADER;
default -> throw new RuntimeException("Unknown process base: " + processBase);
};
}
} }

View File

@ -1,73 +0,0 @@
package nu.marginalia.control.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.converting.mqapi.ConverterInboxNames;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@Singleton
public class LoaderMonitorProcess extends AbstractStateGraph {
private final MqPersistence persistence;
private final ProcessService processService;
public static final String INITIAL = "INITIAL";
public static final String CHECK = "CHECK";
public static final String RUN = "RUN";
public static final String END = "END";
public static final int MAX_ATTEMPTS = 1;
public static final String inboxName = ConverterInboxNames.LOADER_INBOX;
public static final ProcessService.ProcessId processId = ProcessService.ProcessId.LOADER;
@Inject
public LoaderMonitorProcess(StateFactory stateFactory,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory);
this.persistence = persistence;
this.processService = processService;
}
@GraphState(name = INITIAL, next = CHECK)
public void init() {
}
@GraphState(name = CHECK, resume = ResumeBehavior.RETRY)
public void check() throws SQLException, InterruptedException {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
TimeUnit.SECONDS.sleep(5);
} else {
transition(RUN, 0);
}
}
}
@GraphState(name = RUN)
public void run(Integer attempts) throws Exception {
try {
processService.trigger(processId);
}
catch (Exception e) {
if (attempts < MAX_ATTEMPTS) {
transition(RUN, attempts + 1);
}
else throw e;
}
transition(CHECK);
}
}

View File

@ -5,6 +5,7 @@ import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.control.model.ProcessHeartbeat; import nu.marginalia.control.model.ProcessHeartbeat;
import nu.marginalia.control.model.ServiceHeartbeat; import nu.marginalia.control.model.ServiceHeartbeat;
import nu.marginalia.service.control.ServiceEventLog;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
@ -13,10 +14,13 @@ import java.util.List;
@Singleton @Singleton
public class HeartbeatService { public class HeartbeatService {
private final HikariDataSource dataSource; private final HikariDataSource dataSource;
private final ServiceEventLog eventLogService;
@Inject @Inject
public HeartbeatService(HikariDataSource dataSource) { public HeartbeatService(HikariDataSource dataSource,
ServiceEventLog eventLogService) {
this.dataSource = dataSource; this.dataSource = dataSource;
this.eventLogService = eventLogService;
} }
public List<ServiceHeartbeat> getServiceHeartbeats() { public List<ServiceHeartbeat> getServiceHeartbeats() {
@ -77,4 +81,23 @@ public class HeartbeatService {
return heartbeats; return heartbeats;
} }
public void flagProcessAsStopped(ProcessHeartbeat processHeartbeat) {
eventLogService.logEvent("PROCESS-MISSING", "Marking stale process heartbeat "
+ processHeartbeat.processId() + " / " + processHeartbeat.uuidFull() + " as stopped");
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
UPDATE PROCESS_HEARTBEAT
SET STATUS = 'STOPPED'
WHERE INSTANCE = ?
""")) {
stmt.setString(1, processHeartbeat.uuidFull());
stmt.executeUpdate();
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
} }

View File

@ -1,62 +0,0 @@
package nu.marginalia.control.svc;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.control.ServiceEventLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@Singleton
public class MessageQueueMonitorService {
private final Logger logger = LoggerFactory.getLogger(MessageQueueMonitorService.class);
private final MqPersistence persistence;
private final ServiceEventLog eventLog;
@Inject
public MessageQueueMonitorService(ServiceEventLog eventLog, MqPersistence persistence) {
this.eventLog = eventLog;
this.persistence = persistence;
Thread reaperThread = new Thread(this::run, "message-queue-reaper");
reaperThread.setDaemon(true);
reaperThread.start();
}
private void run() {
for (;;) {
try {
TimeUnit.MINUTES.sleep(10);
reapMessages();
}
catch (InterruptedException ex) {
logger.info("Message queue reaper interrupted");
break;
}
catch (Exception ex) {
logger.error("Message queue reaper failed", ex);
}
}
}
private void reapMessages() throws SQLException {
int outcome = persistence.reapDeadMessages();
if (outcome > 0) {
eventLog.logEvent("MESSAGE-QUEUE-REAPED", Integer.toString(outcome));
logger.info("Reaped {} dead messages from message queue", outcome);
}
outcome = persistence.cleanOldMessages();
if (outcome > 0) {
eventLog.logEvent("MESSAGE-QUEUE-CLEANED", Integer.toString(outcome));
logger.info("Cleaned {} stale messages from message queue", outcome);
}
}
}