(converter) WIP

This commit is contained in:
Viktor Lofgren 2023-07-19 17:14:45 +02:00
parent c0b5ea0e7d
commit 08ca6399ec
30 changed files with 730 additions and 323 deletions

View File

@ -59,8 +59,16 @@ public class StateMachine {
registerStates(stateGraph); registerStates(stateGraph);
for (var declaredState : stateGraph.declaredStates()) { for (var declaredState : stateGraph.declaredStates()) {
if (!allStates.containsKey(declaredState)) { if (!allStates.containsKey(declaredState.name())) {
throw new IllegalArgumentException("State " + declaredState + " is not defined in the state graph"); throw new IllegalArgumentException("State " + declaredState.name() + " is not defined in the state graph");
}
if (!allStates.containsKey(declaredState.next())) {
throw new IllegalArgumentException("State " + declaredState.next() + " is not defined in the state graph");
}
for (var state : declaredState.transitions()) {
if (!allStates.containsKey(state)) {
throw new IllegalArgumentException("State " + state + " is not defined in the state graph");
}
} }
} }

View File

@ -30,6 +30,7 @@ public abstract class AbstractStateGraph {
throw new ControlFlowException("ERROR", ""); throw new ControlFlowException("ERROR", "");
} }
public <T> void error(T payload) { public <T> void error(T payload) {
throw new ControlFlowException("ERROR", payload); throw new ControlFlowException("ERROR", payload);
} }
@ -38,19 +39,31 @@ public abstract class AbstractStateGraph {
throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage()); throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage());
} }
public Set<String> declaredStates() { public Set<GraphState> declaredStates() {
Set<String> ret = new HashSet<>(); Set<GraphState> ret = new HashSet<>();
for (var method : getClass().getMethods()) { for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(GraphState.class); var gs = method.getAnnotation(GraphState.class);
if (gs != null) { if (gs != null) {
ret.add(gs.name()); ret.add(gs);
ret.add(gs.next());
} }
} }
return ret; return ret;
} }
public Set<TerminalState> terminalStates() {
Set<TerminalState> ret = new HashSet<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(TerminalState.class);
if (gs != null) {
ret.add(gs);
}
}
return ret;
}
public List<MachineState> asStateList() { public List<MachineState> asStateList() {
List<MachineState> ret = new ArrayList<>(); List<MachineState> ret = new ArrayList<>();
@ -59,6 +72,13 @@ public abstract class AbstractStateGraph {
if (gs != null) { if (gs != null) {
ret.add(graphState(method, gs)); ret.add(graphState(method, gs));
} }
var ts = method.getAnnotation(TerminalState.class);
if (ts != null) {
ret.add(stateFactory.create(ts.name(), ResumeBehavior.ERROR, () -> {
throw new ControlFlowException(ts.name(), null);
}));
}
} }
return ret; return ret;

View File

@ -8,5 +8,7 @@ import java.lang.annotation.RetentionPolicy;
public @interface GraphState { public @interface GraphState {
String name(); String name();
String next() default "ERROR"; String next() default "ERROR";
String[] transitions() default {};
String description() default "";
ResumeBehavior resume() default ResumeBehavior.ERROR; ResumeBehavior resume() default ResumeBehavior.ERROR;
} }

View File

@ -6,4 +6,5 @@ import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
public @interface TerminalState { public @interface TerminalState {
String name(); String name();
String description() default "";
} }

View File

@ -5,9 +5,11 @@ appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %style{%-8markerSimpleName}{FG_Cyan} %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %-24t %-20c{1} -- %msg{nolookups}%n appender.console.layout.pattern = %d{HH:mm:ss,SSS} %style{%-8markerSimpleName}{FG_Cyan} %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %-24t %-20c{1} -- %msg{nolookups}%n
appender.console.filter.process.type = MarkerFilter appender.console.filter.process.type = MarkerFilter
appender.console.filter.process.onMismatch=ACCEPT
appender.console.filter.process.onMatch=DENY appender.console.filter.process.onMatch=DENY
appender.console.filter.process.marker=PROCESS appender.console.filter.process.marker=PROCESS
appender.console.filter.http.type = MarkerFilter appender.console.filter.http.type = MarkerFilter
appender.console.filter.http.onMismatch=ACCEPT
appender.console.filter.http.onMatch=DENY appender.console.filter.http.onMatch=DENY
appender.console.filter.http.marker=HTTP appender.console.filter.http.marker=HTTP
appender.processconsole.type = Console appender.processconsole.type = Console
@ -16,6 +18,7 @@ appender.processconsole.layout.type = PatternLayout
appender.processconsole.layout.pattern = %msg{nolookups}%n appender.processconsole.layout.pattern = %msg{nolookups}%n
appender.processconsole.filter.process.type = MarkerFilter appender.processconsole.filter.process.type = MarkerFilter
appender.processconsole.filter.process.onMismatch=DENY appender.processconsole.filter.process.onMismatch=DENY
appender.processconsole.filter.process.onMatch=ACCEPT
appender.processconsole.filter.process.marker=PROCESS appender.processconsole.filter.process.marker=PROCESS
appender.rolling.type = RollingFile appender.rolling.type = RollingFile
appender.rolling.name = RollingFile appender.rolling.name = RollingFile
@ -29,15 +32,15 @@ appender.rolling.policies.size.size=10MB
appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10 appender.rolling.strategy.max = 10
appender.rolling.filter.query.type = MarkerFilter appender.rolling.filter.query.type = MarkerFilter
appender.rolling.filter.query.onMismatch=NEUTRAL appender.rolling.filter.query.onMismatch=ACCEPT
appender.rolling.filter.query.onMatch=DENY appender.rolling.filter.query.onMatch=DENY
appender.rolling.filter.query.marker=QUERY appender.rolling.filter.query.marker=QUERY
appender.rolling.filter.http.type = MarkerFilter appender.rolling.filter.http.type = MarkerFilter
appender.rolling.filter.http.onMismatch=NEUTRAL appender.rolling.filter.http.onMismatch=ACCEPT
appender.rolling.filter.http.onMatch=DENY appender.rolling.filter.http.onMatch=DENY
appender.rolling.filter.http.marker=HTTP appender.rolling.filter.http.marker=HTTP
appender.rolling.filter.process.type = MarkerFilter appender.rolling.filter.process.type = MarkerFilter
appender.rolling.filter.process.onMismatch=NEUTRAL appender.rolling.filter.process.onMismatch=ACCEPT
appender.rolling.filter.process.onMatch=DENY appender.rolling.filter.process.onMatch=DENY
appender.rolling.filter.process.marker=PROCESS appender.rolling.filter.process.marker=PROCESS
appender.process.type = RollingFile appender.process.type = RollingFile
@ -53,6 +56,7 @@ appender.process.strategy.type = DefaultRolloverStrategy
appender.process.strategy.max = 10 appender.process.strategy.max = 10
appender.process.filter.process.type = MarkerFilter appender.process.filter.process.type = MarkerFilter
appender.process.filter.process.onMismatch=DENY appender.process.filter.process.onMismatch=DENY
appender.process.filter.process.onMatch=ACCEPT
appender.process.filter.process.marker=PROCESS appender.process.filter.process.marker=PROCESS
rootLogger.level = info rootLogger.level = info
rootLogger.appenderRef.console.ref = LogToConsole rootLogger.appenderRef.console.ref = LogToConsole

View File

@ -3,9 +3,9 @@ package nu.marginalia.control;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors; import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.svc.*; import nu.marginalia.control.svc.*;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.renderer.MustacheRenderer;
import nu.marginalia.renderer.RendererFactory; import nu.marginalia.renderer.RendererFactory;
import nu.marginalia.service.server.*; import nu.marginalia.service.server.*;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -25,7 +25,7 @@ public class ControlService extends Service {
private final ServiceMonitors monitors; private final ServiceMonitors monitors;
private final HeartbeatService heartbeatService; private final HeartbeatService heartbeatService;
private final EventLogService eventLogService; private final EventLogService eventLogService;
private final ControlFsmService controlFsmService; private final ControlActorService controlActorService;
private final StaticResources staticResources; private final StaticResources staticResources;
private final MessageQueueViewService messageQueueViewService; private final MessageQueueViewService messageQueueViewService;
private final ControlFileStorageService controlFileStorageService; private final ControlFileStorageService controlFileStorageService;
@ -37,7 +37,7 @@ public class ControlService extends Service {
HeartbeatService heartbeatService, HeartbeatService heartbeatService,
EventLogService eventLogService, EventLogService eventLogService,
RendererFactory rendererFactory, RendererFactory rendererFactory,
ControlFsmService controlFsmService, ControlActorService controlActorService,
StaticResources staticResources, StaticResources staticResources,
MessageQueueViewService messageQueueViewService, MessageQueueViewService messageQueueViewService,
ControlFileStorageService controlFileStorageService ControlFileStorageService controlFileStorageService
@ -51,10 +51,11 @@ public class ControlService extends Service {
var indexRenderer = rendererFactory.renderer("control/index"); var indexRenderer = rendererFactory.renderer("control/index");
var servicesRenderer = rendererFactory.renderer("control/services"); var servicesRenderer = rendererFactory.renderer("control/services");
var serviceByIdRenderer = rendererFactory.renderer("control/service-by-id"); var serviceByIdRenderer = rendererFactory.renderer("control/service-by-id");
var processesRenderer = rendererFactory.renderer("control/processes"); var actorsRenderer = rendererFactory.renderer("control/actors");
var actorDetailsRenderer = rendererFactory.renderer("control/actor-details");
var storageRenderer = rendererFactory.renderer("control/storage"); var storageRenderer = rendererFactory.renderer("control/storage");
this.controlFsmService = controlFsmService; this.controlActorService = controlActorService;
this.staticResources = staticResources; this.staticResources = staticResources;
this.messageQueueViewService = messageQueueViewService; this.messageQueueViewService = messageQueueViewService;
@ -69,18 +70,20 @@ public class ControlService extends Service {
Spark.get("/public/services", this::servicesModel, servicesRenderer::render); Spark.get("/public/services", this::servicesModel, servicesRenderer::render);
Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render); Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render);
Spark.get("/public/processes", this::processesModel, processesRenderer::render); Spark.get("/public/messages/:id", this::messageModel, gson::toJson);
Spark.get("/public/actors", this::processesModel, actorsRenderer::render);
Spark.get("/public/actors/:fsm", this::actorDetailsModel, actorDetailsRenderer::render);
Spark.get("/public/storage", this::storageModel, storageRenderer::render); Spark.get("/public/storage", this::storageModel, storageRenderer::render);
final HtmlRedirect redirectToServices = new HtmlRedirect("/services"); final HtmlRedirect redirectToServices = new HtmlRedirect("/services");
final HtmlRedirect redirectToProcesses = new HtmlRedirect("/processes"); final HtmlRedirect redirectToProcesses = new HtmlRedirect("/actors");
final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage"); final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage");
Spark.post("/public/fsms/:fsm/start", controlFsmService::startFsm, redirectToProcesses); Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses);
Spark.post("/public/fsms/:fsm/stop", controlFsmService::stopFsm, redirectToProcesses); Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses);
Spark.post("/public/storage/:fid/process", controlFsmService::triggerProcessing, redirectToProcesses); Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses);
Spark.post("/public/storage/:fid/load", controlFsmService::loadProcessedData, redirectToProcesses); Spark.post("/public/storage/:fid/load", controlActorService::loadProcessedData, redirectToProcesses);
Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage); Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage);
@ -89,6 +92,18 @@ public class ControlService extends Service {
monitors.subscribe(this::logMonitorStateChange); monitors.subscribe(this::logMonitorStateChange);
} }
private Object messageModel(Request request, Response response) {
var message = messageQueueViewService.getMessage(Long.parseLong(request.params("id")));
if (message != null) {
response.type("application/json");
return message;
}
else {
response.status(404);
return "";
}
}
private Object serviceModel(Request request, Response response) { private Object serviceModel(Request request, Response response) {
String serviceName = request.params("id"); String serviceName = request.params("id");
@ -108,10 +123,18 @@ public class ControlService extends Service {
private Object processesModel(Request request, Response response) { private Object processesModel(Request request, Response response) {
return Map.of("processes", heartbeatService.getProcessHeartbeats(), return Map.of("processes", heartbeatService.getProcessHeartbeats(),
"fsms", controlFsmService.getFsmStates(), "actors", controlActorService.getActorStates(),
"messages", messageQueueViewService.getLastEntries(20)); "messages", messageQueueViewService.getLastEntries(20));
} }
private Object actorDetailsModel(Request request, Response response) {
final Actor actor = Actor.valueOf(request.params("fsm").toUpperCase());
final String inbox = actor.id();
return Map.of(
"actor", actor,
"state-graph", controlActorService.getActorStateGraph(actor),
"messages", messageQueueViewService.getLastEntriesForInbox(inbox, 20));
}
private Object serveStatic(Request request, Response response) { private Object serveStatic(Request request, Response response) {
String resource = request.params("resource"); String resource = request.params("resource");

View File

@ -0,0 +1,111 @@
package nu.marginalia.control.actor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.actor.monitor.*;
import nu.marginalia.control.actor.monitor.ConverterMonitorActor;
import nu.marginalia.control.actor.monitor.LoaderMonitorActor;
import nu.marginalia.control.actor.task.ReconvertAndLoadActor;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqsm.StateMachine;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.state.MachineState;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@Singleton
public class ControlActors {
private final ServiceEventLog eventLog;
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<Actor, StateMachine> stateMachines = new HashMap<>();
public Map<Actor, AbstractStateGraph> actorDefinitions = new HashMap<>();
@Inject
public ControlActors(MessageQueueFactory messageQueueFactory,
GsonFactory gsonFactory,
BaseServiceParams baseServiceParams,
ReconvertAndLoadActor reconvertAndLoadActor,
ConverterMonitorActor converterMonitorFSM,
LoaderMonitorActor loaderMonitor,
MessageQueueMonitorActor messageQueueMonitor,
ProcessLivenessMonitorActor processMonitorFSM,
FileStorageMonitorActor fileStorageMonitorActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.gson = gsonFactory.get();
register(Actor.RECONVERT_LOAD, reconvertAndLoadActor);
register(Actor.CONVERTER_MONITOR, converterMonitorFSM);
register(Actor.LOADER_MONITOR, loaderMonitor);
register(Actor.MESSAGE_QUEUE_MONITOR, messageQueueMonitor);
register(Actor.PROCESS_LIVENESS_MONITOR, processMonitorFSM);
register(Actor.FILE_STORAGE_MONITOR, fileStorageMonitorActor);
}
private void register(Actor process, AbstractStateGraph graph) {
var sm = new StateMachine(messageQueueFactory, process.id(), UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
stateMachines.put(process, sm);
actorDefinitions.put(process, graph);
}
private void logStateChange(Actor process, String state) {
eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state);
}
public void startFrom(Actor process, String state) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state);
}
public void start(Actor process) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init();
}
public <T> void startFrom(Actor process, String state, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, gson.toJson(arg));
}
public <T> void start(Actor process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
@SneakyThrows
public void stop(Actor fsm) {
stateMachines.get(fsm).abortExecution();
}
public Map<Actor, MachineState> getActorStates() {
return stateMachines.entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().getState())
);
}
public MachineState getActorStates(Actor actor) {
return stateMachines.get(actor).getState();
}
public AbstractStateGraph getActorDefinition(Actor actor) {
return actorDefinitions.get(actor);
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.fsm.monitor; package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -18,10 +18,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@Singleton @Singleton
public class AbstractProcessSpawnerFSM extends AbstractStateGraph { public class AbstractProcessSpawnerActor extends AbstractStateGraph {
private final MqPersistence persistence; private final MqPersistence persistence;
private final ProcessService processService; private final ProcessService processService;
@ -30,9 +29,9 @@ public class AbstractProcessSpawnerFSM extends AbstractStateGraph {
public static final String INITIAL = "INITIAL"; public static final String INITIAL = "INITIAL";
public static final String MONITOR = "MONITOR"; public static final String MONITOR = "MONITOR";
public static final String ABORTED= "ABORTED";
public static final String RUN = "RUN"; public static final String RUN = "RUN";
public static final String ERROR = "ERROR"; public static final String ERROR = "ERROR";
public static final String ABORTED = "ABORTED";
public static final String END = "END"; public static final String END = "END";
public static final int MAX_ATTEMPTS = 3; public static final int MAX_ATTEMPTS = 3;
@ -41,11 +40,11 @@ public class AbstractProcessSpawnerFSM extends AbstractStateGraph {
private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final ExecutorService executorService = Executors.newSingleThreadExecutor();
@Inject @Inject
public AbstractProcessSpawnerFSM(StateFactory stateFactory, public AbstractProcessSpawnerActor(StateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService, ProcessService processService,
String inboxName, String inboxName,
ProcessService.ProcessId processId) { ProcessService.ProcessId processId) {
super(stateFactory); super(stateFactory);
this.persistence = persistence; this.persistence = persistence;
this.processService = processService; this.processService = processService;
@ -58,7 +57,15 @@ public class AbstractProcessSpawnerFSM extends AbstractStateGraph {
} }
@GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) @GraphState(name = MONITOR,
next = MONITOR,
resume = ResumeBehavior.RETRY,
transitions = {MONITOR, RUN},
description = """
Monitors the inbox of the process for messages.
If a message is found, transition to RUN.
"""
)
public void monitor() throws SQLException, InterruptedException { public void monitor() throws SQLException, InterruptedException {
for (;;) { for (;;) {
@ -72,7 +79,17 @@ public class AbstractProcessSpawnerFSM extends AbstractStateGraph {
} }
} }
@GraphState(name = RUN, resume = ResumeBehavior.RESTART) @GraphState(name = RUN,
resume = ResumeBehavior.RESTART,
transitions = {MONITOR, ERROR, RUN, ABORTED},
description = """
Runs the process.
If the process fails, retransition to RUN up to MAX_ATTEMPTS times.
After MAX_ATTEMPTS at restarting the process, transition to ERROR.
If the process is cancelled, transition to ABORTED.
If the process is successful, transition to MONITOR.
"""
)
public void run(Integer attempts) throws Exception { public void run(Integer attempts) throws Exception {
if (attempts == null) if (attempts == null)
attempts = 0; attempts = 0;
@ -94,7 +111,7 @@ public class AbstractProcessSpawnerFSM extends AbstractStateGraph {
transition(MONITOR); transition(MONITOR);
} }
@TerminalState(name = ABORTED) @TerminalState(name = ABORTED, description = "The process was manually aborted")
public void aborted() throws Exception {} public void aborted() throws Exception {}

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.fsm.monitor; package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -8,13 +8,13 @@ import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.mqsm.StateFactory;
@Singleton @Singleton
public class ConverterMonitorFSM extends AbstractProcessSpawnerFSM { public class ConverterMonitorActor extends AbstractProcessSpawnerActor {
@Inject @Inject
public ConverterMonitorFSM(StateFactory stateFactory, public ConverterMonitorActor(StateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessService processService) {
super(stateFactory, persistence, processService, ProcessInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER); super(stateFactory, persistence, processService, ProcessInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER);
} }

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.fsm.monitor; package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -19,7 +19,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton @Singleton
public class FileStorageMonitorFSM extends AbstractStateGraph { public class FileStorageMonitorActor extends AbstractStateGraph {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES // STATES
@ -32,8 +32,8 @@ public class FileStorageMonitorFSM extends AbstractStateGraph {
@Inject @Inject
public FileStorageMonitorFSM(StateFactory stateFactory, public FileStorageMonitorActor(StateFactory stateFactory,
FileStorageService fileStorageService) { FileStorageService fileStorageService) {
super(stateFactory); super(stateFactory);
this.fileStorageService = fileStorageService; this.fileStorageService = fileStorageService;
} }
@ -42,7 +42,11 @@ public class FileStorageMonitorFSM extends AbstractStateGraph {
public void init() { public void init() {
} }
@GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) @GraphState(name = MONITOR, next = PURGE, resume = ResumeBehavior.RETRY, transitions = { PURGE },
description = """
Monitor the file storage and trigger at transition to PURGE if any file storage area
has been marked for deletion.
""")
public void monitor() throws Exception { public void monitor() throws Exception {
for (;;) { for (;;) {
@ -57,7 +61,13 @@ public class FileStorageMonitorFSM extends AbstractStateGraph {
} }
} }
@GraphState(name = PURGE, next = MONITOR, resume = ResumeBehavior.RETRY) @GraphState(name = PURGE,
next = MONITOR,
resume = ResumeBehavior.RETRY,
description = """
Purge the file storage area and transition back to MONITOR.
"""
)
public void purge(FileStorageId id) throws Exception { public void purge(FileStorageId id) throws Exception {
var storage = fileStorageService.getStorage(id); var storage = fileStorageService.getStorage(id);
logger.info("Deleting {} ", storage.path()); logger.info("Deleting {} ", storage.path());

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.fsm.monitor; package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -8,13 +8,13 @@ import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.mqsm.StateFactory;
@Singleton @Singleton
public class LoaderMonitorFSM extends AbstractProcessSpawnerFSM { public class LoaderMonitorActor extends AbstractProcessSpawnerActor {
@Inject @Inject
public LoaderMonitorFSM(StateFactory stateFactory, public LoaderMonitorActor(StateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessService processService) {
super(stateFactory, persistence, processService, super(stateFactory, persistence, processService,
ProcessInboxNames.LOADER_INBOX, ProcessInboxNames.LOADER_INBOX,

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.fsm.monitor; package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -11,7 +11,7 @@ import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton @Singleton
public class MessageQueueMonitorFSM extends AbstractStateGraph { public class MessageQueueMonitorActor extends AbstractStateGraph {
// STATES // STATES
@ -22,8 +22,8 @@ public class MessageQueueMonitorFSM extends AbstractStateGraph {
@Inject @Inject
public MessageQueueMonitorFSM(StateFactory stateFactory, public MessageQueueMonitorActor(StateFactory stateFactory,
MqPersistence persistence) { MqPersistence persistence) {
super(stateFactory); super(stateFactory);
this.persistence = persistence; this.persistence = persistence;
} }
@ -32,7 +32,10 @@ public class MessageQueueMonitorFSM extends AbstractStateGraph {
public void init() { public void init() {
} }
@GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) @GraphState(name = MONITOR, next = MONITOR, resume = ResumeBehavior.RETRY,
description = """
Periodically clean up the message queue.
""")
public void monitor() throws Exception { public void monitor() throws Exception {
for (;;) { for (;;) {

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.fsm.monitor; package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -13,7 +13,7 @@ import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton @Singleton
public class ProcessLivenessMonitorFSM extends AbstractStateGraph { public class ProcessLivenessMonitorActor extends AbstractStateGraph {
// STATES // STATES
@ -25,9 +25,9 @@ public class ProcessLivenessMonitorFSM extends AbstractStateGraph {
@Inject @Inject
public ProcessLivenessMonitorFSM(StateFactory stateFactory, public ProcessLivenessMonitorActor(StateFactory stateFactory,
ProcessService processService, ProcessService processService,
HeartbeatService heartbeatService) { HeartbeatService heartbeatService) {
super(stateFactory); super(stateFactory);
this.processService = processService; this.processService = processService;
this.heartbeatService = heartbeatService; this.heartbeatService = heartbeatService;
@ -37,7 +37,12 @@ public class ProcessLivenessMonitorFSM extends AbstractStateGraph {
public void init() { public void init() {
} }
@GraphState(name = MONITOR, resume = ResumeBehavior.RETRY) @GraphState(name = MONITOR, next = MONITOR, resume = ResumeBehavior.RETRY, description = """
Periodically check to ensure that the control service's view of
running processes is agreement with the process heartbeats table.
If the process is not running, mark the process as stopped in the table.
""")
public void monitor() throws Exception { public void monitor() throws Exception {
for (;;) { for (;;) {

View File

@ -1,4 +1,4 @@
package nu.marginalia.control.fsm.task; package nu.marginalia.control.actor.task;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -8,13 +8,14 @@ import lombok.NoArgsConstructor;
import lombok.With; import lombok.With;
import nu.marginalia.control.svc.ProcessOutboxFactory; import nu.marginalia.control.svc.ProcessOutboxFactory;
import nu.marginalia.control.svc.ProcessService; import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mqapi.converting.ConvertRequest; import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.mqapi.loading.LoadRequest; import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageBaseType; import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
@ -22,19 +23,16 @@ import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.mqsm.graph.ResumeBehavior;
import nu.marginalia.search.client.SearchClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@Singleton @Singleton
public class ReconvertAndLoadFSM extends AbstractStateGraph { public class ReconvertAndLoadActor extends AbstractStateGraph {
// STATES // STATES
@ -44,10 +42,16 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
public static final String LOAD = "LOAD"; public static final String LOAD = "LOAD";
public static final String LOAD_WAIT = "LOAD-WAIT"; public static final String LOAD_WAIT = "LOAD-WAIT";
public static final String SWAP_LEXICON = "SWAP-LEXICON"; public static final String SWAP_LEXICON = "SWAP-LEXICON";
public static final String REPARTITION = "REPARTITION";
public static final String REPARTITION_WAIT = "REPARTITION-WAIT";
public static final String REINDEX = "REINDEX";
public static final String REINDEX_WAIT = "REINDEX-WAIT";
public static final String END = "END"; public static final String END = "END";
private final ProcessService processService; private final ProcessService processService;
private final MqOutbox mqConverterOutbox; private final MqOutbox mqConverterOutbox;
private final MqOutbox mqLoaderOutbox; private final MqOutbox mqLoaderOutbox;
private final MqOutbox indexOutbox;
private final FileStorageService storageService; private final FileStorageService storageService;
private final Gson gson; private final Gson gson;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@ -62,14 +66,16 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
}; };
@Inject @Inject
public ReconvertAndLoadFSM(StateFactory stateFactory, public ReconvertAndLoadActor(StateFactory stateFactory,
ProcessService processService, ProcessService processService,
ProcessOutboxFactory processOutboxFactory, ProcessOutboxFactory processOutboxFactory,
FileStorageService storageService, FileStorageService storageService,
Gson gson IndexClient indexClient,
Gson gson
) )
{ {
super(stateFactory); super(stateFactory);
this.indexOutbox = indexClient.outbox();
this.processService = processService; this.processService = processService;
this.mqConverterOutbox = processOutboxFactory.createConverterOutbox(); this.mqConverterOutbox = processOutboxFactory.createConverterOutbox();
this.mqLoaderOutbox = processOutboxFactory.createLoaderOutbox(); this.mqLoaderOutbox = processOutboxFactory.createLoaderOutbox();
@ -77,8 +83,16 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
this.gson = gson; this.gson = gson;
} }
@GraphState(name = INITIAL, next = RECONVERT) @GraphState(name = INITIAL,
next = RECONVERT,
description = """
Validate the input and transition to RECONVERT
""")
public Message init(FileStorageId crawlStorageId) throws Exception { public Message init(FileStorageId crawlStorageId) throws Exception {
if (null == crawlStorageId) {
error("This Actor requires a FileStorageId to be passed in as a parameter to INITIAL");
}
var storage = storageService.getStorage(crawlStorageId); var storage = storageService.getStorage(crawlStorageId);
if (storage == null) error("Bad storage id"); if (storage == null) error("Bad storage id");
@ -87,7 +101,14 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
return new Message().withCrawlStorageId(crawlStorageId); return new Message().withCrawlStorageId(crawlStorageId);
} }
@GraphState(name = RECONVERT, next = RECONVERT_WAIT, resume = ResumeBehavior.ERROR) @GraphState(name = RECONVERT,
next = RECONVERT_WAIT,
resume = ResumeBehavior.ERROR,
description = """
Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT.
"""
)
public Message reconvert(Message message) throws Exception { public Message reconvert(Message message) throws Exception {
// Create processed data area // Create processed data area
@ -105,7 +126,15 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
.withProcessedStorageId(processedArea.id()) .withProcessedStorageId(processedArea.id())
.withConverterMsgId(id); .withConverterMsgId(id);
} }
@GraphState(name = RECONVERT_WAIT, next = LOAD, resume = ResumeBehavior.RETRY)
@GraphState(
name = RECONVERT_WAIT,
next = LOAD,
resume = ResumeBehavior.RETRY,
description = """
Wait for the converter to finish processing the data.
"""
)
public Message reconvertWait(Message message) throws Exception { public Message reconvertWait(Message message) throws Exception {
var rsp = waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, message.converterMsgId); var rsp = waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, message.converterMsgId);
@ -116,7 +145,13 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
} }
@GraphState(name = LOAD, next = LOAD_WAIT, resume = ResumeBehavior.ERROR) @GraphState(
name = LOAD,
next = LOAD_WAIT,
resume = ResumeBehavior.ERROR,
description = """
Send a load request to the loader and transition to LOAD_WAIT.
""")
public Message load(Message message) throws Exception { public Message load(Message message) throws Exception {
var request = new LoadRequest(message.processedStorageId); var request = new LoadRequest(message.processedStorageId);
@ -126,7 +161,14 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
} }
@GraphState(name = LOAD_WAIT, next = SWAP_LEXICON, resume = ResumeBehavior.RETRY) @GraphState(
name = LOAD_WAIT,
next = SWAP_LEXICON,
resume = ResumeBehavior.RETRY,
description = """
Wait for the loader to finish loading the data.
"""
)
public void loadWait(Message message) throws Exception { public void loadWait(Message message) throws Exception {
var rsp = waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId); var rsp = waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId);
@ -136,7 +178,15 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
@GraphState(name = SWAP_LEXICON, next = END, resume = ResumeBehavior.RETRY) @GraphState(
name = SWAP_LEXICON,
next = REPARTITION,
resume = ResumeBehavior.RETRY,
description = """
Move the lexicon from the LEXICON_STAGING area to the LEXICON_LIVE area,
then instruct the index-service to reload the lexicon.
"""
)
public void swapLexicon(Message message) throws Exception { public void swapLexicon(Message message) throws Exception {
var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE); var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE);
@ -144,22 +194,65 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
var fromSource = staging.asPath().resolve("dictionary.dat"); var fromSource = staging.asPath().resolve("dictionary.dat");
var liveDest = live.asPath().resolve("dictionary.dat"); var liveDest = live.asPath().resolve("dictionary.dat");
// Backup live lexicon
var backupBase = storageService.getStorageBase(FileStorageBaseType.BACKUP);
var backup = storageService.allocateTemporaryStorage(backupBase, FileStorageType.BACKUP,
"lexicon", "Lexicon Backup; " + LocalDateTime.now());
Path backupDest = backup.asPath().resolve("dictionary.dat");
logger.info("Moving " + liveDest + " to " + backupDest);
Files.move(liveDest, backupDest);
// Swap in new lexicon // Swap in new lexicon
logger.info("Moving " + fromSource + " to " + liveDest); logger.info("Moving " + fromSource + " to " + liveDest);
Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
} }
@GraphState(
name = REPARTITION,
next = REPARTITION_WAIT,
description = """
Instruct the index-service to repartition the index then transition to REPARTITION_WAIT.
"""
)
public Long repartition() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "");
}
@GraphState(
name = REPARTITION_WAIT,
next = REINDEX,
resume = ResumeBehavior.RETRY,
description = """
Wait for the index-service to finish repartitioning the index.
"""
)
public void repartitionReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@GraphState(
name = REINDEX,
next = REINDEX_WAIT,
description = """
Instruct the index-service to reindex the data then transition to REINDEX_WAIT.
"""
)
public Long reindex() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, "");
}
@GraphState(
name = REINDEX_WAIT,
next = END,
resume = ResumeBehavior.RETRY,
description = """
Wait for the index-service to finish reindexing the data.
"""
)
public void reindexReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception { public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception {
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
@ -170,6 +263,7 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
return outbox.waitResponse(id, 1, TimeUnit.SECONDS); return outbox.waitResponse(id, 1, TimeUnit.SECONDS);
} }
catch (TimeoutException ex) { catch (TimeoutException ex) {
// Maybe the process died, wait a moment for it to restart
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
error("Process " + processId + " died and did not re-launch"); error("Process " + processId + " died and did not re-launch");
} }
@ -180,7 +274,7 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException { public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
// Wait for process to start // Wait for process to start
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30); long deadline = System.currentTimeMillis() + unit.toMillis(duration);
while (System.currentTimeMillis() < deadline) { while (System.currentTimeMillis() < deadline) {
if (processService.isRunning(processId)) if (processService.isRunning(processId))
return true; return true;

View File

@ -1,107 +0,0 @@
package nu.marginalia.control.fsm;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.model.ControlProcessState;
import nu.marginalia.control.fsm.monitor.*;
import nu.marginalia.control.fsm.monitor.ConverterMonitorFSM;
import nu.marginalia.control.fsm.monitor.LoaderMonitorFSM;
import nu.marginalia.control.fsm.task.ReconvertAndLoadFSM;
import nu.marginalia.control.fsm.task.RepartitionReindexFSM;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqsm.StateMachine;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.state.MachineState;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@Singleton
public class ControlFSMs {
private final ServiceEventLog eventLog;
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<ControlProcess, StateMachine> stateMachines = new HashMap<>();
@Inject
public ControlFSMs(MessageQueueFactory messageQueueFactory,
GsonFactory gsonFactory,
BaseServiceParams baseServiceParams,
RepartitionReindexFSM repartitionReindexFSM,
ReconvertAndLoadFSM reconvertAndLoadFSM,
ConverterMonitorFSM converterMonitorFSM,
LoaderMonitorFSM loaderMonitor,
MessageQueueMonitorFSM messageQueueMonitor,
ProcessLivenessMonitorFSM processMonitorFSM,
FileStorageMonitorFSM fileStorageMonitorFSM
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.gson = gsonFactory.get();
register(ControlProcess.REPARTITION_REINDEX, repartitionReindexFSM);
register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadFSM);
register(ControlProcess.CONVERTER_MONITOR, converterMonitorFSM);
register(ControlProcess.LOADER_MONITOR, loaderMonitor);
register(ControlProcess.MESSAGE_QUEUE_MONITOR, messageQueueMonitor);
register(ControlProcess.PROCESS_LIVENESS_MONITOR, processMonitorFSM);
register(ControlProcess.FILE_STORAGE_MONITOR, fileStorageMonitorFSM);
}
private void register(ControlProcess process, AbstractStateGraph graph) {
var sm = new StateMachine(messageQueueFactory, process.id(), UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
stateMachines.put(process, sm);
}
private void logStateChange(ControlProcess process, String state) {
eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state);
}
public void startFrom(ControlProcess process, String state) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state);
}
public void start(ControlProcess process) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init();
}
public <T> void startFrom(ControlProcess process, String state, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, gson.toJson(arg));
}
public <T> void start(ControlProcess process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
@SneakyThrows
public void stop(ControlProcess fsm) {
stateMachines.get(fsm).abortExecution();
}
public Map<ControlProcess, MachineState> getMachineStates() {
return stateMachines.entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().getState())
);
}
}

View File

@ -1,74 +0,0 @@
package nu.marginalia.control.fsm.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
@Singleton
public class RepartitionReindexFSM extends AbstractStateGraph {
private final MqOutbox indexOutbox;
// STATES
public static final String INITIAL = "INITIAL";
public static final String REPARTITION = "REPARTITION";
public static final String REPARTITION_WAIT = "REPARTITION-WAIT";
public static final String REINDEX = "REINDEX";
public static final String REINDEX_WAIT = "REINDEX-WAIT";
public static final String END = "END";
@Inject
public RepartitionReindexFSM(StateFactory stateFactory,
IndexClient indexClient) {
super(stateFactory);
indexOutbox = indexClient.outbox();
}
@GraphState(name = INITIAL, next = REPARTITION)
public void init() throws Exception {
var rsp = indexOutbox.send(IndexMqEndpoints.INDEX_IS_BLOCKED, "");
if (rsp.payload().equalsIgnoreCase("true")) {
error("Index is blocked");
}
}
@GraphState(name = REPARTITION, next = REPARTITION_WAIT)
public Long repartition() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "");
}
@GraphState(name = REPARTITION_WAIT, next = REINDEX, resume = ResumeBehavior.RETRY)
public void repartitionReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@GraphState(name = REINDEX, next = REINDEX_WAIT)
public Long reindex() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, "");
}
@GraphState(name = REINDEX_WAIT, next = END, resume = ResumeBehavior.RETRY)
public void reindexReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
}

View File

@ -1,7 +1,6 @@
package nu.marginalia.control.model; package nu.marginalia.control.model;
public enum ControlProcess { public enum Actor {
REPARTITION_REINDEX,
RECONVERT_LOAD, RECONVERT_LOAD,
CONVERTER_MONITOR, CONVERTER_MONITOR,
LOADER_MONITOR, LOADER_MONITOR,

View File

@ -1,6 +1,6 @@
package nu.marginalia.control.model; package nu.marginalia.control.model;
public record ControlProcessState(String name, String state, boolean terminal) { public record ActorRunState(String name, String state, boolean terminal) {
public String stateIcon() { public String stateIcon() {
if (terminal) { if (terminal) {
return "\uD83D\uDE34"; return "\uD83D\uDE34";

View File

@ -0,0 +1,19 @@
package nu.marginalia.control.model;
import nu.marginalia.mqsm.graph.GraphState;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
public record ActorState(String name,
boolean current,
List<String> transitions,
String description) {
public ActorState(GraphState gs, boolean current) {
this(gs.name(), current, toTransitions(gs.next(), gs.transitions()), gs.description());
}
private static List<String> toTransitions(String next, String[] transitions) {
return Stream.concat(Stream.of(next), Arrays.stream(transitions)).distinct().toList();
}
}

View File

@ -0,0 +1,51 @@
package nu.marginalia.control.model;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.state.MachineState;
import java.util.*;
import java.util.stream.Collectors;
public record ActorStateGraph(List<ActorState> states) {
public ActorStateGraph(AbstractStateGraph graph, MachineState currentState) {
this(getStateList(graph, currentState));
}
private static List<ActorState> getStateList(
AbstractStateGraph graph,
MachineState currentState)
{
Map<String, GraphState> declaredStates = graph.declaredStates().stream().collect(Collectors.toMap(GraphState::name, gs -> gs));
Set<GraphState> seenStates = new HashSet<>(declaredStates.size());
LinkedList<GraphState> edge = new LinkedList<>();
List<ActorState> statesList = new ArrayList<>(declaredStates.size());
edge.add(declaredStates.get("INITIAL"));
while (!edge.isEmpty()) {
var first = edge.removeFirst();
if (first == null || !seenStates.add(first)) {
continue;
}
statesList.add(new ActorState(first, currentState.name().equals(first.name())));
edge.add(declaredStates.get(first.next()));
for (var transition : first.transitions()) {
edge.add(declaredStates.get(transition));
}
}
if (!declaredStates.containsKey("ERROR")) {
statesList.add(new ActorState("ERROR", currentState.name().equals("ERROR"), List.of(), "Terminal error state"));
}
if (!declaredStates.containsKey("END")) {
statesList.add(new ActorState("END", currentState.name().equals("END"), List.of(), "The machine terminated successfully"));
}
return statesList;
}
}

View File

@ -2,44 +2,50 @@ package nu.marginalia.control.svc;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.control.fsm.ControlFSMs; import nu.marginalia.control.actor.ControlActors;
import nu.marginalia.control.fsm.task.ReconvertAndLoadFSM; import nu.marginalia.control.actor.task.ReconvertAndLoadActor;
import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.model.Actor;
import nu.marginalia.control.model.ControlProcessState; import nu.marginalia.control.model.ActorRunState;
import nu.marginalia.control.model.ActorStateGraph;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.mqsm.state.MachineState; import nu.marginalia.mqsm.state.MachineState;
import spark.Request; import spark.Request;
import spark.Response; import spark.Response;
import java.util.List;
import java.util.Map; import java.util.Map;
@Singleton @Singleton
public class ControlFsmService { public class ControlActorService {
private final ControlFSMs controlFSMs; private final ControlActors controlActors;
@Inject @Inject
public ControlFsmService(ControlFSMs controlFSMs) { public ControlActorService(ControlActors controlActors) {
this.controlFSMs = controlFSMs; this.controlActors = controlActors;
}
public Object getActorStateGraph(Actor actor) {
var currentState = controlActors.getActorStates().get(actor);
return new ActorStateGraph(controlActors.getActorDefinition(actor), currentState);
} }
public Object startFsm(Request req, Response rsp) throws Exception { public Object startFsm(Request req, Response rsp) throws Exception {
controlFSMs.start( controlActors.start(
ControlProcess.valueOf(req.params("fsm").toUpperCase()) Actor.valueOf(req.params("fsm").toUpperCase())
); );
return ""; return "";
} }
public Object stopFsm(Request req, Response rsp) throws Exception { public Object stopFsm(Request req, Response rsp) throws Exception {
controlFSMs.stop( controlActors.stop(
ControlProcess.valueOf(req.params("fsm").toUpperCase()) Actor.valueOf(req.params("fsm").toUpperCase())
); );
return ""; return "";
} }
public Object triggerProcessing(Request request, Response response) throws Exception { public Object triggerProcessing(Request request, Response response) throws Exception {
controlFSMs.start( controlActors.start(
ControlProcess.RECONVERT_LOAD, Actor.RECONVERT_LOAD,
FileStorageId.of(Integer.parseInt(request.params("fid"))) FileStorageId.of(Integer.parseInt(request.params("fid")))
); );
return ""; return "";
@ -49,24 +55,25 @@ public class ControlFsmService {
var fid = FileStorageId.of(Integer.parseInt(request.params("fid"))); var fid = FileStorageId.of(Integer.parseInt(request.params("fid")));
// Start the FSM from the intermediate state that triggers the load // Start the FSM from the intermediate state that triggers the load
controlFSMs.startFrom( controlActors.startFrom(
ControlProcess.RECONVERT_LOAD, Actor.RECONVERT_LOAD,
ReconvertAndLoadFSM.LOAD, ReconvertAndLoadActor.LOAD,
new ReconvertAndLoadFSM.Message(null, fid, 0L, 0L) new ReconvertAndLoadActor.Message(null, fid, 0L, 0L)
); );
return ""; return "";
} }
public Object getFsmStates() { public Object getActorStates() {
return controlFSMs.getMachineStates().entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> { return controlActors.getActorStates().entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> {
final MachineState state = e.getValue(); final MachineState state = e.getValue();
final String machineName = e.getKey().name(); final String machineName = e.getKey().name();
final String stateName = state.name(); final String stateName = state.name();
final boolean terminal = state.isFinal(); final boolean terminal = state.isFinal();
return new ControlProcessState(machineName, stateName, terminal); return new ActorRunState(machineName, stateName, terminal);
}).toList(); }).toList();
} }
}
}

View File

@ -3,7 +3,9 @@ package nu.marginalia.control.svc;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.model.MessageQueueEntry; import nu.marginalia.control.model.MessageQueueEntry;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
@ -54,4 +56,74 @@ public class MessageQueueViewService {
} }
} }
public MessageQueueEntry getMessage(long id) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID=?
""")) {
query.setLong(1, id);
var rs = query.executeQuery();
if (rs.next()) {
return new MessageQueueEntry(
rs.getLong("ID"),
rs.getLong("RELATED_ID"),
rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"),
rs.getString("PAYLOAD"),
rs.getString("OWNER_INSTANCE"),
rs.getLong("OWNER_TICK"),
rs.getString("STATE"),
rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getInt("TTL")
);
}
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
return null;
}
public Object getLastEntriesForInbox(String inbox, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE RECIPIENT_INBOX=?
ORDER BY ID DESC
LIMIT ?
""")) {
query.setString(1, inbox);
query.setInt(2, n);
List<MessageQueueEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery();
while (rs.next()) {
entries.add(new MessageQueueEntry(
rs.getLong("ID"),
rs.getLong("RELATED_ID"),
rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"),
rs.getString("PAYLOAD"),
rs.getString("OWNER_INSTANCE"),
rs.getLong("OWNER_TICK"),
rs.getString("STATE"),
rs.getTimestamp("CREATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getTimestamp("UPDATED_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getInt("TTL")
));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
} }

View File

@ -8,6 +8,19 @@ body {
grid-template-areas: grid-template-areas:
"left right"; "left right";
} }
.toggle-switch-off {
border-left: 5px solid #f00;
width: 8ch;
}
.toggle-switch-on {
border-right: 5px solid #080;
width: 8ch;
}
.toggle-switch-active {
border-left: 5px solid #00f;
border-right: 5px solid #00f;
width: 8ch;
}
#services .missing { #services .missing {
color: #800; color: #800;
} }
@ -54,4 +67,10 @@ nav ul li a.current {
body > section { body > section {
grid-area: right; grid-area: right;
}
#state-graph .current-state td:first-of-type {
border-right: 1em solid #000;
font-weight: bold;
border-color: #000;
} }

View File

@ -0,0 +1,22 @@
<!DOCTYPE html>
<html>
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="/style.css" />
</head>
<body>
{{> control/partials/nav}}
<section>
<h1>{{actor}}</h1>
{{> control/partials/actor-state-graph}}
{{> control/partials/message-queue-table}}
</section>
</body>
<script src="/refresh.js"></script>
<script>
window.setInterval(() => {
refresh(["queue", "state-graph"]);
}, 2000);
</script>
</html>

View File

@ -9,14 +9,14 @@
{{> control/partials/nav}} {{> control/partials/nav}}
<section> <section>
{{> control/partials/processes-table}} {{> control/partials/processes-table}}
{{> control/partials/fsm-table}} {{> control/partials/actors-table}}
{{> control/partials/message-queue-table}} {{> control/partials/message-queue-table}}
</section> </section>
</body> </body>
<script src="/refresh.js"></script> <script src="/refresh.js"></script>
<script> <script>
window.setInterval(() => { window.setInterval(() => {
refresh(["processes", "fsms", "queue"]); refresh(["processes", "actors", "queue"]);
}, 2000); }, 2000);
</script> </script>
</html> </html>

View File

@ -0,0 +1,16 @@
<h1>Actor State Graph</h1>
<table id="state-graph">
<tr>
<th>State</th>
<th>Transitions</th>
<th>Description</th>
</tr>
{{#each state-graph.states}}
<tr {{#if current}}class="current-state"{{/if}}>
<td>{{name}}</td>
<td>{{#each transitions}} {{.}} {{/each}}</td>
<td>{{description}}</td>
</tr>
{{/each}}
</table>

View File

@ -0,0 +1,50 @@
<h1>Actors</h1>
<table id="actors">
<tr>
<th>Actor</th>
<th>State</th>
<th>Action</th>
</tr>
{{#each actors}}
<tr>
<td><a href="/actors/{{name}}">{{name}}</a></td>
<td>{{stateIcon}}&nbsp;{{state}}</td>
<td>
{{#unless terminal}}
<form id="toggle-{{name}}"
action="/fsms/{{name}}/stop"
method="post"
onsubmit="return toggleActorSwitch('{{name}}')">
<input type="submit" value="On" class="toggle-switch-on" id="toggle-{{name}}-button">
</form>
{{/unless}}
{{#if terminal}}
<form id="toggle-{{name}}"
action="/fsms/{{name}}/start"
method="post"
onsubmit="return toggleActorSwitch('{{name}}')">
<input type="submit" value="Off" class="toggle-switch-off" id="toggle-{{name}}-button">
</form>
{{/if}}
</td>
</tr>
{{/each}}
</table>
<script>
function toggleActorSwitch(name) {
var toggle = document.getElementById("toggle-" + name + "-button");
toggle.classList.remove('toggle-switch-on');
toggle.classList.remove('toggle-switch-off');
toggle.classList.add("toggle-switch-active");
toggle.value = '...';
var form = document.getElementById("toggle-" + name);
var xhr = new XMLHttpRequest();
xhr.open(form.method, form.action, true);
xhr.send();
return false;
}
</script>

View File

@ -1,23 +0,0 @@
<h1>FSMs</h1>
<table id="fsms">
<tr>
<th>FSM</th>
<th>State</th>
<th>Action</th>
</tr>
{{#each fsms}}
<tr>
<td>{{name}}</td>
<td>{{stateIcon}}&nbsp;{{state}}</td>
<td>
{{#unless terminal}}
<form action="/fsms/{{name}}/stop" method="post"><input type="submit" value="Stop"></form>
{{/unless}}
{{#if terminal}}
<form action="/fsms/{{name}}/start" method="post"><input type="submit" value="Start"></form>
{{/if}}
</td>
</tr>
{{/each}}
</table>

View File

@ -12,7 +12,7 @@
{{#each messages}} {{#each messages}}
<tr> <tr>
<td>{{stateCode}}&nbsp;{{state}}</td> <td>{{stateCode}}&nbsp;{{state}}</td>
<td>{{id}}</td> <td><a onClick="editMessage({{id}})" href="#">{{id}}</a></td>
<td>{{recipientInbox}}</td> <td>{{recipientInbox}}</td>
<td>{{function}}</td> <td>{{function}}</td>
<td title="{{ownerInstanceFull}}"> <td title="{{ownerInstanceFull}}">
@ -29,4 +29,62 @@
<td>{{updatedTime}}</td> <td>{{updatedTime}}</td>
</tr> </tr>
{{/each}} {{/each}}
</table> </table>
<dialog id="edit-message">
<h1>Edit Message</h1>
<form method="post" action="/message/:id/edit">
<div class="form-grid">
<label for="id">ID</label> <input readonly type="text" name="id" id="id" pattern="\d+" value="12" />
<label for="relatedId">Related ID</label> <input type="text" name="relatedId" id="relatedId" pattern="\d+" />
<label for="state">State</label>
<select name="state" id="state">
<option value="NEW">NEW</option>
<option value="ACK">ACK</option>
<option value="OK">OK</option>
<option value="ERR">ERR</option>
<option value="DEAD">DEAD</option>
</select>
<label for="sender">Sender</label> <input type="text" name="sender" id="sender" />
<label for="recipient">Recipient</label> <input type="text" name="recipient" id="recipient" />
<label for="function">Function</label> <input type="text" name="function" id="function" />
<label for="payload">Payload</label> <input type="text" name="payload" id="payload" />
<label for="ttl">TTL</label> <input type="text" name="ttl" id="ttl" />
<label for="ownerInstance">Owner Instance</label> <input type="text" name="ownerInstance" id="ownerInstance" />
<label for="ownerTick" pattern="\d+">Owner Tick</label> <input type="text" name="ownerTick" id="ownerTick" />
<div>
<input type="submit" value="Save" style="float:left" />
</div>
<div>
<input type="button" value="Cancel" style="float:right" onClick="document.getElementById('edit-message').close()"/>
</div>
</div>
</form>
</dialog>
<script>
function editMessage(id) {
var message = document.getElementById('edit-message');
var xhr = new XMLHttpRequest();
xhr.open('GET', '/messages/' + id, true);
xhr.onload = function () {
if (xhr.status === 200) {
var data = JSON.parse(xhr.responseText);
message.querySelector('#edit-message #id').value = data.id;
message.querySelector('#edit-message #relatedId').value = data.relatedId;
message.querySelector('#edit-message #state').value = data.state;
message.querySelector('#edit-message #sender').value = data.sender;
message.querySelector('#edit-message #recipient').value = data.recipient;
message.querySelector('#edit-message #function').value = data.function;
message.querySelector('#edit-message #payload').value = data.payload;
message.querySelector('#edit-message #ttl').value = data.ttl;
message.querySelector('#edit-message #ownerInstance').value = data.ownerInstance;
message.querySelector('#edit-message #ownerTick').value = data.ownerTick;
message.showModal();
}
};
xhr.send();
}
</script>

View File

@ -2,7 +2,7 @@
<ul> <ul>
<li><a href="/">Overview</a></li> <li><a href="/">Overview</a></li>
<li><a href="/services">Services</a></li> <li><a href="/services">Services</a></li>
<li><a href="/processes">Processes</a></li> <li><a href="/actors">Actors</a></li>
<li><a href="/storage">Storage</a></li> <li><a href="/storage">Storage</a></li>
</ul> </ul>
</nav> </nav>