diff --git a/code/api/index-api/build.gradle b/code/api/index-api/build.gradle index edb6056d..ae64f944 100644 --- a/code/api/index-api/build.gradle +++ b/code/api/index-api/build.gradle @@ -16,7 +16,7 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') - implementation project(':code:common:message-queue') + implementation project(':code:libraries:message-queue') implementation project(':code:features-index:index-query') implementation libs.lombok diff --git a/code/api/search-api/build.gradle b/code/api/search-api/build.gradle index ba00a702..c141aa6e 100644 --- a/code/api/search-api/build.gradle +++ b/code/api/search-api/build.gradle @@ -14,7 +14,7 @@ java { dependencies { implementation project(':code:common:model') implementation project(':code:common:config') - implementation project(':code:common:message-queue') + implementation project(':code:libraries:message-queue') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/AbstractStateGraph.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/AbstractStateGraph.java deleted file mode 100644 index 648aad63..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/AbstractStateGraph.java +++ /dev/null @@ -1,178 +0,0 @@ -package nu.marginalia.mqsm.graph; - -import nu.marginalia.mqsm.state.MachineState; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.state.StateTransition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.*; - -public abstract class AbstractStateGraph { - private final StateFactory stateFactory; - private static final Logger logger = LoggerFactory.getLogger(AbstractStateGraph.class); - - public AbstractStateGraph(StateFactory stateFactory) { - this.stateFactory = stateFactory; - } - - /** User-facing description of the actor. */ - public abstract String describe(); - - public void transition(String state) { - throw new ControlFlowException(state, null); - } - - public void transition(String state, T payload) { - throw new ControlFlowException(state, payload); - } - - public void error() { - throw new ControlFlowException("ERROR", ""); - } - - public void error(T payload) { - throw new ControlFlowException("ERROR", payload); - } - - public void error(Exception ex) { - throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage()); - } - - /** Check whether there is an INITIAL state that can be directly initialized - * without declared parameters. */ - public boolean isDirectlyInitializable() { - for (var method : getClass().getMethods()) { - var gs = method.getAnnotation(GraphState.class); - if (gs == null) { - continue; - } - if ("INITIAL".equals(gs.name()) && method.getParameterCount() == 0) { - return true; - } - } - return false; - } - - public Map declaredStates() { - Map ret = new HashMap<>(); - - for (var method : getClass().getMethods()) { - var gs = method.getAnnotation(GraphState.class); - if (gs != null) { - ret.put(gs.name(), gs); - } - } - - return ret; - } - - - public Set terminalStates() { - Set ret = new HashSet<>(); - - for (var method : getClass().getMethods()) { - var gs = method.getAnnotation(TerminalGraphState.class); - if (gs != null) { - ret.add(gs); - } - } - - return ret; - } - - public List asStateList() { - List ret = new ArrayList<>(); - - for (var method : getClass().getMethods()) { - var gs = method.getAnnotation(GraphState.class); - if (gs != null) { - ret.add(graphState(method, gs)); - } - - var ts = method.getAnnotation(TerminalGraphState.class); - if (ts != null) { - ret.add(stateFactory.create(ts.name(), ResumeBehavior.ERROR, () -> { - throw new ControlFlowException(ts.name(), null); - })); - } - } - - return ret; - } - - private MachineState graphState(Method method, GraphState gs) { - - var parameters = method.getParameterTypes(); - boolean returnsVoid = method.getGenericReturnType().equals(Void.TYPE); - - if (parameters.length == 0) { - return stateFactory.create(gs.name(), gs.resume(), () -> { - try { - if (returnsVoid) { - method.invoke(this); - return StateTransition.to(gs.next()); - } else { - Object ret = method.invoke(this); - return stateFactory.transition(gs.next(), ret); - } - } - catch (Exception e) { - return invocationExceptionToStateTransition(gs.name(), e); - } - }); - } - else if (parameters.length == 1) { - return stateFactory.create(gs.name(), gs.resume(), parameters[0], (param) -> { - try { - if (returnsVoid) { - method.invoke(this, param); - return StateTransition.to(gs.next()); - } else { - Object ret = method.invoke(this, param); - return stateFactory.transition(gs.next(), ret); - } - } catch (Exception e) { - return invocationExceptionToStateTransition(gs.name(), e); - } - }); - } - else { - // We permit only @GraphState-annotated methods like this: - // - // void foo(); - // void foo(Object bar); - // Object foo(); - // Object foo(Object bar); - - throw new IllegalStateException("StateGraph " + - getClass().getSimpleName() + - " has invalid method signature for method " + - method.getName() + - ": Expected 0 or 1 parameter(s) but found " + - Arrays.toString(parameters)); - } - } - - private StateTransition invocationExceptionToStateTransition(String state, Throwable ex) { - while (ex instanceof InvocationTargetException e) { - if (e.getCause() != null) ex = ex.getCause(); - } - - if (ex instanceof ControlFlowException cfe) { - return stateFactory.transition(cfe.getState(), cfe.getPayload()); - } - else if (ex instanceof InterruptedException intE) { - logger.error("State execution was interrupted " + state); - return StateTransition.to("ERR", "Execution interrupted"); - } - else { - logger.error("Error in state invocation " + state, ex); - return StateTransition.to("ERROR", - "Exception: " + ex.getClass().getSimpleName() + "/" + ex.getMessage()); - } - } - -} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ControlFlowException.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ControlFlowException.java deleted file mode 100644 index 12e5b569..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ControlFlowException.java +++ /dev/null @@ -1,22 +0,0 @@ -package nu.marginalia.mqsm.graph; - -/** Exception thrown by a state to indicate that the state machine should jump to a different state. */ -public class ControlFlowException extends RuntimeException { - private final String state; - private final Object payload; - - public ControlFlowException(String state, Object payload) { - this.state = state; - this.payload = payload; - } - - public String getState() { - return state; - } - - public Object getPayload() { - return payload; - } - - public StackTraceElement[] getStackTrace() { return new StackTraceElement[0]; } -} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/MachineState.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/MachineState.java deleted file mode 100644 index 84a0b11c..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/MachineState.java +++ /dev/null @@ -1,14 +0,0 @@ -package nu.marginalia.mqsm.state; - -import nu.marginalia.mqsm.graph.ResumeBehavior; - -public interface MachineState { - String name(); - - StateTransition next(String message); - - ResumeBehavior resumeBehavior(); - - boolean isFinal(); - -} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/StateTransition.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/StateTransition.java deleted file mode 100644 index 6ca5d387..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/StateTransition.java +++ /dev/null @@ -1,11 +0,0 @@ -package nu.marginalia.mqsm.state; - -public record StateTransition(String state, String message) { - public static StateTransition to(String state) { - return new StateTransition(state, ""); - } - - public static StateTransition to(String state, String message) { - return new StateTransition(state, message); - } -} diff --git a/code/common/service/build.gradle b/code/common/service/build.gradle index 156b826f..edf6b40f 100644 --- a/code/common/service/build.gradle +++ b/code/common/service/build.gradle @@ -12,7 +12,7 @@ java { dependencies { implementation project(':code:common:service-client') implementation project(':code:common:service-discovery') - implementation project(':code:common:message-queue') + implementation project(':code:libraries:message-queue') implementation project(':code:common:db') implementation libs.lombok diff --git a/code/common/message-queue/build.gradle b/code/libraries/message-queue/build.gradle similarity index 81% rename from code/common/message-queue/build.gradle rename to code/libraries/message-queue/build.gradle index d71ca1d4..82a93637 100644 --- a/code/common/message-queue/build.gradle +++ b/code/libraries/message-queue/build.gradle @@ -10,10 +10,6 @@ java { } dependencies { - implementation project(':code:common:service-client') - implementation project(':code:common:service-discovery') - implementation project(':code:common:db') - implementation libs.lombok annotationProcessor libs.lombok @@ -22,7 +18,6 @@ dependencies { implementation libs.gson implementation libs.rxjava - implementation libs.bundles.prometheus implementation libs.bundles.slf4j implementation libs.bucket4j @@ -32,6 +27,7 @@ dependencies { testImplementation libs.bundles.slf4j.test testImplementation libs.bundles.junit testImplementation libs.mockito + testImplementation project(':code:common:db') testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') testImplementation 'org.testcontainers:mariadb:1.17.4' diff --git a/code/common/message-queue/msgstate.svg b/code/libraries/message-queue/msgstate.svg similarity index 100% rename from code/common/message-queue/msgstate.svg rename to code/libraries/message-queue/msgstate.svg diff --git a/code/common/message-queue/readme.md b/code/libraries/message-queue/readme.md similarity index 91% rename from code/common/message-queue/readme.md rename to code/libraries/message-queue/readme.md index d71459dd..cc7fbb60 100644 --- a/code/common/message-queue/readme.md +++ b/code/libraries/message-queue/readme.md @@ -26,25 +26,25 @@ The inbox implementations as well as the outbox can be constructed via the `Mess The MQSM is a finite state machine that is backed by the message queue used to implement an Actor style paradigm. -The machine itself is defined through a class that extends the 'AbstractStateGraph'; with state transitions and +The machine itself is defined through a class that extends the 'AbstractActorPrototype'; with state transitions and names defined as implementations. Example: ```java -class ExampleStateMachine extends AbstractStateGraph { +class ExampleStateMachine extends AbstractActorPrototype { - @GraphState(name = "INITIAL", next="GREET") + @ActorState(name = "INITIAL", next="GREET") public void initial() { return "World"; // passed to the next state } - @GraphState(name = "GREET", next="COUNT-TO-FIVE") + @ActorState(name = "GREET", next="COUNT-TO-FIVE") public void greet(String name) { System.out.println("Hello " + name); } - @GraphState(name = "COUNT-TO-FIVE", next="END") + @ActorState(name = "COUNT-TO-FIVE", next="END") public void countToFive(Integer value) { // value is passed from the previous state, since greet didn't pass a value, // null will be the default. @@ -69,7 +69,7 @@ class ExampleStateMachine extends AbstractStateGraph { // Default transition is to END } - @GraphState(name="END") + @ActorState(name="END") public void end() { System.out.println("Done"); } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateFactory.java similarity index 52% rename from code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateFactory.java index 6df583b3..2117df9a 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateFactory.java @@ -1,34 +1,33 @@ -package nu.marginalia.mqsm; +package nu.marginalia.actor; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import nu.marginalia.mqsm.graph.ResumeBehavior; -import nu.marginalia.mqsm.state.MachineState; -import nu.marginalia.mqsm.state.StateTransition; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorStateInstance; +import nu.marginalia.actor.state.ActorStateTransition; import java.util.function.Function; import java.util.function.Supplier; -@Singleton -public class StateFactory { +/** Factory for creating actor state instances. You probably don't want to use this directly. + *

+ * Use AbstractStatePrototype instead. */ +public class ActorStateFactory { private final Gson gson; - @Inject - public StateFactory(Gson gson) { + public ActorStateFactory(Gson gson) { this.gson = gson; } - public MachineState create(String name, ResumeBehavior resumeBehavior, Class param, Function logic) { - return new MachineState() { + public ActorStateInstance create(String name, ActorResumeBehavior resumeBehavior, Class param, Function logic) { + return new ActorStateInstance() { @Override public String name() { return name; } @Override - public StateTransition next(String message) { + public ActorStateTransition next(String message) { if (message.isEmpty()) { return logic.apply(null); @@ -45,7 +44,7 @@ public class StateFactory { } @Override - public ResumeBehavior resumeBehavior() { + public ActorResumeBehavior resumeBehavior() { return resumeBehavior; } @@ -56,21 +55,21 @@ public class StateFactory { }; } - public MachineState create(String name, ResumeBehavior resumeBehavior, Supplier logic) { - return new MachineState() { + public ActorStateInstance create(String name, ActorResumeBehavior actorResumeBehavior, Supplier logic) { + return new ActorStateInstance() { @Override public String name() { return name; } @Override - public StateTransition next(String message) { + public ActorStateTransition next(String message) { return logic.get(); } @Override - public ResumeBehavior resumeBehavior() { - return resumeBehavior; + public ActorResumeBehavior resumeBehavior() { + return actorResumeBehavior; } @Override @@ -80,62 +79,62 @@ public class StateFactory { }; } - public StateTransition transition(String state) { - return StateTransition.to(state); + public ActorStateTransition transition(String state) { + return ActorStateTransition.to(state); } - public StateTransition transition(String state, Object message) { + public ActorStateTransition transition(String state, Object message) { if (null == message) { - return StateTransition.to(state); + return ActorStateTransition.to(state); } - return StateTransition.to(state, gson.toJson(message)); + return ActorStateTransition.to(state, gson.toJson(message)); } - public static class ErrorState implements MachineState { + static class ErrorStateInstance implements ActorStateInstance { @Override public String name() { return "ERROR"; } @Override - public StateTransition next(String message) { + public ActorStateTransition next(String message) { throw new UnsupportedOperationException(); } @Override - public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } + public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; } @Override public boolean isFinal() { return true; } } - public static class FinalState implements MachineState { + static class FinalState implements ActorStateInstance { @Override public String name() { return "END"; } @Override - public StateTransition next(String message) { + public ActorStateTransition next(String message) { throw new UnsupportedOperationException(); } @Override - public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } + public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; } @Override public boolean isFinal() { return true; } } - public static class ResumingState implements MachineState { + static class ResumingState implements ActorStateInstance { @Override public String name() { return "RESUMING"; } @Override - public StateTransition next(String message) { + public ActorStateTransition next(String message) { throw new UnsupportedOperationException(); } @Override - public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } + public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; } @Override public boolean isFinal() { return false; } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/ActorStateMachine.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java similarity index 82% rename from code/common/message-queue/src/main/java/nu/marginalia/mqsm/ActorStateMachine.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java index cac47c6a..657fb6e2 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/ActorStateMachine.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java @@ -1,5 +1,6 @@ -package nu.marginalia.mqsm; +package nu.marginalia.actor; +import nu.marginalia.actor.prototype.ActorPrototype; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; @@ -7,9 +8,8 @@ import nu.marginalia.mq.inbox.MqInboxResponse; import nu.marginalia.mq.inbox.MqSubscription; import nu.marginalia.mq.inbox.MqSynchronousInbox; import nu.marginalia.mq.outbox.MqOutbox; -import nu.marginalia.mqsm.graph.ResumeBehavior; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.state.*; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,23 +30,23 @@ public class ActorStateMachine { private final String queueName; - private volatile MachineState state; + private volatile ActorStateInstance state; private volatile ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated(); - private final MachineState errorState = new StateFactory.ErrorState(); - private final MachineState finalState = new StateFactory.FinalState(); - private final MachineState resumingState = new StateFactory.ResumingState(); + private final ActorStateInstance errorState = new ActorStateFactory.ErrorStateInstance(); + private final ActorStateInstance finalState = new ActorStateFactory.FinalState(); + private final ActorStateInstance resumingState = new ActorStateFactory.ResumingState(); private final List> stateChangeListeners = new ArrayList<>(); - private final Map allStates = new HashMap<>(); + private final Map allStates = new HashMap<>(); private final boolean isDirectlyInitializable; public ActorStateMachine(MessageQueueFactory messageQueueFactory, String queueName, UUID instanceUUID, - AbstractStateGraph stateGraph) + ActorPrototype statePrototype) { this.queueName = queueName; @@ -56,10 +56,10 @@ public class ActorStateMachine { smInbox.subscribe(new StateEventSubscription()); registerStates(List.of(errorState, finalState, resumingState)); - registerStates(stateGraph); - isDirectlyInitializable = stateGraph.isDirectlyInitializable(); + registerStates(statePrototype); + isDirectlyInitializable = statePrototype.isDirectlyInitializable(); - stateGraph.declaredStates().forEach((name, declaredState) -> { + statePrototype.declaredStates().forEach((name, declaredState) -> { if (!allStates.containsKey(name)) { throw new IllegalArgumentException("State " + name + " is not defined in the state graph"); } @@ -84,14 +84,14 @@ public class ActorStateMachine { } /** Register the state graph */ - void registerStates(List states) { + void registerStates(List states) { for (var state : states) { allStates.put(state.name(), state); } } /** Register the state graph */ - void registerStates(AbstractStateGraph states) { + void registerStates(ActorPrototype states) { registerStates(states.asStateList()); } @@ -128,7 +128,7 @@ public class ActorStateMachine { /** Initialize the state machine. */ public void init() throws Exception { - var transition = StateTransition.to("INITIAL"); + var transition = ActorStateTransition.to("INITIAL"); synchronized (this) { this.state = allStates.get(transition.state()); @@ -140,7 +140,7 @@ public class ActorStateMachine { /** Initialize the state machine. */ public void initFrom(String firstState) throws Exception { - var transition = StateTransition.to(firstState); + var transition = ActorStateTransition.to(firstState); synchronized (this) { this.state = allStates.get(transition.state()); @@ -152,7 +152,7 @@ public class ActorStateMachine { /** Initialize the state machine. */ public void init(String jsonEncodedArgument) throws Exception { - var transition = StateTransition.to("INITIAL", jsonEncodedArgument); + var transition = ActorStateTransition.to("INITIAL", jsonEncodedArgument); synchronized (this) { this.state = allStates.get(transition.state()); @@ -164,7 +164,7 @@ public class ActorStateMachine { /** Initialize the state machine. */ public void initFrom(String state, String jsonEncodedArgument) throws Exception { - var transition = StateTransition.to(state, jsonEncodedArgument); + var transition = ActorStateTransition.to(state, jsonEncodedArgument); synchronized (this) { this.state = allStates.get(transition.state()); @@ -212,15 +212,15 @@ public class ActorStateMachine { } } - private void resumeFromAck(MachineState resumeState, + private void resumeFromAck(ActorStateInstance resumeState, MqMessage message) { try { - if (resumeState.resumeBehavior().equals(ResumeBehavior.ERROR)) { + if (resumeState.resumeBehavior().equals(ActorResumeBehavior.ERROR)) { // The message is acknowledged, but the state does not support resuming smOutbox.sendNotice(expectedMessage.id, "ERROR", "Illegal resumption from ACK'ed state " + message.function()); } - else if (resumeState.resumeBehavior().equals(ResumeBehavior.RESTART)) { + else if (resumeState.resumeBehavior().equals(ActorResumeBehavior.RESTART)) { this.state = resumeState; // The message is already acknowledged, we flag it as dead and then send an identical message @@ -308,7 +308,7 @@ public class ActorStateMachine { } } - public MachineState getState() { + public ActorStateInstance getState() { return state; } @@ -371,38 +371,3 @@ public class ActorStateMachine { } } -/** ExpectedMessage guards against spurious state changes being triggered by old messages in the queue - * - * It contains the message id of the last message that was processed, and the messages sent by the state machine to - * itself via the message queue all have relatedId set to expectedMessageId. If the state machine is unitialized or - * in a terminal state, it will accept messages with relatedIds that are equal to -1. - * */ -class ExpectedMessage { - public final long id; - public ExpectedMessage(long id) { - this.id = id; - } - - public static ExpectedMessage expectThis(MqMessage message) { - return new ExpectedMessage(message.relatedId()); - } - - public static ExpectedMessage responseTo(MqMessage message) { - return new ExpectedMessage(message.msgId()); - } - - public static ExpectedMessage anyUnrelated() { - return new ExpectedMessage(-1); - } - - public static ExpectedMessage expectId(long id) { - return new ExpectedMessage(id); - } - - public boolean isExpected(MqMessage message) { - if (id < 0) - return true; - - return id == message.relatedId(); - } -} \ No newline at end of file diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ExpectedMessage.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ExpectedMessage.java new file mode 100644 index 00000000..f1779403 --- /dev/null +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ExpectedMessage.java @@ -0,0 +1,41 @@ +package nu.marginalia.actor; + +import nu.marginalia.mq.MqMessage; + +/** + * ExpectedMessage guards against spurious state changes being triggered by old messages in the queue + *

+ * It contains the message id of the last message that was processed, and the messages sent by the state machine to + * itself via the message queue all have relatedId set to expectedMessageId. If the state machine is unitialized or + * in a terminal state, it will accept messages with relatedIds that are equal to -1. + */ +class ExpectedMessage { + public final long id; + + ExpectedMessage(long id) { + this.id = id; + } + + public static ExpectedMessage expectThis(MqMessage message) { + return new ExpectedMessage(message.relatedId()); + } + + public static ExpectedMessage responseTo(MqMessage message) { + return new ExpectedMessage(message.msgId()); + } + + public static ExpectedMessage anyUnrelated() { + return new ExpectedMessage(-1); + } + + public static ExpectedMessage expectId(long id) { + return new ExpectedMessage(id); + } + + public boolean isExpected(MqMessage message) { + if (id < 0) + return true; + + return id == message.relatedId(); + } +} diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/prototype/AbstractActorPrototype.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/prototype/AbstractActorPrototype.java new file mode 100644 index 00000000..28254b9f --- /dev/null +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/prototype/AbstractActorPrototype.java @@ -0,0 +1,237 @@ +package nu.marginalia.actor.prototype; + +import nu.marginalia.actor.ActorStateFactory; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorTerminalState; +import nu.marginalia.actor.state.ActorStateInstance; +import nu.marginalia.actor.state.ActorStateTransition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; + +/** Base class for actors. The state graph is defined using public methods + * annotated with {@code @ActorState} and {@code @ActorTerminalState}. This class provide + * a mediation layer that translates these annotations into a state graph + * that can be used by the actor runtime. + *

.

+ *

+ * public class MyActor extends AbstractActorPrototype {
+ *   {@code @ActorState(name="INITIAL", next="STATE_1")}
+ *   public void initial() { ... }
+ *   {@code @ActorState(name="STATE_1", next="STATE_N")}
+ *   public void state1() { ... }
+ *   ...
+ * }
+ * 
+ *

+ * The prototype provides explicit transition() and error() methods that can be used + * to jump to a different state. Each of these methods come with a variant that has a + * parameter. The parameter will be passed as a payload to the next state. + *

+ *

The @ActorState annotation also provides a default next + * state that will be transitioned to automatically when the method returns. If the + * method returns a value, this value will be passed as a payload to the next state, + * and injected as a parameter to the handler method.

+ *

Caveat

+ * The jump functions are implemented using exceptions. This means that if you have + * a {@code try {} catch(Exception e)} block in your code or a {@code @SneakyThrows} + * annotation, you will catch the exception and prevent the transition. + * + */ +public abstract class AbstractActorPrototype implements ActorPrototype { + private final ActorStateFactory stateFactory; + private static final Logger logger = LoggerFactory.getLogger(AbstractActorPrototype.class); + + public AbstractActorPrototype(ActorStateFactory stateFactory) { + this.stateFactory = stateFactory; + } + + /** Explicitly transition to a different state. + *

+ * Caveat: This is implemented via an exception. Mind your catch statements. */ + public void transition(String state) { + throw new ControlFlowException(state, null); + } + + /** Explicitly transition to a different state, encoding a payload. + *

+ * Caveat: This is implemented via an exception. Mind your catch statements. */ + public void transition(String state, T payload) { + throw new ControlFlowException(state, payload); + } + + /** Explicitly transition to the error state. + *

+ * Caveat: This is implemented via an exception. Mind your catch statements. */ + public void error() { + throw new ControlFlowException("ERROR", ""); + } + + /** Explicitly transition to the error state with an error message. + *

+ * Caveat: This is implemented via an exception. Mind your catch statements. */ + public void error(T payload) { + throw new ControlFlowException("ERROR", payload); + } + + /** Explicitly transition to the error state. + *

+ * Caveat: This is implemented via an exception. Mind your catch statements. */ + public void error(Exception ex) { + throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage()); + } + + @Override + public boolean isDirectlyInitializable() { + for (var method : getClass().getMethods()) { + var gs = method.getAnnotation(ActorState.class); + if (gs == null) { + continue; + } + if ("INITIAL".equals(gs.name()) && method.getParameterCount() == 0) { + return true; + } + } + return false; + } + + @Override + public Map declaredStates() { + Map ret = new HashMap<>(); + + for (var method : getClass().getMethods()) { + var gs = method.getAnnotation(ActorState.class); + if (gs != null) { + ret.put(gs.name(), gs); + } + } + + return ret; + } + + /** Compile a list of ActorStateInstances from the @ActorState and @ActorTerminalState annotations. + */ + @Override + public List asStateList() { + List ret = new ArrayList<>(); + + for (var method : getClass().getMethods()) { + var gs = method.getAnnotation(ActorState.class); + if (gs != null) { + ret.add(createStateInstance(method, gs)); + } + + var ts = method.getAnnotation(ActorTerminalState.class); + if (ts != null) { + ret.add(createTerminalStateInstance(ts)); + } + } + + return ret; + } + + private ActorStateInstance createStateInstance(Method method, ActorState gs) { + + var parameters = method.getParameterTypes(); + boolean returnsVoid = method.getGenericReturnType().equals(Void.TYPE); + + if (parameters.length == 0) { + return stateFactory.create(gs.name(), gs.resume(), () -> { + try { + if (returnsVoid) { + method.invoke(this); + return ActorStateTransition.to(gs.next()); + } else { + Object ret = method.invoke(this); + return stateFactory.transition(gs.next(), ret); + } + } + catch (Exception e) { + return translateInvocationExceptionToStateTransition(gs.name(), e); + } + }); + } + else if (parameters.length == 1) { + return stateFactory.create(gs.name(), gs.resume(), parameters[0], (param) -> { + try { + if (returnsVoid) { + method.invoke(this, param); + return ActorStateTransition.to(gs.next()); + } else { + Object ret = method.invoke(this, param); + return stateFactory.transition(gs.next(), ret); + } + } + catch (Exception e) { + return translateInvocationExceptionToStateTransition(gs.name(), e); + } + }); + } + else { + // We permit only @ActorState-annotated methods like this: + // + // void foo(); + // void foo(Object bar); + // Object foo(); + // Object foo(Object bar); + + throw new IllegalStateException("ActorStatePrototype " + + getClass().getSimpleName() + + " has invalid method signature for method " + + method.getName() + + ": Expected 0 or 1 parameter(s) but found " + + Arrays.toString(parameters)); + } + } + + private ActorStateInstance createTerminalStateInstance(ActorTerminalState ts) { + final String name = ts.name(); + return stateFactory.create(name, ActorResumeBehavior.ERROR, () -> { + throw new ControlFlowException(name, null); + }); + } + + private ActorStateTransition translateInvocationExceptionToStateTransition(String state, Throwable ex) { + while (ex instanceof InvocationTargetException e) { + if (e.getCause() != null) ex = ex.getCause(); + } + + if (ex instanceof ControlFlowException cfe) { + return stateFactory.transition(cfe.getState(), cfe.getPayload()); + } + else if (ex instanceof InterruptedException intE) { + logger.error("State execution was interrupted " + state); + return ActorStateTransition.to("ERR", "Execution interrupted"); + } + else { + logger.error("Error in state invocation " + state, ex); + return ActorStateTransition.to("ERROR", + "Exception: " + ex.getClass().getSimpleName() + "/" + ex.getMessage()); + } + } + + /** Exception thrown by a state to indicate that the state machine should jump to a different state. */ + public static class ControlFlowException extends RuntimeException { + private final String state; + private final Object payload; + + public ControlFlowException(String state, Object payload) { + this.state = state; + this.payload = payload; + } + + public String getState() { + return state; + } + + public Object getPayload() { + return payload; + } + + public StackTraceElement[] getStackTrace() { return new StackTraceElement[0]; } + } +} diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/prototype/ActorPrototype.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/prototype/ActorPrototype.java new file mode 100644 index 00000000..1c78f6d3 --- /dev/null +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/prototype/ActorPrototype.java @@ -0,0 +1,25 @@ +package nu.marginalia.actor.prototype; + +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorStateInstance; +import nu.marginalia.actor.state.ActorTerminalState; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public interface ActorPrototype { + /** + * User-facing description of the actor. + */ + String describe(); + + /** Check whether there is an INITIAL state that can be directly initialized + * without declared parameters. */ + boolean isDirectlyInitializable(); + + Map declaredStates(); + + /** Get or create a list of ActorStateInstances */ + List asStateList(); +} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorResumeBehavior.java similarity index 72% rename from code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorResumeBehavior.java index 33dacb5d..60429736 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorResumeBehavior.java @@ -1,6 +1,6 @@ -package nu.marginalia.mqsm.graph; +package nu.marginalia.actor.state; -public enum ResumeBehavior { +public enum ActorResumeBehavior { /** Retry the state on resume */ RETRY, /** Jump to ERROR on resume if the message has been acknowledged */ diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/GraphState.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorState.java similarity index 70% rename from code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/GraphState.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorState.java index e5764dd2..c52e8427 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/GraphState.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorState.java @@ -1,4 +1,4 @@ -package nu.marginalia.mqsm.graph; +package nu.marginalia.actor.state; import java.lang.annotation.Retention; @@ -6,10 +6,10 @@ import java.lang.annotation.RetentionPolicy; /** Annotation for declaring a state in an actor's state graph. */ @Retention(RetentionPolicy.RUNTIME) -public @interface GraphState { +public @interface ActorState { String name(); String next() default "ERROR"; String[] transitions() default {}; String description() default ""; - ResumeBehavior resume() default ResumeBehavior.ERROR; + ActorResumeBehavior resume() default ActorResumeBehavior.ERROR; } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorStateInstance.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorStateInstance.java new file mode 100644 index 00000000..65312439 --- /dev/null +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorStateInstance.java @@ -0,0 +1,12 @@ +package nu.marginalia.actor.state; + +public interface ActorStateInstance { + String name(); + + ActorStateTransition next(String message); + + ActorResumeBehavior resumeBehavior(); + + boolean isFinal(); + +} diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorStateTransition.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorStateTransition.java new file mode 100644 index 00000000..37ee7ea0 --- /dev/null +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorStateTransition.java @@ -0,0 +1,11 @@ +package nu.marginalia.actor.state; + +public record ActorStateTransition(String state, String message) { + public static ActorStateTransition to(String state) { + return new ActorStateTransition(state, ""); + } + + public static ActorStateTransition to(String state, String message) { + return new ActorStateTransition(state, message); + } +} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/TerminalGraphState.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorTerminalState.java similarity index 70% rename from code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/TerminalGraphState.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorTerminalState.java index c7b11730..176fd877 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/TerminalGraphState.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/state/ActorTerminalState.java @@ -1,10 +1,10 @@ -package nu.marginalia.mqsm.graph; +package nu.marginalia.actor.state; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @Retention(RetentionPolicy.RUNTIME) -public @interface TerminalGraphState { +public @interface ActorTerminalState { String name(); String description() default ""; } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/MqException.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqException.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/MqException.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqException.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/MqMessage.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessage.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/MqMessage.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessage.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/MqMessageState.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxIf.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxIf.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxIf.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxIf.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxResponse.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxResponse.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxResponse.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxResponse.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java similarity index 100% rename from code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java rename to code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineErrorTest.java b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineErrorTest.java similarity index 80% rename from code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineErrorTest.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineErrorTest.java index 3ca46e83..6a39d0ec 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineErrorTest.java +++ b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineErrorTest.java @@ -1,15 +1,15 @@ -package nu.marginalia.mqsm; +package nu.marginalia.actor; import com.google.gson.GsonBuilder; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.actor.prototype.AbstractActorPrototype; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessageRow; import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.junit.jupiter.api.*; import org.junit.jupiter.api.parallel.Execution; import org.testcontainers.containers.MariaDBContainer; @@ -61,21 +61,24 @@ public class ActorStateMachineErrorTest { dataSource.close(); } - public static class ErrorHurdles extends AbstractStateGraph { + public static class ErrorHurdles extends AbstractActorPrototype { - public ErrorHurdles(StateFactory stateFactory) { + public ErrorHurdles(ActorStateFactory stateFactory) { super(stateFactory); } - @GraphState(name = "INITIAL", next = "FAILING") + public String describe() { + return "Test graph"; + } + @ActorState(name = "INITIAL", next = "FAILING") public void initial() { } - @GraphState(name = "FAILING", next = "OK", resume = ResumeBehavior.RETRY) + @ActorState(name = "FAILING", next = "OK", resume = ActorResumeBehavior.RETRY) public void resumable() { throw new RuntimeException("Boom!"); } - @GraphState(name = "OK", next = "END") + @ActorState(name = "OK", next = "END") public void ok() { } @@ -84,7 +87,7 @@ public class ActorStateMachineErrorTest { @Test public void smResumeResumableFromNew() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory)); sm.init(); diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineNullTest.java b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineNullTest.java similarity index 81% rename from code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineNullTest.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineNullTest.java index a20c75f0..d1e1f0c6 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineNullTest.java +++ b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineNullTest.java @@ -1,13 +1,13 @@ -package nu.marginalia.mqsm; +package nu.marginalia.actor; import com.google.gson.GsonBuilder; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.actor.prototype.AbstractActorPrototype; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.actor.state.ActorState; import org.junit.jupiter.api.*; import org.junit.jupiter.api.parallel.Execution; import org.testcontainers.containers.MariaDBContainer; @@ -58,15 +58,18 @@ public class ActorStateMachineNullTest { dataSource.close(); } - public static class TestGraph extends AbstractStateGraph { - public TestGraph(StateFactory stateFactory) { + public static class TestPrototypeActor extends AbstractActorPrototype { + public TestPrototypeActor(ActorStateFactory stateFactory) { super(stateFactory); } - @GraphState(name = "INITIAL", next = "GREET") + public String describe() { + return "Test graph"; + } + @ActorState(name = "INITIAL", next = "GREET") public void initial() {} - @GraphState(name = "GREET", next = "END") + @ActorState(name = "GREET", next = "END") public void greet(String message) { if (null == message) { System.out.println("Hello, null!"); @@ -79,8 +82,8 @@ public class ActorStateMachineNullTest { @Test public void testStateGraphNullSerialization() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); - var graph = new TestGraph(stateFactory); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); + var graph = new TestPrototypeActor(stateFactory); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph); diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineResumeTest.java b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineResumeTest.java similarity index 77% rename from code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineResumeTest.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineResumeTest.java index 825a4c43..69381c51 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineResumeTest.java +++ b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineResumeTest.java @@ -1,16 +1,16 @@ -package nu.marginalia.mqsm; +package nu.marginalia.actor; import com.google.gson.GsonBuilder; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.actor.prototype.AbstractActorPrototype; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessageRow; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.junit.jupiter.api.*; import org.junit.jupiter.api.parallel.Execution; import org.testcontainers.containers.MariaDBContainer; @@ -62,31 +62,34 @@ public class ActorStateMachineResumeTest { dataSource.close(); } - public static class ResumeTrialsGraph extends AbstractStateGraph { + public static class ResumeTrialsPrototypeActor extends AbstractActorPrototype { - public ResumeTrialsGraph(StateFactory stateFactory) { + public ResumeTrialsPrototypeActor(ActorStateFactory stateFactory) { super(stateFactory); } - @GraphState(name = "INITIAL", next = "RESUMABLE") + public String describe() { + return "Test graph"; + } + @ActorState(name = "INITIAL", next = "RESUMABLE") public void initial() {} - @GraphState(name = "RESUMABLE", next = "NON-RESUMABLE", resume = ResumeBehavior.RETRY) + @ActorState(name = "RESUMABLE", next = "NON-RESUMABLE", resume = ActorResumeBehavior.RETRY) public void resumable() {} - @GraphState(name = "NON-RESUMABLE", next = "OK", resume = ResumeBehavior.ERROR) + @ActorState(name = "NON-RESUMABLE", next = "OK", resume = ActorResumeBehavior.ERROR) public void nonResumable() {} - @GraphState(name = "OK", next = "END") + @ActorState(name = "OK", next = "END") public void ok() {} } @Test public void smResumeResumableFromNew() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); persistence.sendNewMessage(inboxId, null, -1L, "RESUMABLE", "", null); - var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); + var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory)); sm.join(2, TimeUnit.SECONDS); sm.stop(); @@ -102,12 +105,12 @@ public class ActorStateMachineResumeTest { @Test public void smResumeFromAck() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); long id = persistence.sendNewMessage(inboxId, null, -1L, "RESUMABLE", "", null); persistence.updateMessageState(id, MqMessageState.ACK); - var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); + var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory)); sm.join(4, TimeUnit.SECONDS); sm.stop(); @@ -124,12 +127,12 @@ public class ActorStateMachineResumeTest { @Test public void smResumeNonResumableFromNew() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); persistence.sendNewMessage(inboxId, null, -1L, "NON-RESUMABLE", "", null); - var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); + var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory)); sm.join(2, TimeUnit.SECONDS); sm.stop(); @@ -145,13 +148,13 @@ public class ActorStateMachineResumeTest { @Test public void smResumeNonResumableFromAck() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); long id = persistence.sendNewMessage(inboxId, null, null, "NON-RESUMABLE", "", null); persistence.updateMessageState(id, MqMessageState.ACK); - var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); + var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory)); sm.join(2, TimeUnit.SECONDS); sm.stop(); @@ -167,10 +170,10 @@ public class ActorStateMachineResumeTest { @Test public void smResumeEmptyQueue() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); - var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); + var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory)); sm.join(2, TimeUnit.SECONDS); sm.stop(); diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineTest.java b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineTest.java similarity index 79% rename from code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineTest.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineTest.java index 5574c771..ac9147a9 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/ActorStateMachineTest.java +++ b/code/libraries/message-queue/src/test/java/nu/marginalia/actor/ActorStateMachineTest.java @@ -1,13 +1,13 @@ -package nu.marginalia.mqsm; +package nu.marginalia.actor; import com.google.gson.GsonBuilder; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.actor.prototype.AbstractActorPrototype; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.actor.state.ActorState; import org.junit.jupiter.api.*; import org.junit.jupiter.api.parallel.Execution; import org.testcontainers.containers.MariaDBContainer; @@ -58,24 +58,27 @@ public class ActorStateMachineTest { dataSource.close(); } - public static class TestGraph extends AbstractStateGraph { - public TestGraph(StateFactory stateFactory) { + public static class TestPrototypeActor extends AbstractActorPrototype { + public TestPrototypeActor(ActorStateFactory stateFactory) { super(stateFactory); } + public String describe() { + return "Test graph"; + } - @GraphState(name = "INITIAL", next = "GREET") + @ActorState(name = "INITIAL", next = "GREET") public String initial() { return "World"; } - @GraphState(name = "GREET") + @ActorState(name = "GREET") public void greet(String message) { System.out.println("Hello, " + message + "!"); transition("COUNT-DOWN", 5); } - @GraphState(name = "COUNT-DOWN", next = "END") + @ActorState(name = "COUNT-DOWN", next = "END") public void countDown(Integer from) { if (from > 0) { System.out.println(from); @@ -86,8 +89,8 @@ public class ActorStateMachineTest { @Test public void testAnnotatedStateGraph() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); - var graph = new TestGraph(stateFactory); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); + var graph = new TestPrototypeActor(stateFactory); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph); @@ -104,8 +107,8 @@ public class ActorStateMachineTest { @Test public void testStartStopStartStop() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); - var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); + var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestPrototypeActor(stateFactory)); sm.init(); @@ -114,7 +117,7 @@ public class ActorStateMachineTest { System.out.println("-------------------- "); - var sm2 = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); + var sm2 = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestPrototypeActor(stateFactory)); sm2.join(2, TimeUnit.SECONDS); sm2.stop(); @@ -123,7 +126,7 @@ public class ActorStateMachineTest { @Test public void testFalseTransition() throws Exception { - var stateFactory = new StateFactory(new GsonBuilder().create()); + var stateFactory = new ActorStateFactory(new GsonBuilder().create()); // Prep the queue with a message to set the state to initial, // and an additional message to trigger the false transition back to initial @@ -131,7 +134,7 @@ public class ActorStateMachineTest { persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null); persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null); - var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); + var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestPrototypeActor(stateFactory)); Thread.sleep(50); diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mq/MqMessageRow.java b/code/libraries/message-queue/src/test/java/nu/marginalia/mq/MqMessageRow.java similarity index 100% rename from code/common/message-queue/src/test/java/nu/marginalia/mq/MqMessageRow.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/mq/MqMessageRow.java diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mq/MqTestUtil.java b/code/libraries/message-queue/src/test/java/nu/marginalia/mq/MqTestUtil.java similarity index 100% rename from code/common/message-queue/src/test/java/nu/marginalia/mq/MqTestUtil.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/mq/MqTestUtil.java diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java b/code/libraries/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java similarity index 100% rename from code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mq/persistence/MqPersistenceTest.java b/code/libraries/message-queue/src/test/java/nu/marginalia/mq/persistence/MqPersistenceTest.java similarity index 100% rename from code/common/message-queue/src/test/java/nu/marginalia/mq/persistence/MqPersistenceTest.java rename to code/libraries/message-queue/src/test/java/nu/marginalia/mq/persistence/MqPersistenceTest.java diff --git a/code/libraries/readme.md b/code/libraries/readme.md index 7dabf9c9..693198df 100644 --- a/code/libraries/readme.md +++ b/code/libraries/readme.md @@ -11,7 +11,7 @@ These libraries may not depend on features, services, processes, models, etc. bad support for. It's designed to be able to easily replaced when *Java's Foreign Function And Memory API* is released. * The [btree](btree/) library offers a static BTree implementation based on the array library. * [language-processing](language-processing/) contains primitives for sentence extraction and POS-tagging. - +* The [message-queue](message-queue/) library. ## Micro libraries * [easy-lsh](easy-lsh/) is a simple locality-sensitive hash for document deduplication diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 6d5ce58c..3b22535c 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -34,7 +34,7 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:service') implementation project(':code:common:config') - implementation project(':code:common:message-queue') + implementation project(':code:libraries:message-queue') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/StackexchangeSideloaderTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/StackexchangeSideloaderTest.java index ee48ccc9..7c673f67 100644 --- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/StackexchangeSideloaderTest.java +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/StackexchangeSideloaderTest.java @@ -1,5 +1,6 @@ package nu.marginalia.converting.sideload; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import javax.xml.stream.XMLStreamException; @@ -8,6 +9,7 @@ import java.nio.file.Path; class StackexchangeSideloaderTest { @Test + @Disabled public void test7zFile() throws IOException, XMLStreamException { var stackExchangeReader = new StackExchange7zReader(Path.of("/mnt/storage/stackexchange/scifi.meta.stackexchange.com.7z")); diff --git a/code/processes/crawling-process/build.gradle b/code/processes/crawling-process/build.gradle index fcc7862d..fe137a48 100644 --- a/code/processes/crawling-process/build.gradle +++ b/code/processes/crawling-process/build.gradle @@ -30,7 +30,7 @@ dependencies { implementation project(':code:api:process-mqapi') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') - implementation project(':code:common:message-queue') + implementation project(':code:libraries:message-queue') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:easy-lsh') implementation project(':code:process-models:crawling-model') diff --git a/code/processes/loading-process/build.gradle b/code/processes/loading-process/build.gradle index 0a89c350..7f900621 100644 --- a/code/processes/loading-process/build.gradle +++ b/code/processes/loading-process/build.gradle @@ -23,13 +23,13 @@ dependencies { implementation project(':code:api:index-api') implementation project(':code:common:model') implementation project(':code:common:db') - implementation project(':code:common:message-queue') implementation project(':code:common:config') implementation project(':code:common:service') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') implementation project(':code:features-index:lexicon') implementation project(':code:features-index:index-journal') + implementation project(':code:libraries:message-queue') implementation project(':code:libraries:language-processing') implementation project(':third-party:commons-codec') testImplementation project(':code:services-core:search-service') diff --git a/code/services-core/control-service/build.gradle b/code/services-core/control-service/build.gradle index 90b832da..af67d328 100644 --- a/code/services-core/control-service/build.gradle +++ b/code/services-core/control-service/build.gradle @@ -29,7 +29,7 @@ dependencies { implementation project(':code:common:service') implementation project(':code:common:config') implementation project(':code:common:renderer') - implementation project(':code:common:message-queue') + implementation project(':code:libraries:message-queue') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') implementation project(':code:api:search-api') diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java index a42aec50..7be91df3 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java @@ -10,9 +10,9 @@ import nu.marginalia.control.actor.monitor.ConverterMonitorActor; import nu.marginalia.control.actor.monitor.LoaderMonitorActor; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mqsm.ActorStateMachine; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.state.MachineState; +import nu.marginalia.actor.ActorStateMachine; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorStateInstance; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.BaseServiceParams; @@ -28,7 +28,7 @@ public class ControlActors { private final Gson gson; private final MessageQueueFactory messageQueueFactory; public Map stateMachines = new HashMap<>(); - public Map actorDefinitions = new HashMap<>(); + public Map actorDefinitions = new HashMap<>(); @Inject public ControlActors(MessageQueueFactory messageQueueFactory, @@ -71,7 +71,7 @@ public class ControlActors { register(Actor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase); } - private void register(Actor process, AbstractStateGraph graph) { + private void register(Actor process, AbstractActorPrototype graph) { var sm = new ActorStateMachine(messageQueueFactory, process.id(), UUID.randomUUID(), graph); sm.listen((function, param) -> logStateChange(process, function)); @@ -114,7 +114,7 @@ public class ControlActors { stateMachines.get(process).abortExecution(); } - public Map getActorStates() { + public Map getActorStates() { return stateMachines.entrySet().stream().collect( Collectors.toMap( Map.Entry::getKey, e -> e.getValue().getState()) @@ -125,7 +125,7 @@ public class ControlActors { return actorDefinitions.get(actor).isDirectlyInitializable(); } - public AbstractStateGraph getActorDefinition(Actor actor) { + public AbstractActorPrototype getActorDefinition(Actor actor) { return actorDefinitions.get(actor); } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java index 312e71d9..b3b3473f 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java @@ -2,14 +2,14 @@ package nu.marginalia.control.actor.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessService; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; -import nu.marginalia.mqsm.graph.TerminalGraphState; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorTerminalState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @Singleton -public class AbstractProcessSpawnerActor extends AbstractStateGraph { +public class AbstractProcessSpawnerActor extends AbstractActorPrototype { private final MqPersistence persistence; private final ProcessService processService; @@ -45,7 +45,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph { } @Inject - public AbstractProcessSpawnerActor(StateFactory stateFactory, + public AbstractProcessSpawnerActor(ActorStateFactory stateFactory, MqPersistence persistence, ProcessService processService, String inboxName, @@ -57,14 +57,14 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph { this.processId = processId; } - @GraphState(name = INITIAL, next = MONITOR) + @ActorState(name = INITIAL, next = MONITOR) public void init() { } - @GraphState(name = MONITOR, + @ActorState(name = MONITOR, next = MONITOR, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, transitions = {MONITOR, RUN}, description = """ Monitors the inbox of the process for messages. @@ -95,8 +95,8 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph { } } - @GraphState(name = RUN, - resume = ResumeBehavior.RESTART, + @ActorState(name = RUN, + resume = ActorResumeBehavior.RESTART, transitions = {MONITOR, ERROR, RUN, ABORTED}, description = """ Runs the process. @@ -159,7 +159,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph { } } - @TerminalGraphState(name = ABORTED, description = "The process was manually aborted") + @ActorTerminalState(name = ABORTED, description = "The process was manually aborted") public void aborted() throws Exception {} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ConverterMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ConverterMonitorActor.java index 158b48ca..aebb4a38 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ConverterMonitorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ConverterMonitorActor.java @@ -2,17 +2,17 @@ package nu.marginalia.control.actor.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessService; import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.StateFactory; @Singleton public class ConverterMonitorActor extends AbstractProcessSpawnerActor { @Inject - public ConverterMonitorActor(StateFactory stateFactory, + public ConverterMonitorActor(ActorStateFactory stateFactory, MqPersistence persistence, ProcessService processService) { super(stateFactory, persistence, processService, ProcessInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/CrawlerMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/CrawlerMonitorActor.java index cc9c73fb..631f29da 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/CrawlerMonitorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/CrawlerMonitorActor.java @@ -2,16 +2,16 @@ package nu.marginalia.control.actor.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessService; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqapi.ProcessInboxNames; -import nu.marginalia.mqsm.StateFactory; @Singleton public class CrawlerMonitorActor extends AbstractProcessSpawnerActor { @Inject - public CrawlerMonitorActor(StateFactory stateFactory, + public CrawlerMonitorActor(ActorStateFactory stateFactory, MqPersistence persistence, ProcessService processService) { super(stateFactory, diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/FileStorageMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/FileStorageMonitorActor.java index 190d1daf..f69ab3de 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/FileStorageMonitorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/FileStorageMonitorActor.java @@ -2,14 +2,14 @@ package nu.marginalia.control.actor.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageBaseType; import nu.marginalia.db.storage.model.FileStorageId; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +22,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; @Singleton -public class FileStorageMonitorActor extends AbstractStateGraph { +public class FileStorageMonitorActor extends AbstractActorPrototype { private final Logger logger = LoggerFactory.getLogger(getClass()); // STATES @@ -41,19 +41,19 @@ public class FileStorageMonitorActor extends AbstractStateGraph { } @Inject - public FileStorageMonitorActor(StateFactory stateFactory, + public FileStorageMonitorActor(ActorStateFactory stateFactory, FileStorageService fileStorageService) { super(stateFactory); this.fileStorageService = fileStorageService; } - @GraphState(name = INITIAL, next = MONITOR) + @ActorState(name = INITIAL, next = MONITOR) public void init() { } - @GraphState(name = MONITOR, + @ActorState(name = MONITOR, next = PURGE, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, transitions = { PURGE, REMOVE_STALE }, description = """ Monitor the file storage and trigger at transition to PURGE if any file storage area @@ -80,9 +80,9 @@ public class FileStorageMonitorActor extends AbstractStateGraph { } } - @GraphState(name = PURGE, + @ActorState(name = PURGE, next = MONITOR, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Purge the file storage area and transition back to MONITOR. """ @@ -99,10 +99,10 @@ public class FileStorageMonitorActor extends AbstractStateGraph { fileStorageService.removeFileStorage(storage.id()); } - @GraphState( + @ActorState( name = REMOVE_STALE, next = MONITOR, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Remove file storage from the database if it doesn't exist on disk. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/LoaderMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/LoaderMonitorActor.java index fcf3b895..281b021b 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/LoaderMonitorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/LoaderMonitorActor.java @@ -2,17 +2,17 @@ package nu.marginalia.control.actor.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessService; import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.StateFactory; @Singleton public class LoaderMonitorActor extends AbstractProcessSpawnerActor { @Inject - public LoaderMonitorActor(StateFactory stateFactory, + public LoaderMonitorActor(ActorStateFactory stateFactory, MqPersistence persistence, ProcessService processService) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java index 1665a524..8b7f3354 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java @@ -2,16 +2,16 @@ package nu.marginalia.control.actor.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import java.util.concurrent.TimeUnit; @Singleton -public class MessageQueueMonitorActor extends AbstractStateGraph { +public class MessageQueueMonitorActor extends AbstractActorPrototype { // STATES @@ -26,17 +26,17 @@ public class MessageQueueMonitorActor extends AbstractStateGraph { } @Inject - public MessageQueueMonitorActor(StateFactory stateFactory, + public MessageQueueMonitorActor(ActorStateFactory stateFactory, MqPersistence persistence) { super(stateFactory); this.persistence = persistence; } - @GraphState(name = INITIAL, next = MONITOR) + @ActorState(name = INITIAL, next = MONITOR) public void init() { } - @GraphState(name = MONITOR, next = MONITOR, resume = ResumeBehavior.RETRY, + @ActorState(name = MONITOR, next = MONITOR, resume = ActorResumeBehavior.RETRY, description = """ Periodically clean up the message queue. """) diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ProcessLivenessMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ProcessLivenessMonitorActor.java index e7542a4c..6cf20e20 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ProcessLivenessMonitorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ProcessLivenessMonitorActor.java @@ -2,19 +2,19 @@ package nu.marginalia.control.actor.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.model.ServiceHeartbeat; import nu.marginalia.control.svc.HeartbeatService; import nu.marginalia.control.process.ProcessService; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Singleton -public class ProcessLivenessMonitorActor extends AbstractStateGraph { +public class ProcessLivenessMonitorActor extends AbstractActorPrototype { // STATES @@ -26,7 +26,7 @@ public class ProcessLivenessMonitorActor extends AbstractStateGraph { @Inject - public ProcessLivenessMonitorActor(StateFactory stateFactory, + public ProcessLivenessMonitorActor(ActorStateFactory stateFactory, ProcessService processService, HeartbeatService heartbeatService) { super(stateFactory); @@ -39,11 +39,11 @@ public class ProcessLivenessMonitorActor extends AbstractStateGraph { return "Periodically check to ensure that the control service's view of running processes is agreement with the process heartbeats table."; } - @GraphState(name = INITIAL, next = MONITOR) + @ActorState(name = INITIAL, next = MONITOR) public void init() { } - @GraphState(name = MONITOR, next = MONITOR, resume = ResumeBehavior.RETRY, description = """ + @ActorState(name = MONITOR, next = MONITOR, resume = ActorResumeBehavior.RETRY, description = """ Periodically check to ensure that the control service's view of running processes is agreement with the process heartbeats table. diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ActorProcessWatcher.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ActorProcessWatcher.java index d6c33608..b8fc8261 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ActorProcessWatcher.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ActorProcessWatcher.java @@ -2,10 +2,10 @@ package nu.marginalia.control.actor.task; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.prototype.AbstractActorPrototype; import nu.marginalia.control.process.ProcessService; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.outbox.MqOutbox; -import nu.marginalia.mqsm.graph.ControlFlowException; import java.sql.SQLException; import java.util.concurrent.TimeUnit; @@ -29,10 +29,10 @@ public class ActorProcessWatcher { * When interrupted, the process is killed and the message is marked as dead. */ public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId) - throws ControlFlowException, InterruptedException, SQLException + throws AbstractActorPrototype.ControlFlowException, InterruptedException, SQLException { if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { - throw new ControlFlowException("ERROR", + throw new AbstractActorPrototype.ControlFlowException("ERROR", "Process " + processId + " did not launch"); } @@ -52,7 +52,7 @@ public class ActorProcessWatcher { catch (TimeoutException ex) { // Maybe the process died, wait a moment for it to restart if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { - throw new ControlFlowException("ERROR", + throw new AbstractActorPrototype.ControlFlowException("ERROR", "Process " + processId + " died and did not re-launch"); } } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertActor.java index 79d2c2bb..2377449a 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertActor.java @@ -16,10 +16,10 @@ import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mqapi.converting.ConvertAction; import nu.marginalia.mqapi.converting.ConvertRequest; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.ActorStateFactory; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; @Singleton -public class ConvertActor extends AbstractStateGraph { +public class ConvertActor extends AbstractActorPrototype { // STATES @@ -59,7 +59,7 @@ public class ConvertActor extends AbstractStateGraph { } @Inject - public ConvertActor(StateFactory stateFactory, + public ConvertActor(ActorStateFactory stateFactory, ActorProcessWatcher processWatcher, ProcessOutboxes processOutboxes, FileStorageService storageService, @@ -73,15 +73,15 @@ public class ConvertActor extends AbstractStateGraph { this.gson = gson; } - @GraphState(name= INITIAL, resume = ResumeBehavior.ERROR, + @ActorState(name= INITIAL, resume = ActorResumeBehavior.ERROR, description = "Pro forma initial state") public void initial(Integer unused) { error("This actor does not support the initial state"); } - @GraphState(name = CONVERT, + @ActorState(name = CONVERT, next = CONVERT_WAIT, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Allocate a storage area for the processed data, then send a convert request to the converter and transition to RECONVERT_WAIT. @@ -107,9 +107,9 @@ public class ConvertActor extends AbstractStateGraph { return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); } - @GraphState(name = CONVERT_ENCYCLOPEDIA, + @ActorState(name = CONVERT_ENCYCLOPEDIA, next = CONVERT_WAIT, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Allocate a storage area for the processed data, then send a convert request to the converter and transition to RECONVERT_WAIT. @@ -138,9 +138,9 @@ public class ConvertActor extends AbstractStateGraph { return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); } - @GraphState(name = CONVERT_STACKEXCHANGE, + @ActorState(name = CONVERT_STACKEXCHANGE, next = CONVERT_WAIT, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Allocate a storage area for the processed data, then send a convert request to the converter and transition to RECONVERT_WAIT. @@ -169,10 +169,10 @@ public class ConvertActor extends AbstractStateGraph { return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); } - @GraphState( + @ActorState( name = CONVERT_WAIT, next = END, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Wait for the converter to finish processing the data. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java index 970e85d5..38966a7f 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java @@ -6,6 +6,7 @@ import com.google.inject.Singleton; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.With; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessOutboxes; import nu.marginalia.control.process.ProcessService; import nu.marginalia.index.client.IndexClient; @@ -19,10 +20,9 @@ import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +30,7 @@ import java.nio.file.Files; import java.nio.file.StandardCopyOption; @Singleton -public class ConvertAndLoadActor extends AbstractStateGraph { +public class ConvertAndLoadActor extends AbstractActorPrototype { // STATES @@ -69,7 +69,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph { } @Inject - public ConvertAndLoadActor(StateFactory stateFactory, + public ConvertAndLoadActor(ActorStateFactory stateFactory, ActorProcessWatcher processWatcher, ProcessOutboxes processOutboxes, FileStorageService storageService, @@ -86,7 +86,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph { this.gson = gson; } - @GraphState(name = INITIAL, + @ActorState(name = INITIAL, next = RECONVERT, description = """ Validate the input and transition to RECONVERT @@ -104,9 +104,9 @@ public class ConvertAndLoadActor extends AbstractStateGraph { return new Message().withCrawlStorageId(crawlStorageId); } - @GraphState(name = RECONVERT, + @ActorState(name = RECONVERT, next = RECONVERT_WAIT, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Allocate a storage area for the processed data, then send a convert request to the converter and transition to RECONVERT_WAIT. @@ -135,10 +135,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph { .withConverterMsgId(id); } - @GraphState( + @ActorState( name = RECONVERT_WAIT, next = LOAD, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Wait for the converter to finish processing the data. """ @@ -153,10 +153,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph { } - @GraphState( + @ActorState( name = LOAD, next = LOAD_WAIT, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Send a load request to the loader and transition to LOAD_WAIT. """) @@ -169,10 +169,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph { } - @GraphState( + @ActorState( name = LOAD_WAIT, next = SWAP_LEXICON, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Wait for the loader to finish loading the data. """ @@ -186,10 +186,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph { - @GraphState( + @ActorState( name = SWAP_LEXICON, next = REPARTITION, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Move the lexicon from the LEXICON_STAGING area to the LEXICON_LIVE area, then instruct the index-service to reload the lexicon. @@ -208,7 +208,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph { } - @GraphState( + @ActorState( name = REPARTITION, next = REPARTITION_WAIT, description = """ @@ -219,10 +219,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph { return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""); } - @GraphState( + @ActorState( name = REPARTITION_WAIT, next = REINDEX, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Wait for the index-service to finish repartitioning the index. """ @@ -235,7 +235,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph { } } - @GraphState( + @ActorState( name = REINDEX, next = REINDEX_WAIT, description = """ @@ -246,10 +246,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph { return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, ""); } - @GraphState( + @ActorState( name = REINDEX_WAIT, next = END, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Wait for the index-service to finish reindexing the data. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlActor.java index bc26624d..77d2a86c 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlActor.java @@ -6,6 +6,7 @@ import com.google.inject.Singleton; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.With; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessOutboxes; import nu.marginalia.control.process.ProcessService; import nu.marginalia.db.storage.FileStorageService; @@ -15,15 +16,14 @@ import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mqapi.crawling.CrawlRequest; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Singleton -public class CrawlActor extends AbstractStateGraph { +public class CrawlActor extends AbstractActorPrototype { // STATES @@ -52,7 +52,7 @@ public class CrawlActor extends AbstractStateGraph { } @Inject - public CrawlActor(StateFactory stateFactory, + public CrawlActor(ActorStateFactory stateFactory, ProcessOutboxes processOutboxes, FileStorageService storageService, Gson gson, @@ -65,7 +65,7 @@ public class CrawlActor extends AbstractStateGraph { this.processWatcher = processWatcher; } - @GraphState(name = INITIAL, + @ActorState(name = INITIAL, next = CRAWL, description = """ Validate the input and transition to CRAWL @@ -83,9 +83,9 @@ public class CrawlActor extends AbstractStateGraph { return new Message().withCrawlSpecId(crawlStorageId); } - @GraphState(name = CRAWL, + @ActorState(name = CRAWL, next = CRAWL_WAIT, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Allocate a storage area for the crawled data, then send a crawl request to the crawler and transition to CRAWL_WAIT. @@ -114,10 +114,10 @@ public class CrawlActor extends AbstractStateGraph { .withCrawlerMsgId(id); } - @GraphState( + @ActorState( name = CRAWL_WAIT, next = END, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Wait for the crawler to finish retreiving the data. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlJobExtractorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlJobExtractorActor.java index 1b611ce0..91e42729 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlJobExtractorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlJobExtractorActor.java @@ -2,16 +2,16 @@ package nu.marginalia.control.actor.task; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.svc.ControlFileStorageService; import nu.marginalia.control.process.ProcessService; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageBaseType; import nu.marginalia.db.storage.model.FileStorageType; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @Singleton -public class CrawlJobExtractorActor extends AbstractStateGraph { +public class CrawlJobExtractorActor extends AbstractActorPrototype { private final Logger logger = LoggerFactory.getLogger(getClass()); // STATES @@ -38,7 +38,7 @@ public class CrawlJobExtractorActor extends AbstractStateGraph { private final ExecutorService executor = Executors.newSingleThreadExecutor(); @Inject - public CrawlJobExtractorActor(StateFactory stateFactory, + public CrawlJobExtractorActor(ActorStateFactory stateFactory, ProcessService processService, FileStorageService fileStorageService, ControlFileStorageService controlFileStorageService @@ -57,8 +57,8 @@ public class CrawlJobExtractorActor extends AbstractStateGraph { return "Run the crawler job extractor process"; } - @GraphState(name = CREATE_FROM_LINK, next = END, - resume = ResumeBehavior.ERROR, + @ActorState(name = CREATE_FROM_LINK, next = END, + resume = ActorResumeBehavior.ERROR, description = """ Download a list of URLs as provided, and then spawn a CrawlJobExtractor process, @@ -92,8 +92,8 @@ public class CrawlJobExtractorActor extends AbstractStateGraph { } - @GraphState(name = CREATE_FROM_DB, next = END, - resume = ResumeBehavior.ERROR, + @ActorState(name = CREATE_FROM_DB, next = END, + resume = ActorResumeBehavior.ERROR, description = """ Spawns a CrawlJobExtractor process that loads data from the link database, and wait for it to finish. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ExportDataActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ExportDataActor.java index b82b2278..5e2a3cdd 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ExportDataActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ExportDataActor.java @@ -6,13 +6,13 @@ import com.zaxxer.hikari.HikariDataSource; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.With; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageType; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +25,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.zip.GZIPOutputStream; @Singleton -public class ExportDataActor extends AbstractStateGraph { +public class ExportDataActor extends AbstractActorPrototype { private static final String blacklistFilename = "blacklist.csv.gz"; private static final String domainsFilename = "domains.csv.gz"; @@ -54,7 +54,7 @@ public class ExportDataActor extends AbstractStateGraph { } @Inject - public ExportDataActor(StateFactory stateFactory, + public ExportDataActor(ActorStateFactory stateFactory, FileStorageService storageService, HikariDataSource dataSource) { @@ -63,7 +63,7 @@ public class ExportDataActor extends AbstractStateGraph { this.dataSource = dataSource; } - @GraphState(name = INITIAL, + @ActorState(name = INITIAL, next = EXPORT_BLACKLIST, description = """ Find EXPORT storage area, then transition to EXPORT-BLACKLIST. @@ -76,9 +76,9 @@ public class ExportDataActor extends AbstractStateGraph { return new Message().withStorageId(storage.id()); } - @GraphState(name = EXPORT_BLACKLIST, + @ActorState(name = EXPORT_BLACKLIST, next = EXPORT_DOMAINS, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Export the blacklist from the database to the EXPORT storage area. """ @@ -112,10 +112,10 @@ public class ExportDataActor extends AbstractStateGraph { return message; } - @GraphState( + @ActorState( name = EXPORT_DOMAINS, next = EXPORT_LINK_GRAPH, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Export known domains to the EXPORT storage area. """ @@ -155,10 +155,10 @@ public class ExportDataActor extends AbstractStateGraph { return message; } - @GraphState( + @ActorState( name = EXPORT_LINK_GRAPH, next = END, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Export known domains to the EXPORT storage area. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/RecrawlActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/RecrawlActor.java index 071e61db..2351e1aa 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/RecrawlActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/RecrawlActor.java @@ -6,6 +6,7 @@ import com.google.inject.Singleton; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.With; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessOutboxes; import nu.marginalia.control.process.ProcessService; import nu.marginalia.db.storage.FileStorageService; @@ -15,17 +16,16 @@ import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mqapi.crawling.CrawlRequest; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import java.nio.file.Files; import java.sql.SQLException; import java.util.Optional; @Singleton -public class RecrawlActor extends AbstractStateGraph { +public class RecrawlActor extends AbstractActorPrototype { // STATES @@ -58,7 +58,7 @@ public class RecrawlActor extends AbstractStateGraph { } @Inject - public RecrawlActor(StateFactory stateFactory, + public RecrawlActor(ActorStateFactory stateFactory, ActorProcessWatcher processWatcher, ProcessOutboxes processOutboxes, FileStorageService storageService, @@ -72,7 +72,7 @@ public class RecrawlActor extends AbstractStateGraph { this.gson = gson; } - @GraphState(name = INITIAL, + @ActorState(name = INITIAL, next = CRAWL, description = """ Validate the input and transition to CRAWL @@ -110,9 +110,9 @@ public class RecrawlActor extends AbstractStateGraph { .findFirst(); } - @GraphState(name = CRAWL, + @ActorState(name = CRAWL, next = CRAWL_WAIT, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Send a crawl request to the crawler and transition to CRAWL_WAIT. """ @@ -125,10 +125,10 @@ public class RecrawlActor extends AbstractStateGraph { return recrawlMessage.withCrawlerMsgId(id); } - @GraphState( + @ActorState( name = CRAWL_WAIT, next = END, - resume = ResumeBehavior.RETRY, + resume = ActorResumeBehavior.RETRY, description = """ Wait for the crawler to finish retrieving the data. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TriggerAdjacencyCalculationActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TriggerAdjacencyCalculationActor.java index ca78d871..0082c024 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TriggerAdjacencyCalculationActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TriggerAdjacencyCalculationActor.java @@ -2,11 +2,11 @@ package nu.marginalia.control.actor.task; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.control.process.ProcessService; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +15,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @Singleton -public class TriggerAdjacencyCalculationActor extends AbstractStateGraph { +public class TriggerAdjacencyCalculationActor extends AbstractActorPrototype { private final Logger logger = LoggerFactory.getLogger(getClass()); // STATES @@ -26,7 +26,7 @@ public class TriggerAdjacencyCalculationActor extends AbstractStateGraph { private final ExecutorService executor = Executors.newSingleThreadExecutor(); @Inject - public TriggerAdjacencyCalculationActor(StateFactory stateFactory, + public TriggerAdjacencyCalculationActor(ActorStateFactory stateFactory, ProcessService processService) { super(stateFactory); this.processService = processService; @@ -37,8 +37,8 @@ public class TriggerAdjacencyCalculationActor extends AbstractStateGraph { return "Calculate website similarities"; } - @GraphState(name = INITIAL, next = END, - resume = ResumeBehavior.ERROR, + @ActorState(name = INITIAL, next = END, + resume = ActorResumeBehavior.ERROR, description = """ Spawns a WebsitesAdjacenciesCalculator process and waits for it to finish. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TruncateLinkDatabase.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TruncateLinkDatabase.java index 90d449c3..f44545b9 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TruncateLinkDatabase.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/TruncateLinkDatabase.java @@ -6,18 +6,18 @@ import com.zaxxer.hikari.HikariDataSource; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.With; +import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.db.storage.model.FileStorageId; -import nu.marginalia.mqsm.StateFactory; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorResumeBehavior; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.SQLException; @Singleton -public class TruncateLinkDatabase extends AbstractStateGraph { +public class TruncateLinkDatabase extends AbstractActorPrototype { // STATES @@ -39,14 +39,14 @@ public class TruncateLinkDatabase extends AbstractStateGraph { } @Inject - public TruncateLinkDatabase(StateFactory stateFactory, + public TruncateLinkDatabase(ActorStateFactory stateFactory, HikariDataSource dataSource) { super(stateFactory); this.dataSource = dataSource; } - @GraphState(name = INITIAL, + @ActorState(name = INITIAL, next = FLUSH_DATABASE, description = """ Initial stage @@ -55,9 +55,9 @@ public class TruncateLinkDatabase extends AbstractStateGraph { } - @GraphState(name = FLUSH_DATABASE, + @ActorState(name = FLUSH_DATABASE, next = END, - resume = ResumeBehavior.ERROR, + resume = ActorResumeBehavior.ERROR, description = """ Truncate the domain and link tables. """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorState.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorState.java index 676f3ed2..7d4681ee 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorState.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorState.java @@ -1,7 +1,5 @@ package nu.marginalia.control.model; -import nu.marginalia.mqsm.graph.GraphState; - import java.util.Arrays; import java.util.List; import java.util.stream.Stream; @@ -10,7 +8,7 @@ public record ActorState(String name, boolean current, List transitions, String description) { - public ActorState(GraphState gs, boolean current) { + public ActorState(nu.marginalia.actor.state.ActorState gs, boolean current) { this(gs.name(), current, toTransitions(gs.next(), gs.transitions()), gs.description()); } private static List toTransitions(String next, String[] transitions) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorStateGraph.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorStateGraph.java index 34caef6f..757bdd9a 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorStateGraph.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ActorStateGraph.java @@ -1,27 +1,26 @@ package nu.marginalia.control.model; -import nu.marginalia.mqsm.graph.AbstractStateGraph; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.state.MachineState; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorStateInstance; import java.util.*; -import java.util.stream.Collectors; -public record ActorStateGraph(String description, List states) { +public record ActorStateGraph(String description, List states) { - public ActorStateGraph(AbstractStateGraph graph, MachineState currentState) { + public ActorStateGraph(AbstractActorPrototype graph, ActorStateInstance currentState) { this(graph.describe(), getStateList(graph, currentState)); } - private static List getStateList( - AbstractStateGraph graph, - MachineState currentState) + private static List getStateList( + AbstractActorPrototype graph, + ActorStateInstance currentState) { - Map declaredStates = graph.declaredStates(); - Set seenStates = new HashSet<>(declaredStates.size()); - LinkedList edge = new LinkedList<>(); + Map declaredStates = graph.declaredStates(); + Set seenStates = new HashSet<>(declaredStates.size()); + LinkedList edge = new LinkedList<>(); - List statesList = new ArrayList<>(declaredStates.size()); + List statesList = new ArrayList<>(declaredStates.size()); edge.add(declaredStates.get("INITIAL")); @@ -30,7 +29,7 @@ public record ActorStateGraph(String description, List states) { if (first == null || !seenStates.add(first)) { continue; } - statesList.add(new ActorState(first, currentState.name().equals(first.name()))); + statesList.add(new nu.marginalia.control.model.ActorState(first, currentState.name().equals(first.name()))); edge.add(declaredStates.get(first.next())); @@ -40,10 +39,10 @@ public record ActorStateGraph(String description, List states) { } if (!declaredStates.containsKey("ERROR")) { - statesList.add(new ActorState("ERROR", currentState.name().equals("ERROR"), List.of(), "Terminal error state")); + statesList.add(new nu.marginalia.control.model.ActorState("ERROR", currentState.name().equals("ERROR"), List.of(), "Terminal error state")); } if (!declaredStates.containsKey("END")) { - statesList.add(new ActorState("END", currentState.name().equals("END"), List.of(), "The machine terminated successfully")); + statesList.add(new nu.marginalia.control.model.ActorState("END", currentState.name().equals("END"), List.of(), "The machine terminated successfully")); } return statesList; diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java index 03973a62..7507e3d1 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java @@ -11,8 +11,8 @@ import nu.marginalia.control.actor.Actor; import nu.marginalia.control.model.ActorRunState; import nu.marginalia.control.model.ActorStateGraph; import nu.marginalia.db.storage.model.FileStorageId; -import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.state.MachineState; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.actor.state.ActorStateInstance; import spark.Request; import spark.Response; @@ -105,7 +105,7 @@ public class ControlActorService { final var stateGraph = controlActors.getActorDefinition(e.getKey()); - final MachineState state = e.getValue(); + final ActorStateInstance state = e.getValue(); final String actorDescription = stateGraph.describe(); final String machineName = e.getKey().name(); @@ -114,7 +114,7 @@ public class ControlActorService { final String stateDescription = actorStateDescriptions.computeIfAbsent( (machineName + "." + stateName), k -> Optional.ofNullable(stateGraph.declaredStates().get(stateName)) - .map(GraphState::description) + .map(ActorState::description) .orElse("Description missing for " + stateName) ); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java index f80287f4..65be9614 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java @@ -7,7 +7,6 @@ import lombok.SneakyThrows; import nu.marginalia.control.model.*; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.*; -import nu.marginalia.mqsm.graph.AbstractStateGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import spark.Request; diff --git a/settings.gradle b/settings.gradle index be5dd603..361c3ec6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -20,6 +20,8 @@ include 'code:libraries:braille-block-punch-cards' include 'code:libraries:language-processing' include 'code:libraries:term-frequency-dict' +include 'code:libraries:message-queue' + include 'code:features-search:screenshots' include 'code:features-search:random-websites' include 'code:features-search:query-parser' @@ -49,7 +51,6 @@ include 'code:api:process-mqapi' include 'code:common:service-discovery' include 'code:common:service-client' include 'code:common:db' -include 'code:common:message-queue' include 'code:common:service' include 'code:common:config' include 'code:common:model'