(mq) Refactor mq and actor library and move it to libraries out of common

This commit is contained in:
Viktor Lofgren 2023-08-15 10:53:23 +02:00
parent 019b61b330
commit e7192a9cad
69 changed files with 652 additions and 580 deletions

View File

@ -16,7 +16,7 @@ dependencies {
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:common:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:features-index:index-query') implementation project(':code:features-index:index-query')
implementation libs.lombok implementation libs.lombok

View File

@ -14,7 +14,7 @@ java {
dependencies { dependencies {
implementation project(':code:common:model') implementation project(':code:common:model')
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')

View File

@ -1,178 +0,0 @@
package nu.marginalia.mqsm.graph;
import nu.marginalia.mqsm.state.MachineState;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.state.StateTransition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
public abstract class AbstractStateGraph {
private final StateFactory stateFactory;
private static final Logger logger = LoggerFactory.getLogger(AbstractStateGraph.class);
public AbstractStateGraph(StateFactory stateFactory) {
this.stateFactory = stateFactory;
}
/** User-facing description of the actor. */
public abstract String describe();
public void transition(String state) {
throw new ControlFlowException(state, null);
}
public <T> void transition(String state, T payload) {
throw new ControlFlowException(state, payload);
}
public void error() {
throw new ControlFlowException("ERROR", "");
}
public <T> void error(T payload) {
throw new ControlFlowException("ERROR", payload);
}
public void error(Exception ex) {
throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage());
}
/** Check whether there is an INITIAL state that can be directly initialized
* without declared parameters. */
public boolean isDirectlyInitializable() {
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(GraphState.class);
if (gs == null) {
continue;
}
if ("INITIAL".equals(gs.name()) && method.getParameterCount() == 0) {
return true;
}
}
return false;
}
public Map<String, GraphState> declaredStates() {
Map<String, GraphState> ret = new HashMap<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(GraphState.class);
if (gs != null) {
ret.put(gs.name(), gs);
}
}
return ret;
}
public Set<TerminalGraphState> terminalStates() {
Set<TerminalGraphState> ret = new HashSet<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(TerminalGraphState.class);
if (gs != null) {
ret.add(gs);
}
}
return ret;
}
public List<MachineState> asStateList() {
List<MachineState> ret = new ArrayList<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(GraphState.class);
if (gs != null) {
ret.add(graphState(method, gs));
}
var ts = method.getAnnotation(TerminalGraphState.class);
if (ts != null) {
ret.add(stateFactory.create(ts.name(), ResumeBehavior.ERROR, () -> {
throw new ControlFlowException(ts.name(), null);
}));
}
}
return ret;
}
private MachineState graphState(Method method, GraphState gs) {
var parameters = method.getParameterTypes();
boolean returnsVoid = method.getGenericReturnType().equals(Void.TYPE);
if (parameters.length == 0) {
return stateFactory.create(gs.name(), gs.resume(), () -> {
try {
if (returnsVoid) {
method.invoke(this);
return StateTransition.to(gs.next());
} else {
Object ret = method.invoke(this);
return stateFactory.transition(gs.next(), ret);
}
}
catch (Exception e) {
return invocationExceptionToStateTransition(gs.name(), e);
}
});
}
else if (parameters.length == 1) {
return stateFactory.create(gs.name(), gs.resume(), parameters[0], (param) -> {
try {
if (returnsVoid) {
method.invoke(this, param);
return StateTransition.to(gs.next());
} else {
Object ret = method.invoke(this, param);
return stateFactory.transition(gs.next(), ret);
}
} catch (Exception e) {
return invocationExceptionToStateTransition(gs.name(), e);
}
});
}
else {
// We permit only @GraphState-annotated methods like this:
//
// void foo();
// void foo(Object bar);
// Object foo();
// Object foo(Object bar);
throw new IllegalStateException("StateGraph " +
getClass().getSimpleName() +
" has invalid method signature for method " +
method.getName() +
": Expected 0 or 1 parameter(s) but found " +
Arrays.toString(parameters));
}
}
private StateTransition invocationExceptionToStateTransition(String state, Throwable ex) {
while (ex instanceof InvocationTargetException e) {
if (e.getCause() != null) ex = ex.getCause();
}
if (ex instanceof ControlFlowException cfe) {
return stateFactory.transition(cfe.getState(), cfe.getPayload());
}
else if (ex instanceof InterruptedException intE) {
logger.error("State execution was interrupted " + state);
return StateTransition.to("ERR", "Execution interrupted");
}
else {
logger.error("Error in state invocation " + state, ex);
return StateTransition.to("ERROR",
"Exception: " + ex.getClass().getSimpleName() + "/" + ex.getMessage());
}
}
}

View File

@ -1,22 +0,0 @@
package nu.marginalia.mqsm.graph;
/** Exception thrown by a state to indicate that the state machine should jump to a different state. */
public class ControlFlowException extends RuntimeException {
private final String state;
private final Object payload;
public ControlFlowException(String state, Object payload) {
this.state = state;
this.payload = payload;
}
public String getState() {
return state;
}
public Object getPayload() {
return payload;
}
public StackTraceElement[] getStackTrace() { return new StackTraceElement[0]; }
}

View File

@ -1,14 +0,0 @@
package nu.marginalia.mqsm.state;
import nu.marginalia.mqsm.graph.ResumeBehavior;
public interface MachineState {
String name();
StateTransition next(String message);
ResumeBehavior resumeBehavior();
boolean isFinal();
}

View File

@ -1,11 +0,0 @@
package nu.marginalia.mqsm.state;
public record StateTransition(String state, String message) {
public static StateTransition to(String state) {
return new StateTransition(state, "");
}
public static StateTransition to(String state, String message) {
return new StateTransition(state, message);
}
}

View File

@ -12,7 +12,7 @@ java {
dependencies { dependencies {
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:common:db') implementation project(':code:common:db')
implementation libs.lombok implementation libs.lombok

View File

@ -10,10 +10,6 @@ java {
} }
dependencies { dependencies {
implementation project(':code:common:service-client')
implementation project(':code:common:service-discovery')
implementation project(':code:common:db')
implementation libs.lombok implementation libs.lombok
annotationProcessor libs.lombok annotationProcessor libs.lombok
@ -22,7 +18,6 @@ dependencies {
implementation libs.gson implementation libs.gson
implementation libs.rxjava implementation libs.rxjava
implementation libs.bundles.prometheus
implementation libs.bundles.slf4j implementation libs.bundles.slf4j
implementation libs.bucket4j implementation libs.bucket4j
@ -32,6 +27,7 @@ dependencies {
testImplementation libs.bundles.slf4j.test testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit testImplementation libs.bundles.junit
testImplementation libs.mockito testImplementation libs.mockito
testImplementation project(':code:common:db')
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation 'org.testcontainers:mariadb:1.17.4' testImplementation 'org.testcontainers:mariadb:1.17.4'

View File

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 18 KiB

View File

@ -26,25 +26,25 @@ The inbox implementations as well as the outbox can be constructed via the `Mess
The MQSM is a finite state machine that is backed by the message queue used to implement an Actor style paradigm. The MQSM is a finite state machine that is backed by the message queue used to implement an Actor style paradigm.
The machine itself is defined through a class that extends the 'AbstractStateGraph'; with state transitions and The machine itself is defined through a class that extends the 'AbstractActorPrototype'; with state transitions and
names defined as implementations. names defined as implementations.
Example: Example:
```java ```java
class ExampleStateMachine extends AbstractStateGraph { class ExampleStateMachine extends AbstractActorPrototype {
@GraphState(name = "INITIAL", next="GREET") @ActorState(name = "INITIAL", next="GREET")
public void initial() { public void initial() {
return "World"; // passed to the next state return "World"; // passed to the next state
} }
@GraphState(name = "GREET", next="COUNT-TO-FIVE") @ActorState(name = "GREET", next="COUNT-TO-FIVE")
public void greet(String name) { public void greet(String name) {
System.out.println("Hello " + name); System.out.println("Hello " + name);
} }
@GraphState(name = "COUNT-TO-FIVE", next="END") @ActorState(name = "COUNT-TO-FIVE", next="END")
public void countToFive(Integer value) { public void countToFive(Integer value) {
// value is passed from the previous state, since greet didn't pass a value, // value is passed from the previous state, since greet didn't pass a value,
// null will be the default. // null will be the default.
@ -69,7 +69,7 @@ class ExampleStateMachine extends AbstractStateGraph {
// Default transition is to END // Default transition is to END
} }
@GraphState(name="END") @ActorState(name="END")
public void end() { public void end() {
System.out.println("Done"); System.out.println("Done");
} }

View File

@ -1,34 +1,33 @@
package nu.marginalia.mqsm; package nu.marginalia.actor;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException; import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject; import nu.marginalia.actor.state.ActorResumeBehavior;
import com.google.inject.Singleton; import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.actor.state.ActorStateTransition;
import nu.marginalia.mqsm.state.MachineState;
import nu.marginalia.mqsm.state.StateTransition;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@Singleton /** Factory for creating actor state instances. You probably don't want to use this directly.
public class StateFactory { * <p>
* Use AbstractStatePrototype instead. */
public class ActorStateFactory {
private final Gson gson; private final Gson gson;
@Inject public ActorStateFactory(Gson gson) {
public StateFactory(Gson gson) {
this.gson = gson; this.gson = gson;
} }
public <T> MachineState create(String name, ResumeBehavior resumeBehavior, Class<T> param, Function<T, StateTransition> logic) { public <T> ActorStateInstance create(String name, ActorResumeBehavior resumeBehavior, Class<T> param, Function<T, ActorStateTransition> logic) {
return new MachineState() { return new ActorStateInstance() {
@Override @Override
public String name() { public String name() {
return name; return name;
} }
@Override @Override
public StateTransition next(String message) { public ActorStateTransition next(String message) {
if (message.isEmpty()) { if (message.isEmpty()) {
return logic.apply(null); return logic.apply(null);
@ -45,7 +44,7 @@ public class StateFactory {
} }
@Override @Override
public ResumeBehavior resumeBehavior() { public ActorResumeBehavior resumeBehavior() {
return resumeBehavior; return resumeBehavior;
} }
@ -56,21 +55,21 @@ public class StateFactory {
}; };
} }
public MachineState create(String name, ResumeBehavior resumeBehavior, Supplier<StateTransition> logic) { public ActorStateInstance create(String name, ActorResumeBehavior actorResumeBehavior, Supplier<ActorStateTransition> logic) {
return new MachineState() { return new ActorStateInstance() {
@Override @Override
public String name() { public String name() {
return name; return name;
} }
@Override @Override
public StateTransition next(String message) { public ActorStateTransition next(String message) {
return logic.get(); return logic.get();
} }
@Override @Override
public ResumeBehavior resumeBehavior() { public ActorResumeBehavior resumeBehavior() {
return resumeBehavior; return actorResumeBehavior;
} }
@Override @Override
@ -80,62 +79,62 @@ public class StateFactory {
}; };
} }
public StateTransition transition(String state) { public ActorStateTransition transition(String state) {
return StateTransition.to(state); return ActorStateTransition.to(state);
} }
public StateTransition transition(String state, Object message) { public ActorStateTransition transition(String state, Object message) {
if (null == message) { if (null == message) {
return StateTransition.to(state); return ActorStateTransition.to(state);
} }
return StateTransition.to(state, gson.toJson(message)); return ActorStateTransition.to(state, gson.toJson(message));
} }
public static class ErrorState implements MachineState { static class ErrorStateInstance implements ActorStateInstance {
@Override @Override
public String name() { return "ERROR"; } public String name() { return "ERROR"; }
@Override @Override
public StateTransition next(String message) { public ActorStateTransition next(String message) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; }
@Override @Override
public boolean isFinal() { return true; } public boolean isFinal() { return true; }
} }
public static class FinalState implements MachineState { static class FinalState implements ActorStateInstance {
@Override @Override
public String name() { return "END"; } public String name() { return "END"; }
@Override @Override
public StateTransition next(String message) { public ActorStateTransition next(String message) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; }
@Override @Override
public boolean isFinal() { return true; } public boolean isFinal() { return true; }
} }
public static class ResumingState implements MachineState { static class ResumingState implements ActorStateInstance {
@Override @Override
public String name() { return "RESUMING"; } public String name() { return "RESUMING"; }
@Override @Override
public StateTransition next(String message) { public ActorStateTransition next(String message) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; }
@Override @Override
public boolean isFinal() { return false; } public boolean isFinal() { return false; }

View File

@ -1,5 +1,6 @@
package nu.marginalia.mqsm; package nu.marginalia.actor;
import nu.marginalia.actor.prototype.ActorPrototype;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
@ -7,9 +8,8 @@ import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSubscription; import nu.marginalia.mq.inbox.MqSubscription;
import nu.marginalia.mq.inbox.MqSynchronousInbox; import nu.marginalia.mq.inbox.MqSynchronousInbox;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.*;
import nu.marginalia.mqsm.state.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,23 +30,23 @@ public class ActorStateMachine {
private final String queueName; private final String queueName;
private volatile MachineState state; private volatile ActorStateInstance state;
private volatile ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated(); private volatile ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated();
private final MachineState errorState = new StateFactory.ErrorState(); private final ActorStateInstance errorState = new ActorStateFactory.ErrorStateInstance();
private final MachineState finalState = new StateFactory.FinalState(); private final ActorStateInstance finalState = new ActorStateFactory.FinalState();
private final MachineState resumingState = new StateFactory.ResumingState(); private final ActorStateInstance resumingState = new ActorStateFactory.ResumingState();
private final List<BiConsumer<String, String>> stateChangeListeners = new ArrayList<>(); private final List<BiConsumer<String, String>> stateChangeListeners = new ArrayList<>();
private final Map<String, MachineState> allStates = new HashMap<>(); private final Map<String, ActorStateInstance> allStates = new HashMap<>();
private final boolean isDirectlyInitializable; private final boolean isDirectlyInitializable;
public ActorStateMachine(MessageQueueFactory messageQueueFactory, public ActorStateMachine(MessageQueueFactory messageQueueFactory,
String queueName, String queueName,
UUID instanceUUID, UUID instanceUUID,
AbstractStateGraph stateGraph) ActorPrototype statePrototype)
{ {
this.queueName = queueName; this.queueName = queueName;
@ -56,10 +56,10 @@ public class ActorStateMachine {
smInbox.subscribe(new StateEventSubscription()); smInbox.subscribe(new StateEventSubscription());
registerStates(List.of(errorState, finalState, resumingState)); registerStates(List.of(errorState, finalState, resumingState));
registerStates(stateGraph); registerStates(statePrototype);
isDirectlyInitializable = stateGraph.isDirectlyInitializable(); isDirectlyInitializable = statePrototype.isDirectlyInitializable();
stateGraph.declaredStates().forEach((name, declaredState) -> { statePrototype.declaredStates().forEach((name, declaredState) -> {
if (!allStates.containsKey(name)) { if (!allStates.containsKey(name)) {
throw new IllegalArgumentException("State " + name + " is not defined in the state graph"); throw new IllegalArgumentException("State " + name + " is not defined in the state graph");
} }
@ -84,14 +84,14 @@ public class ActorStateMachine {
} }
/** Register the state graph */ /** Register the state graph */
void registerStates(List<MachineState> states) { void registerStates(List<ActorStateInstance> states) {
for (var state : states) { for (var state : states) {
allStates.put(state.name(), state); allStates.put(state.name(), state);
} }
} }
/** Register the state graph */ /** Register the state graph */
void registerStates(AbstractStateGraph states) { void registerStates(ActorPrototype states) {
registerStates(states.asStateList()); registerStates(states.asStateList());
} }
@ -128,7 +128,7 @@ public class ActorStateMachine {
/** Initialize the state machine. */ /** Initialize the state machine. */
public void init() throws Exception { public void init() throws Exception {
var transition = StateTransition.to("INITIAL"); var transition = ActorStateTransition.to("INITIAL");
synchronized (this) { synchronized (this) {
this.state = allStates.get(transition.state()); this.state = allStates.get(transition.state());
@ -140,7 +140,7 @@ public class ActorStateMachine {
/** Initialize the state machine. */ /** Initialize the state machine. */
public void initFrom(String firstState) throws Exception { public void initFrom(String firstState) throws Exception {
var transition = StateTransition.to(firstState); var transition = ActorStateTransition.to(firstState);
synchronized (this) { synchronized (this) {
this.state = allStates.get(transition.state()); this.state = allStates.get(transition.state());
@ -152,7 +152,7 @@ public class ActorStateMachine {
/** Initialize the state machine. */ /** Initialize the state machine. */
public void init(String jsonEncodedArgument) throws Exception { public void init(String jsonEncodedArgument) throws Exception {
var transition = StateTransition.to("INITIAL", jsonEncodedArgument); var transition = ActorStateTransition.to("INITIAL", jsonEncodedArgument);
synchronized (this) { synchronized (this) {
this.state = allStates.get(transition.state()); this.state = allStates.get(transition.state());
@ -164,7 +164,7 @@ public class ActorStateMachine {
/** Initialize the state machine. */ /** Initialize the state machine. */
public void initFrom(String state, String jsonEncodedArgument) throws Exception { public void initFrom(String state, String jsonEncodedArgument) throws Exception {
var transition = StateTransition.to(state, jsonEncodedArgument); var transition = ActorStateTransition.to(state, jsonEncodedArgument);
synchronized (this) { synchronized (this) {
this.state = allStates.get(transition.state()); this.state = allStates.get(transition.state());
@ -212,15 +212,15 @@ public class ActorStateMachine {
} }
} }
private void resumeFromAck(MachineState resumeState, private void resumeFromAck(ActorStateInstance resumeState,
MqMessage message) MqMessage message)
{ {
try { try {
if (resumeState.resumeBehavior().equals(ResumeBehavior.ERROR)) { if (resumeState.resumeBehavior().equals(ActorResumeBehavior.ERROR)) {
// The message is acknowledged, but the state does not support resuming // The message is acknowledged, but the state does not support resuming
smOutbox.sendNotice(expectedMessage.id, "ERROR", "Illegal resumption from ACK'ed state " + message.function()); smOutbox.sendNotice(expectedMessage.id, "ERROR", "Illegal resumption from ACK'ed state " + message.function());
} }
else if (resumeState.resumeBehavior().equals(ResumeBehavior.RESTART)) { else if (resumeState.resumeBehavior().equals(ActorResumeBehavior.RESTART)) {
this.state = resumeState; this.state = resumeState;
// The message is already acknowledged, we flag it as dead and then send an identical message // The message is already acknowledged, we flag it as dead and then send an identical message
@ -308,7 +308,7 @@ public class ActorStateMachine {
} }
} }
public MachineState getState() { public ActorStateInstance getState() {
return state; return state;
} }
@ -371,38 +371,3 @@ public class ActorStateMachine {
} }
} }
/** ExpectedMessage guards against spurious state changes being triggered by old messages in the queue
*
* It contains the message id of the last message that was processed, and the messages sent by the state machine to
* itself via the message queue all have relatedId set to expectedMessageId. If the state machine is unitialized or
* in a terminal state, it will accept messages with relatedIds that are equal to -1.
* */
class ExpectedMessage {
public final long id;
public ExpectedMessage(long id) {
this.id = id;
}
public static ExpectedMessage expectThis(MqMessage message) {
return new ExpectedMessage(message.relatedId());
}
public static ExpectedMessage responseTo(MqMessage message) {
return new ExpectedMessage(message.msgId());
}
public static ExpectedMessage anyUnrelated() {
return new ExpectedMessage(-1);
}
public static ExpectedMessage expectId(long id) {
return new ExpectedMessage(id);
}
public boolean isExpected(MqMessage message) {
if (id < 0)
return true;
return id == message.relatedId();
}
}

View File

@ -0,0 +1,41 @@
package nu.marginalia.actor;
import nu.marginalia.mq.MqMessage;
/**
* ExpectedMessage guards against spurious state changes being triggered by old messages in the queue
* <p>
* It contains the message id of the last message that was processed, and the messages sent by the state machine to
* itself via the message queue all have relatedId set to expectedMessageId. If the state machine is unitialized or
* in a terminal state, it will accept messages with relatedIds that are equal to -1.
*/
class ExpectedMessage {
public final long id;
ExpectedMessage(long id) {
this.id = id;
}
public static ExpectedMessage expectThis(MqMessage message) {
return new ExpectedMessage(message.relatedId());
}
public static ExpectedMessage responseTo(MqMessage message) {
return new ExpectedMessage(message.msgId());
}
public static ExpectedMessage anyUnrelated() {
return new ExpectedMessage(-1);
}
public static ExpectedMessage expectId(long id) {
return new ExpectedMessage(id);
}
public boolean isExpected(MqMessage message) {
if (id < 0)
return true;
return id == message.relatedId();
}
}

View File

@ -0,0 +1,237 @@
package nu.marginalia.actor.prototype;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorTerminalState;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.state.ActorStateTransition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
/** Base class for actors. The state graph is defined using public methods
* annotated with {@code @ActorState} and {@code @ActorTerminalState}. This class provide
* a mediation layer that translates these annotations into a state graph
* that can be used by the actor runtime.
* <p> . <p>
* <pre>
* public class MyActor extends AbstractActorPrototype {
* {@code @ActorState(name="INITIAL", next="STATE_1")}
* public void initial() { ... }
* {@code @ActorState(name="STATE_1", next="STATE_N")}
* public void state1() { ... }
* ...
* }
* </pre>
* <p>
* The prototype provides explicit transition() and error() methods that can be used
* to jump to a different state. Each of these methods come with a variant that has a
* parameter. The parameter will be passed as a payload to the next state.
* </p>
* <p>The @ActorState annotation also provides a default next
* state that will be transitioned to automatically when the method returns. If the
* method returns a value, this value will be passed as a payload to the next state,
* and injected as a parameter to the handler method.</p>
* <h2>Caveat</h2>
* The jump functions are implemented using exceptions. This means that if you have
* a {@code try {} catch(Exception e)} block in your code or a {@code @SneakyThrows}
* annotation, you will catch the exception and prevent the transition.
*
*/
public abstract class AbstractActorPrototype implements ActorPrototype {
private final ActorStateFactory stateFactory;
private static final Logger logger = LoggerFactory.getLogger(AbstractActorPrototype.class);
public AbstractActorPrototype(ActorStateFactory stateFactory) {
this.stateFactory = stateFactory;
}
/** Explicitly transition to a different state.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public void transition(String state) {
throw new ControlFlowException(state, null);
}
/** Explicitly transition to a different state, encoding a payload.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public <T> void transition(String state, T payload) {
throw new ControlFlowException(state, payload);
}
/** Explicitly transition to the error state.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public void error() {
throw new ControlFlowException("ERROR", "");
}
/** Explicitly transition to the error state with an error message.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public <T> void error(T payload) {
throw new ControlFlowException("ERROR", payload);
}
/** Explicitly transition to the error state.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public void error(Exception ex) {
throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage());
}
@Override
public boolean isDirectlyInitializable() {
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(ActorState.class);
if (gs == null) {
continue;
}
if ("INITIAL".equals(gs.name()) && method.getParameterCount() == 0) {
return true;
}
}
return false;
}
@Override
public Map<String, ActorState> declaredStates() {
Map<String, ActorState> ret = new HashMap<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(ActorState.class);
if (gs != null) {
ret.put(gs.name(), gs);
}
}
return ret;
}
/** Compile a list of ActorStateInstances from the @ActorState and @ActorTerminalState annotations.
*/
@Override
public List<ActorStateInstance> asStateList() {
List<ActorStateInstance> ret = new ArrayList<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(ActorState.class);
if (gs != null) {
ret.add(createStateInstance(method, gs));
}
var ts = method.getAnnotation(ActorTerminalState.class);
if (ts != null) {
ret.add(createTerminalStateInstance(ts));
}
}
return ret;
}
private ActorStateInstance createStateInstance(Method method, ActorState gs) {
var parameters = method.getParameterTypes();
boolean returnsVoid = method.getGenericReturnType().equals(Void.TYPE);
if (parameters.length == 0) {
return stateFactory.create(gs.name(), gs.resume(), () -> {
try {
if (returnsVoid) {
method.invoke(this);
return ActorStateTransition.to(gs.next());
} else {
Object ret = method.invoke(this);
return stateFactory.transition(gs.next(), ret);
}
}
catch (Exception e) {
return translateInvocationExceptionToStateTransition(gs.name(), e);
}
});
}
else if (parameters.length == 1) {
return stateFactory.create(gs.name(), gs.resume(), parameters[0], (param) -> {
try {
if (returnsVoid) {
method.invoke(this, param);
return ActorStateTransition.to(gs.next());
} else {
Object ret = method.invoke(this, param);
return stateFactory.transition(gs.next(), ret);
}
}
catch (Exception e) {
return translateInvocationExceptionToStateTransition(gs.name(), e);
}
});
}
else {
// We permit only @ActorState-annotated methods like this:
//
// void foo();
// void foo(Object bar);
// Object foo();
// Object foo(Object bar);
throw new IllegalStateException("ActorStatePrototype " +
getClass().getSimpleName() +
" has invalid method signature for method " +
method.getName() +
": Expected 0 or 1 parameter(s) but found " +
Arrays.toString(parameters));
}
}
private ActorStateInstance createTerminalStateInstance(ActorTerminalState ts) {
final String name = ts.name();
return stateFactory.create(name, ActorResumeBehavior.ERROR, () -> {
throw new ControlFlowException(name, null);
});
}
private ActorStateTransition translateInvocationExceptionToStateTransition(String state, Throwable ex) {
while (ex instanceof InvocationTargetException e) {
if (e.getCause() != null) ex = ex.getCause();
}
if (ex instanceof ControlFlowException cfe) {
return stateFactory.transition(cfe.getState(), cfe.getPayload());
}
else if (ex instanceof InterruptedException intE) {
logger.error("State execution was interrupted " + state);
return ActorStateTransition.to("ERR", "Execution interrupted");
}
else {
logger.error("Error in state invocation " + state, ex);
return ActorStateTransition.to("ERROR",
"Exception: " + ex.getClass().getSimpleName() + "/" + ex.getMessage());
}
}
/** Exception thrown by a state to indicate that the state machine should jump to a different state. */
public static class ControlFlowException extends RuntimeException {
private final String state;
private final Object payload;
public ControlFlowException(String state, Object payload) {
this.state = state;
this.payload = payload;
}
public String getState() {
return state;
}
public Object getPayload() {
return payload;
}
public StackTraceElement[] getStackTrace() { return new StackTraceElement[0]; }
}
}

View File

@ -0,0 +1,25 @@
package nu.marginalia.actor.prototype;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.state.ActorTerminalState;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface ActorPrototype {
/**
* User-facing description of the actor.
*/
String describe();
/** Check whether there is an INITIAL state that can be directly initialized
* without declared parameters. */
boolean isDirectlyInitializable();
Map<String, ActorState> declaredStates();
/** Get or create a list of ActorStateInstances */
List<ActorStateInstance> asStateList();
}

View File

@ -1,6 +1,6 @@
package nu.marginalia.mqsm.graph; package nu.marginalia.actor.state;
public enum ResumeBehavior { public enum ActorResumeBehavior {
/** Retry the state on resume */ /** Retry the state on resume */
RETRY, RETRY,
/** Jump to ERROR on resume if the message has been acknowledged */ /** Jump to ERROR on resume if the message has been acknowledged */

View File

@ -1,4 +1,4 @@
package nu.marginalia.mqsm.graph; package nu.marginalia.actor.state;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
@ -6,10 +6,10 @@ import java.lang.annotation.RetentionPolicy;
/** Annotation for declaring a state in an actor's state graph. */ /** Annotation for declaring a state in an actor's state graph. */
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
public @interface GraphState { public @interface ActorState {
String name(); String name();
String next() default "ERROR"; String next() default "ERROR";
String[] transitions() default {}; String[] transitions() default {};
String description() default ""; String description() default "";
ResumeBehavior resume() default ResumeBehavior.ERROR; ActorResumeBehavior resume() default ActorResumeBehavior.ERROR;
} }

View File

@ -0,0 +1,12 @@
package nu.marginalia.actor.state;
public interface ActorStateInstance {
String name();
ActorStateTransition next(String message);
ActorResumeBehavior resumeBehavior();
boolean isFinal();
}

View File

@ -0,0 +1,11 @@
package nu.marginalia.actor.state;
public record ActorStateTransition(String state, String message) {
public static ActorStateTransition to(String state) {
return new ActorStateTransition(state, "");
}
public static ActorStateTransition to(String state, String message) {
return new ActorStateTransition(state, message);
}
}

View File

@ -1,10 +1,10 @@
package nu.marginalia.mqsm.graph; package nu.marginalia.actor.state;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
public @interface TerminalGraphState { public @interface ActorTerminalState {
String name(); String name();
String description() default ""; String description() default "";
} }

View File

@ -1,15 +1,15 @@
package nu.marginalia.mqsm; package nu.marginalia.actor;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessageRow; import nu.marginalia.mq.MqMessageRow;
import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.containers.MariaDBContainer;
@ -61,21 +61,24 @@ public class ActorStateMachineErrorTest {
dataSource.close(); dataSource.close();
} }
public static class ErrorHurdles extends AbstractStateGraph { public static class ErrorHurdles extends AbstractActorPrototype {
public ErrorHurdles(StateFactory stateFactory) { public ErrorHurdles(ActorStateFactory stateFactory) {
super(stateFactory); super(stateFactory);
} }
@GraphState(name = "INITIAL", next = "FAILING") public String describe() {
return "Test graph";
}
@ActorState(name = "INITIAL", next = "FAILING")
public void initial() { public void initial() {
} }
@GraphState(name = "FAILING", next = "OK", resume = ResumeBehavior.RETRY) @ActorState(name = "FAILING", next = "OK", resume = ActorResumeBehavior.RETRY)
public void resumable() { public void resumable() {
throw new RuntimeException("Boom!"); throw new RuntimeException("Boom!");
} }
@GraphState(name = "OK", next = "END") @ActorState(name = "OK", next = "END")
public void ok() { public void ok() {
} }
@ -84,7 +87,7 @@ public class ActorStateMachineErrorTest {
@Test @Test
public void smResumeResumableFromNew() throws Exception { public void smResumeResumableFromNew() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory));
sm.init(); sm.init();

View File

@ -1,13 +1,13 @@
package nu.marginalia.mqsm; package nu.marginalia.actor;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.containers.MariaDBContainer;
@ -58,15 +58,18 @@ public class ActorStateMachineNullTest {
dataSource.close(); dataSource.close();
} }
public static class TestGraph extends AbstractStateGraph { public static class TestPrototypeActor extends AbstractActorPrototype {
public TestGraph(StateFactory stateFactory) { public TestPrototypeActor(ActorStateFactory stateFactory) {
super(stateFactory); super(stateFactory);
} }
@GraphState(name = "INITIAL", next = "GREET") public String describe() {
return "Test graph";
}
@ActorState(name = "INITIAL", next = "GREET")
public void initial() {} public void initial() {}
@GraphState(name = "GREET", next = "END") @ActorState(name = "GREET", next = "END")
public void greet(String message) { public void greet(String message) {
if (null == message) { if (null == message) {
System.out.println("Hello, null!"); System.out.println("Hello, null!");
@ -79,8 +82,8 @@ public class ActorStateMachineNullTest {
@Test @Test
public void testStateGraphNullSerialization() throws Exception { public void testStateGraphNullSerialization() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var graph = new TestGraph(stateFactory); var graph = new TestPrototypeActor(stateFactory);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph);

View File

@ -1,16 +1,16 @@
package nu.marginalia.mqsm; package nu.marginalia.actor;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessageRow; import nu.marginalia.mq.MqMessageRow;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.containers.MariaDBContainer;
@ -62,31 +62,34 @@ public class ActorStateMachineResumeTest {
dataSource.close(); dataSource.close();
} }
public static class ResumeTrialsGraph extends AbstractStateGraph { public static class ResumeTrialsPrototypeActor extends AbstractActorPrototype {
public ResumeTrialsGraph(StateFactory stateFactory) { public ResumeTrialsPrototypeActor(ActorStateFactory stateFactory) {
super(stateFactory); super(stateFactory);
} }
@GraphState(name = "INITIAL", next = "RESUMABLE") public String describe() {
return "Test graph";
}
@ActorState(name = "INITIAL", next = "RESUMABLE")
public void initial() {} public void initial() {}
@GraphState(name = "RESUMABLE", next = "NON-RESUMABLE", resume = ResumeBehavior.RETRY) @ActorState(name = "RESUMABLE", next = "NON-RESUMABLE", resume = ActorResumeBehavior.RETRY)
public void resumable() {} public void resumable() {}
@GraphState(name = "NON-RESUMABLE", next = "OK", resume = ResumeBehavior.ERROR) @ActorState(name = "NON-RESUMABLE", next = "OK", resume = ActorResumeBehavior.ERROR)
public void nonResumable() {} public void nonResumable() {}
@GraphState(name = "OK", next = "END") @ActorState(name = "OK", next = "END")
public void ok() {} public void ok() {}
} }
@Test @Test
public void smResumeResumableFromNew() throws Exception { public void smResumeResumableFromNew() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
persistence.sendNewMessage(inboxId, null, -1L, "RESUMABLE", "", null); persistence.sendNewMessage(inboxId, null, -1L, "RESUMABLE", "", null);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS); sm.join(2, TimeUnit.SECONDS);
sm.stop(); sm.stop();
@ -102,12 +105,12 @@ public class ActorStateMachineResumeTest {
@Test @Test
public void smResumeFromAck() throws Exception { public void smResumeFromAck() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
long id = persistence.sendNewMessage(inboxId, null, -1L, "RESUMABLE", "", null); long id = persistence.sendNewMessage(inboxId, null, -1L, "RESUMABLE", "", null);
persistence.updateMessageState(id, MqMessageState.ACK); persistence.updateMessageState(id, MqMessageState.ACK);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(4, TimeUnit.SECONDS); sm.join(4, TimeUnit.SECONDS);
sm.stop(); sm.stop();
@ -124,12 +127,12 @@ public class ActorStateMachineResumeTest {
@Test @Test
public void smResumeNonResumableFromNew() throws Exception { public void smResumeNonResumableFromNew() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
persistence.sendNewMessage(inboxId, null, -1L, "NON-RESUMABLE", "", null); persistence.sendNewMessage(inboxId, null, -1L, "NON-RESUMABLE", "", null);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS); sm.join(2, TimeUnit.SECONDS);
sm.stop(); sm.stop();
@ -145,13 +148,13 @@ public class ActorStateMachineResumeTest {
@Test @Test
public void smResumeNonResumableFromAck() throws Exception { public void smResumeNonResumableFromAck() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
long id = persistence.sendNewMessage(inboxId, null, null, "NON-RESUMABLE", "", null); long id = persistence.sendNewMessage(inboxId, null, null, "NON-RESUMABLE", "", null);
persistence.updateMessageState(id, MqMessageState.ACK); persistence.updateMessageState(id, MqMessageState.ACK);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS); sm.join(2, TimeUnit.SECONDS);
sm.stop(); sm.stop();
@ -167,10 +170,10 @@ public class ActorStateMachineResumeTest {
@Test @Test
public void smResumeEmptyQueue() throws Exception { public void smResumeEmptyQueue() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS); sm.join(2, TimeUnit.SECONDS);
sm.stop(); sm.stop();

View File

@ -1,13 +1,13 @@
package nu.marginalia.mqsm; package nu.marginalia.actor;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.containers.MariaDBContainer;
@ -58,24 +58,27 @@ public class ActorStateMachineTest {
dataSource.close(); dataSource.close();
} }
public static class TestGraph extends AbstractStateGraph { public static class TestPrototypeActor extends AbstractActorPrototype {
public TestGraph(StateFactory stateFactory) { public TestPrototypeActor(ActorStateFactory stateFactory) {
super(stateFactory); super(stateFactory);
} }
public String describe() {
return "Test graph";
}
@GraphState(name = "INITIAL", next = "GREET") @ActorState(name = "INITIAL", next = "GREET")
public String initial() { public String initial() {
return "World"; return "World";
} }
@GraphState(name = "GREET") @ActorState(name = "GREET")
public void greet(String message) { public void greet(String message) {
System.out.println("Hello, " + message + "!"); System.out.println("Hello, " + message + "!");
transition("COUNT-DOWN", 5); transition("COUNT-DOWN", 5);
} }
@GraphState(name = "COUNT-DOWN", next = "END") @ActorState(name = "COUNT-DOWN", next = "END")
public void countDown(Integer from) { public void countDown(Integer from) {
if (from > 0) { if (from > 0) {
System.out.println(from); System.out.println(from);
@ -86,8 +89,8 @@ public class ActorStateMachineTest {
@Test @Test
public void testAnnotatedStateGraph() throws Exception { public void testAnnotatedStateGraph() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var graph = new TestGraph(stateFactory); var graph = new TestPrototypeActor(stateFactory);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph);
@ -104,8 +107,8 @@ public class ActorStateMachineTest {
@Test @Test
public void testStartStopStartStop() throws Exception { public void testStartStopStartStop() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestPrototypeActor(stateFactory));
sm.init(); sm.init();
@ -114,7 +117,7 @@ public class ActorStateMachineTest {
System.out.println("-------------------- "); System.out.println("-------------------- ");
var sm2 = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); var sm2 = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestPrototypeActor(stateFactory));
sm2.join(2, TimeUnit.SECONDS); sm2.join(2, TimeUnit.SECONDS);
sm2.stop(); sm2.stop();
@ -123,7 +126,7 @@ public class ActorStateMachineTest {
@Test @Test
public void testFalseTransition() throws Exception { public void testFalseTransition() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create()); var stateFactory = new ActorStateFactory(new GsonBuilder().create());
// Prep the queue with a message to set the state to initial, // Prep the queue with a message to set the state to initial,
// and an additional message to trigger the false transition back to initial // and an additional message to trigger the false transition back to initial
@ -131,7 +134,7 @@ public class ActorStateMachineTest {
persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null); persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null);
persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null); persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); var sm = new ActorStateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestPrototypeActor(stateFactory));
Thread.sleep(50); Thread.sleep(50);

View File

@ -11,7 +11,7 @@ These libraries may not depend on features, services, processes, models, etc.
bad support for. It's designed to be able to easily replaced when *Java's Foreign Function And Memory API* is released. bad support for. It's designed to be able to easily replaced when *Java's Foreign Function And Memory API* is released.
* The [btree](btree/) library offers a static BTree implementation based on the array library. * The [btree](btree/) library offers a static BTree implementation based on the array library.
* [language-processing](language-processing/) contains primitives for sentence extraction and POS-tagging. * [language-processing](language-processing/) contains primitives for sentence extraction and POS-tagging.
* The [message-queue](message-queue/) library.
## Micro libraries ## Micro libraries
* [easy-lsh](easy-lsh/) is a simple locality-sensitive hash for document deduplication * [easy-lsh](easy-lsh/) is a simple locality-sensitive hash for document deduplication

View File

@ -34,7 +34,7 @@ dependencies {
implementation project(':code:common:db') implementation project(':code:common:db')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')

View File

@ -1,5 +1,6 @@
package nu.marginalia.converting.sideload; package nu.marginalia.converting.sideload;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamException;
@ -8,6 +9,7 @@ import java.nio.file.Path;
class StackexchangeSideloaderTest { class StackexchangeSideloaderTest {
@Test @Test
@Disabled
public void test7zFile() throws IOException, XMLStreamException { public void test7zFile() throws IOException, XMLStreamException {
var stackExchangeReader = new StackExchange7zReader(Path.of("/mnt/storage/stackexchange/scifi.meta.stackexchange.com.7z")); var stackExchangeReader = new StackExchange7zReader(Path.of("/mnt/storage/stackexchange/scifi.meta.stackexchange.com.7z"));

View File

@ -30,7 +30,7 @@ dependencies {
implementation project(':code:api:process-mqapi') implementation project(':code:api:process-mqapi')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:common:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh') implementation project(':code:libraries:easy-lsh')
implementation project(':code:process-models:crawling-model') implementation project(':code:process-models:crawling-model')

View File

@ -23,13 +23,13 @@ dependencies {
implementation project(':code:api:index-api') implementation project(':code:api:index-api')
implementation project(':code:common:model') implementation project(':code:common:model')
implementation project(':code:common:db') implementation project(':code:common:db')
implementation project(':code:common:message-queue')
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:features-index:lexicon') implementation project(':code:features-index:lexicon')
implementation project(':code:features-index:index-journal') implementation project(':code:features-index:index-journal')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:language-processing') implementation project(':code:libraries:language-processing')
implementation project(':third-party:commons-codec') implementation project(':third-party:commons-codec')
testImplementation project(':code:services-core:search-service') testImplementation project(':code:services-core:search-service')

View File

@ -29,7 +29,7 @@ dependencies {
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:renderer') implementation project(':code:common:renderer')
implementation project(':code:common:message-queue') implementation project(':code:libraries:message-queue')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:api:search-api') implementation project(':code:api:search-api')

View File

@ -10,9 +10,9 @@ import nu.marginalia.control.actor.monitor.ConverterMonitorActor;
import nu.marginalia.control.actor.monitor.LoaderMonitorActor; import nu.marginalia.control.actor.monitor.LoaderMonitorActor;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqsm.ActorStateMachine; import nu.marginalia.actor.ActorStateMachine;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.state.MachineState; import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.BaseServiceParams;
@ -28,7 +28,7 @@ public class ControlActors {
private final Gson gson; private final Gson gson;
private final MessageQueueFactory messageQueueFactory; private final MessageQueueFactory messageQueueFactory;
public Map<Actor, ActorStateMachine> stateMachines = new HashMap<>(); public Map<Actor, ActorStateMachine> stateMachines = new HashMap<>();
public Map<Actor, AbstractStateGraph> actorDefinitions = new HashMap<>(); public Map<Actor, AbstractActorPrototype> actorDefinitions = new HashMap<>();
@Inject @Inject
public ControlActors(MessageQueueFactory messageQueueFactory, public ControlActors(MessageQueueFactory messageQueueFactory,
@ -71,7 +71,7 @@ public class ControlActors {
register(Actor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase); register(Actor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase);
} }
private void register(Actor process, AbstractStateGraph graph) { private void register(Actor process, AbstractActorPrototype graph) {
var sm = new ActorStateMachine(messageQueueFactory, process.id(), UUID.randomUUID(), graph); var sm = new ActorStateMachine(messageQueueFactory, process.id(), UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function)); sm.listen((function, param) -> logStateChange(process, function));
@ -114,7 +114,7 @@ public class ControlActors {
stateMachines.get(process).abortExecution(); stateMachines.get(process).abortExecution();
} }
public Map<Actor, MachineState> getActorStates() { public Map<Actor, ActorStateInstance> getActorStates() {
return stateMachines.entrySet().stream().collect( return stateMachines.entrySet().stream().collect(
Collectors.toMap( Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().getState()) Map.Entry::getKey, e -> e.getValue().getState())
@ -125,7 +125,7 @@ public class ControlActors {
return actorDefinitions.get(actor).isDirectlyInitializable(); return actorDefinitions.get(actor).isDirectlyInitializable();
} }
public AbstractStateGraph getActorDefinition(Actor actor) { public AbstractActorPrototype getActorDefinition(Actor actor) {
return actorDefinitions.get(actor); return actorDefinitions.get(actor);
} }

View File

@ -2,14 +2,14 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.actor.state.ActorTerminalState;
import nu.marginalia.mqsm.graph.TerminalGraphState;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@Singleton @Singleton
public class AbstractProcessSpawnerActor extends AbstractStateGraph { public class AbstractProcessSpawnerActor extends AbstractActorPrototype {
private final MqPersistence persistence; private final MqPersistence persistence;
private final ProcessService processService; private final ProcessService processService;
@ -45,7 +45,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph {
} }
@Inject @Inject
public AbstractProcessSpawnerActor(StateFactory stateFactory, public AbstractProcessSpawnerActor(ActorStateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService, ProcessService processService,
String inboxName, String inboxName,
@ -57,14 +57,14 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph {
this.processId = processId; this.processId = processId;
} }
@GraphState(name = INITIAL, next = MONITOR) @ActorState(name = INITIAL, next = MONITOR)
public void init() { public void init() {
} }
@GraphState(name = MONITOR, @ActorState(name = MONITOR,
next = MONITOR, next = MONITOR,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
transitions = {MONITOR, RUN}, transitions = {MONITOR, RUN},
description = """ description = """
Monitors the inbox of the process for messages. Monitors the inbox of the process for messages.
@ -95,8 +95,8 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph {
} }
} }
@GraphState(name = RUN, @ActorState(name = RUN,
resume = ResumeBehavior.RESTART, resume = ActorResumeBehavior.RESTART,
transitions = {MONITOR, ERROR, RUN, ABORTED}, transitions = {MONITOR, ERROR, RUN, ABORTED},
description = """ description = """
Runs the process. Runs the process.
@ -159,7 +159,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph {
} }
} }
@TerminalGraphState(name = ABORTED, description = "The process was manually aborted") @ActorTerminalState(name = ABORTED, description = "The process was manually aborted")
public void aborted() throws Exception {} public void aborted() throws Exception {}

View File

@ -2,17 +2,17 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
@Singleton @Singleton
public class ConverterMonitorActor extends AbstractProcessSpawnerActor { public class ConverterMonitorActor extends AbstractProcessSpawnerActor {
@Inject @Inject
public ConverterMonitorActor(StateFactory stateFactory, public ConverterMonitorActor(ActorStateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessService processService) {
super(stateFactory, persistence, processService, ProcessInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER); super(stateFactory, persistence, processService, ProcessInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER);

View File

@ -2,16 +2,16 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqsm.StateFactory;
@Singleton @Singleton
public class CrawlerMonitorActor extends AbstractProcessSpawnerActor { public class CrawlerMonitorActor extends AbstractProcessSpawnerActor {
@Inject @Inject
public CrawlerMonitorActor(StateFactory stateFactory, public CrawlerMonitorActor(ActorStateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessService processService) {
super(stateFactory, super(stateFactory,

View File

@ -2,14 +2,14 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageBaseType; import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -22,7 +22,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton @Singleton
public class FileStorageMonitorActor extends AbstractStateGraph { public class FileStorageMonitorActor extends AbstractActorPrototype {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES // STATES
@ -41,19 +41,19 @@ public class FileStorageMonitorActor extends AbstractStateGraph {
} }
@Inject @Inject
public FileStorageMonitorActor(StateFactory stateFactory, public FileStorageMonitorActor(ActorStateFactory stateFactory,
FileStorageService fileStorageService) { FileStorageService fileStorageService) {
super(stateFactory); super(stateFactory);
this.fileStorageService = fileStorageService; this.fileStorageService = fileStorageService;
} }
@GraphState(name = INITIAL, next = MONITOR) @ActorState(name = INITIAL, next = MONITOR)
public void init() { public void init() {
} }
@GraphState(name = MONITOR, @ActorState(name = MONITOR,
next = PURGE, next = PURGE,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
transitions = { PURGE, REMOVE_STALE }, transitions = { PURGE, REMOVE_STALE },
description = """ description = """
Monitor the file storage and trigger at transition to PURGE if any file storage area Monitor the file storage and trigger at transition to PURGE if any file storage area
@ -80,9 +80,9 @@ public class FileStorageMonitorActor extends AbstractStateGraph {
} }
} }
@GraphState(name = PURGE, @ActorState(name = PURGE,
next = MONITOR, next = MONITOR,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Purge the file storage area and transition back to MONITOR. Purge the file storage area and transition back to MONITOR.
""" """
@ -99,10 +99,10 @@ public class FileStorageMonitorActor extends AbstractStateGraph {
fileStorageService.removeFileStorage(storage.id()); fileStorageService.removeFileStorage(storage.id());
} }
@GraphState( @ActorState(
name = REMOVE_STALE, name = REMOVE_STALE,
next = MONITOR, next = MONITOR,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Remove file storage from the database if it doesn't exist on disk. Remove file storage from the database if it doesn't exist on disk.
""" """

View File

@ -2,17 +2,17 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory;
@Singleton @Singleton
public class LoaderMonitorActor extends AbstractProcessSpawnerActor { public class LoaderMonitorActor extends AbstractProcessSpawnerActor {
@Inject @Inject
public LoaderMonitorActor(StateFactory stateFactory, public LoaderMonitorActor(ActorStateFactory stateFactory,
MqPersistence persistence, MqPersistence persistence,
ProcessService processService) { ProcessService processService) {

View File

@ -2,16 +2,16 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Singleton @Singleton
public class MessageQueueMonitorActor extends AbstractStateGraph { public class MessageQueueMonitorActor extends AbstractActorPrototype {
// STATES // STATES
@ -26,17 +26,17 @@ public class MessageQueueMonitorActor extends AbstractStateGraph {
} }
@Inject @Inject
public MessageQueueMonitorActor(StateFactory stateFactory, public MessageQueueMonitorActor(ActorStateFactory stateFactory,
MqPersistence persistence) { MqPersistence persistence) {
super(stateFactory); super(stateFactory);
this.persistence = persistence; this.persistence = persistence;
} }
@GraphState(name = INITIAL, next = MONITOR) @ActorState(name = INITIAL, next = MONITOR)
public void init() { public void init() {
} }
@GraphState(name = MONITOR, next = MONITOR, resume = ResumeBehavior.RETRY, @ActorState(name = MONITOR, next = MONITOR, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Periodically clean up the message queue. Periodically clean up the message queue.
""") """)

View File

@ -2,19 +2,19 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.model.ServiceHeartbeat; import nu.marginalia.control.model.ServiceHeartbeat;
import nu.marginalia.control.svc.HeartbeatService; import nu.marginalia.control.svc.HeartbeatService;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Singleton @Singleton
public class ProcessLivenessMonitorActor extends AbstractStateGraph { public class ProcessLivenessMonitorActor extends AbstractActorPrototype {
// STATES // STATES
@ -26,7 +26,7 @@ public class ProcessLivenessMonitorActor extends AbstractStateGraph {
@Inject @Inject
public ProcessLivenessMonitorActor(StateFactory stateFactory, public ProcessLivenessMonitorActor(ActorStateFactory stateFactory,
ProcessService processService, ProcessService processService,
HeartbeatService heartbeatService) { HeartbeatService heartbeatService) {
super(stateFactory); super(stateFactory);
@ -39,11 +39,11 @@ public class ProcessLivenessMonitorActor extends AbstractStateGraph {
return "Periodically check to ensure that the control service's view of running processes is agreement with the process heartbeats table."; return "Periodically check to ensure that the control service's view of running processes is agreement with the process heartbeats table.";
} }
@GraphState(name = INITIAL, next = MONITOR) @ActorState(name = INITIAL, next = MONITOR)
public void init() { public void init() {
} }
@GraphState(name = MONITOR, next = MONITOR, resume = ResumeBehavior.RETRY, description = """ @ActorState(name = MONITOR, next = MONITOR, resume = ActorResumeBehavior.RETRY, description = """
Periodically check to ensure that the control service's view of Periodically check to ensure that the control service's view of
running processes is agreement with the process heartbeats table. running processes is agreement with the process heartbeats table.

View File

@ -2,10 +2,10 @@ package nu.marginalia.control.actor.task;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.graph.ControlFlowException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -29,10 +29,10 @@ public class ActorProcessWatcher {
* When interrupted, the process is killed and the message is marked as dead. * When interrupted, the process is killed and the message is marked as dead.
*/ */
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId) public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId)
throws ControlFlowException, InterruptedException, SQLException throws AbstractActorPrototype.ControlFlowException, InterruptedException, SQLException
{ {
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
throw new ControlFlowException("ERROR", throw new AbstractActorPrototype.ControlFlowException("ERROR",
"Process " + processId + " did not launch"); "Process " + processId + " did not launch");
} }
@ -52,7 +52,7 @@ public class ActorProcessWatcher {
catch (TimeoutException ex) { catch (TimeoutException ex) {
// Maybe the process died, wait a moment for it to restart // Maybe the process died, wait a moment for it to restart
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
throw new ControlFlowException("ERROR", throw new AbstractActorPrototype.ControlFlowException("ERROR",
"Process " + processId + " died and did not re-launch"); "Process " + processId + " died and did not re-launch");
} }
} }

View File

@ -16,10 +16,10 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.converting.ConvertAction; import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.mqapi.converting.ConvertRequest; import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.actor.state.ActorResumeBehavior;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -27,7 +27,7 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@Singleton @Singleton
public class ConvertActor extends AbstractStateGraph { public class ConvertActor extends AbstractActorPrototype {
// STATES // STATES
@ -59,7 +59,7 @@ public class ConvertActor extends AbstractStateGraph {
} }
@Inject @Inject
public ConvertActor(StateFactory stateFactory, public ConvertActor(ActorStateFactory stateFactory,
ActorProcessWatcher processWatcher, ActorProcessWatcher processWatcher,
ProcessOutboxes processOutboxes, ProcessOutboxes processOutboxes,
FileStorageService storageService, FileStorageService storageService,
@ -73,15 +73,15 @@ public class ConvertActor extends AbstractStateGraph {
this.gson = gson; this.gson = gson;
} }
@GraphState(name= INITIAL, resume = ResumeBehavior.ERROR, @ActorState(name= INITIAL, resume = ActorResumeBehavior.ERROR,
description = "Pro forma initial state") description = "Pro forma initial state")
public void initial(Integer unused) { public void initial(Integer unused) {
error("This actor does not support the initial state"); error("This actor does not support the initial state");
} }
@GraphState(name = CONVERT, @ActorState(name = CONVERT,
next = CONVERT_WAIT, next = CONVERT_WAIT,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Allocate a storage area for the processed data, Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT. then send a convert request to the converter and transition to RECONVERT_WAIT.
@ -107,9 +107,9 @@ public class ConvertActor extends AbstractStateGraph {
return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
} }
@GraphState(name = CONVERT_ENCYCLOPEDIA, @ActorState(name = CONVERT_ENCYCLOPEDIA,
next = CONVERT_WAIT, next = CONVERT_WAIT,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Allocate a storage area for the processed data, Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT. then send a convert request to the converter and transition to RECONVERT_WAIT.
@ -138,9 +138,9 @@ public class ConvertActor extends AbstractStateGraph {
return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
} }
@GraphState(name = CONVERT_STACKEXCHANGE, @ActorState(name = CONVERT_STACKEXCHANGE,
next = CONVERT_WAIT, next = CONVERT_WAIT,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Allocate a storage area for the processed data, Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT. then send a convert request to the converter and transition to RECONVERT_WAIT.
@ -169,10 +169,10 @@ public class ConvertActor extends AbstractStateGraph {
return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
} }
@GraphState( @ActorState(
name = CONVERT_WAIT, name = CONVERT_WAIT,
next = END, next = END,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Wait for the converter to finish processing the data. Wait for the converter to finish processing the data.
""" """

View File

@ -6,6 +6,7 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.With; import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessOutboxes; import nu.marginalia.control.process.ProcessOutboxes;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.index.client.IndexClient; import nu.marginalia.index.client.IndexClient;
@ -19,10 +20,9 @@ import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,7 +30,7 @@ import java.nio.file.Files;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
@Singleton @Singleton
public class ConvertAndLoadActor extends AbstractStateGraph { public class ConvertAndLoadActor extends AbstractActorPrototype {
// STATES // STATES
@ -69,7 +69,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
} }
@Inject @Inject
public ConvertAndLoadActor(StateFactory stateFactory, public ConvertAndLoadActor(ActorStateFactory stateFactory,
ActorProcessWatcher processWatcher, ActorProcessWatcher processWatcher,
ProcessOutboxes processOutboxes, ProcessOutboxes processOutboxes,
FileStorageService storageService, FileStorageService storageService,
@ -86,7 +86,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
this.gson = gson; this.gson = gson;
} }
@GraphState(name = INITIAL, @ActorState(name = INITIAL,
next = RECONVERT, next = RECONVERT,
description = """ description = """
Validate the input and transition to RECONVERT Validate the input and transition to RECONVERT
@ -104,9 +104,9 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
return new Message().withCrawlStorageId(crawlStorageId); return new Message().withCrawlStorageId(crawlStorageId);
} }
@GraphState(name = RECONVERT, @ActorState(name = RECONVERT,
next = RECONVERT_WAIT, next = RECONVERT_WAIT,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Allocate a storage area for the processed data, Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT. then send a convert request to the converter and transition to RECONVERT_WAIT.
@ -135,10 +135,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
.withConverterMsgId(id); .withConverterMsgId(id);
} }
@GraphState( @ActorState(
name = RECONVERT_WAIT, name = RECONVERT_WAIT,
next = LOAD, next = LOAD,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Wait for the converter to finish processing the data. Wait for the converter to finish processing the data.
""" """
@ -153,10 +153,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
} }
@GraphState( @ActorState(
name = LOAD, name = LOAD,
next = LOAD_WAIT, next = LOAD_WAIT,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Send a load request to the loader and transition to LOAD_WAIT. Send a load request to the loader and transition to LOAD_WAIT.
""") """)
@ -169,10 +169,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
} }
@GraphState( @ActorState(
name = LOAD_WAIT, name = LOAD_WAIT,
next = SWAP_LEXICON, next = SWAP_LEXICON,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Wait for the loader to finish loading the data. Wait for the loader to finish loading the data.
""" """
@ -186,10 +186,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
@GraphState( @ActorState(
name = SWAP_LEXICON, name = SWAP_LEXICON,
next = REPARTITION, next = REPARTITION,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Move the lexicon from the LEXICON_STAGING area to the LEXICON_LIVE area, Move the lexicon from the LEXICON_STAGING area to the LEXICON_LIVE area,
then instruct the index-service to reload the lexicon. then instruct the index-service to reload the lexicon.
@ -208,7 +208,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
} }
@GraphState( @ActorState(
name = REPARTITION, name = REPARTITION,
next = REPARTITION_WAIT, next = REPARTITION_WAIT,
description = """ description = """
@ -219,10 +219,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""); return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "");
} }
@GraphState( @ActorState(
name = REPARTITION_WAIT, name = REPARTITION_WAIT,
next = REINDEX, next = REINDEX,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Wait for the index-service to finish repartitioning the index. Wait for the index-service to finish repartitioning the index.
""" """
@ -235,7 +235,7 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
} }
} }
@GraphState( @ActorState(
name = REINDEX, name = REINDEX,
next = REINDEX_WAIT, next = REINDEX_WAIT,
description = """ description = """
@ -246,10 +246,10 @@ public class ConvertAndLoadActor extends AbstractStateGraph {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, ""); return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, "");
} }
@GraphState( @ActorState(
name = REINDEX_WAIT, name = REINDEX_WAIT,
next = END, next = END,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Wait for the index-service to finish reindexing the data. Wait for the index-service to finish reindexing the data.
""" """

View File

@ -6,6 +6,7 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.With; import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessOutboxes; import nu.marginalia.control.process.ProcessOutboxes;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.FileStorageService;
@ -15,15 +16,14 @@ import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest; import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@Singleton @Singleton
public class CrawlActor extends AbstractStateGraph { public class CrawlActor extends AbstractActorPrototype {
// STATES // STATES
@ -52,7 +52,7 @@ public class CrawlActor extends AbstractStateGraph {
} }
@Inject @Inject
public CrawlActor(StateFactory stateFactory, public CrawlActor(ActorStateFactory stateFactory,
ProcessOutboxes processOutboxes, ProcessOutboxes processOutboxes,
FileStorageService storageService, FileStorageService storageService,
Gson gson, Gson gson,
@ -65,7 +65,7 @@ public class CrawlActor extends AbstractStateGraph {
this.processWatcher = processWatcher; this.processWatcher = processWatcher;
} }
@GraphState(name = INITIAL, @ActorState(name = INITIAL,
next = CRAWL, next = CRAWL,
description = """ description = """
Validate the input and transition to CRAWL Validate the input and transition to CRAWL
@ -83,9 +83,9 @@ public class CrawlActor extends AbstractStateGraph {
return new Message().withCrawlSpecId(crawlStorageId); return new Message().withCrawlSpecId(crawlStorageId);
} }
@GraphState(name = CRAWL, @ActorState(name = CRAWL,
next = CRAWL_WAIT, next = CRAWL_WAIT,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Allocate a storage area for the crawled data, Allocate a storage area for the crawled data,
then send a crawl request to the crawler and transition to CRAWL_WAIT. then send a crawl request to the crawler and transition to CRAWL_WAIT.
@ -114,10 +114,10 @@ public class CrawlActor extends AbstractStateGraph {
.withCrawlerMsgId(id); .withCrawlerMsgId(id);
} }
@GraphState( @ActorState(
name = CRAWL_WAIT, name = CRAWL_WAIT,
next = END, next = END,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Wait for the crawler to finish retreiving the data. Wait for the crawler to finish retreiving the data.
""" """

View File

@ -2,16 +2,16 @@ package nu.marginalia.control.actor.task;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.svc.ControlFileStorageService; import nu.marginalia.control.svc.ControlFileStorageService;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageBaseType; import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -24,7 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@Singleton @Singleton
public class CrawlJobExtractorActor extends AbstractStateGraph { public class CrawlJobExtractorActor extends AbstractActorPrototype {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES // STATES
@ -38,7 +38,7 @@ public class CrawlJobExtractorActor extends AbstractStateGraph {
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ExecutorService executor = Executors.newSingleThreadExecutor();
@Inject @Inject
public CrawlJobExtractorActor(StateFactory stateFactory, public CrawlJobExtractorActor(ActorStateFactory stateFactory,
ProcessService processService, ProcessService processService,
FileStorageService fileStorageService, FileStorageService fileStorageService,
ControlFileStorageService controlFileStorageService ControlFileStorageService controlFileStorageService
@ -57,8 +57,8 @@ public class CrawlJobExtractorActor extends AbstractStateGraph {
return "Run the crawler job extractor process"; return "Run the crawler job extractor process";
} }
@GraphState(name = CREATE_FROM_LINK, next = END, @ActorState(name = CREATE_FROM_LINK, next = END,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Download a list of URLs as provided, Download a list of URLs as provided,
and then spawn a CrawlJobExtractor process, and then spawn a CrawlJobExtractor process,
@ -92,8 +92,8 @@ public class CrawlJobExtractorActor extends AbstractStateGraph {
} }
@GraphState(name = CREATE_FROM_DB, next = END, @ActorState(name = CREATE_FROM_DB, next = END,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Spawns a CrawlJobExtractor process that loads data from the link database, and wait for it to finish. Spawns a CrawlJobExtractor process that loads data from the link database, and wait for it to finish.
""" """

View File

@ -6,13 +6,13 @@ import com.zaxxer.hikari.HikariDataSource;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.With; import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -25,7 +25,7 @@ import java.nio.file.attribute.PosixFilePermissions;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
@Singleton @Singleton
public class ExportDataActor extends AbstractStateGraph { public class ExportDataActor extends AbstractActorPrototype {
private static final String blacklistFilename = "blacklist.csv.gz"; private static final String blacklistFilename = "blacklist.csv.gz";
private static final String domainsFilename = "domains.csv.gz"; private static final String domainsFilename = "domains.csv.gz";
@ -54,7 +54,7 @@ public class ExportDataActor extends AbstractStateGraph {
} }
@Inject @Inject
public ExportDataActor(StateFactory stateFactory, public ExportDataActor(ActorStateFactory stateFactory,
FileStorageService storageService, FileStorageService storageService,
HikariDataSource dataSource) HikariDataSource dataSource)
{ {
@ -63,7 +63,7 @@ public class ExportDataActor extends AbstractStateGraph {
this.dataSource = dataSource; this.dataSource = dataSource;
} }
@GraphState(name = INITIAL, @ActorState(name = INITIAL,
next = EXPORT_BLACKLIST, next = EXPORT_BLACKLIST,
description = """ description = """
Find EXPORT storage area, then transition to EXPORT-BLACKLIST. Find EXPORT storage area, then transition to EXPORT-BLACKLIST.
@ -76,9 +76,9 @@ public class ExportDataActor extends AbstractStateGraph {
return new Message().withStorageId(storage.id()); return new Message().withStorageId(storage.id());
} }
@GraphState(name = EXPORT_BLACKLIST, @ActorState(name = EXPORT_BLACKLIST,
next = EXPORT_DOMAINS, next = EXPORT_DOMAINS,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Export the blacklist from the database to the EXPORT storage area. Export the blacklist from the database to the EXPORT storage area.
""" """
@ -112,10 +112,10 @@ public class ExportDataActor extends AbstractStateGraph {
return message; return message;
} }
@GraphState( @ActorState(
name = EXPORT_DOMAINS, name = EXPORT_DOMAINS,
next = EXPORT_LINK_GRAPH, next = EXPORT_LINK_GRAPH,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Export known domains to the EXPORT storage area. Export known domains to the EXPORT storage area.
""" """
@ -155,10 +155,10 @@ public class ExportDataActor extends AbstractStateGraph {
return message; return message;
} }
@GraphState( @ActorState(
name = EXPORT_LINK_GRAPH, name = EXPORT_LINK_GRAPH,
next = END, next = END,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Export known domains to the EXPORT storage area. Export known domains to the EXPORT storage area.
""" """

View File

@ -6,6 +6,7 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.With; import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessOutboxes; import nu.marginalia.control.process.ProcessOutboxes;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.FileStorageService;
@ -15,17 +16,16 @@ import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest; import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.nio.file.Files; import java.nio.file.Files;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Optional; import java.util.Optional;
@Singleton @Singleton
public class RecrawlActor extends AbstractStateGraph { public class RecrawlActor extends AbstractActorPrototype {
// STATES // STATES
@ -58,7 +58,7 @@ public class RecrawlActor extends AbstractStateGraph {
} }
@Inject @Inject
public RecrawlActor(StateFactory stateFactory, public RecrawlActor(ActorStateFactory stateFactory,
ActorProcessWatcher processWatcher, ActorProcessWatcher processWatcher,
ProcessOutboxes processOutboxes, ProcessOutboxes processOutboxes,
FileStorageService storageService, FileStorageService storageService,
@ -72,7 +72,7 @@ public class RecrawlActor extends AbstractStateGraph {
this.gson = gson; this.gson = gson;
} }
@GraphState(name = INITIAL, @ActorState(name = INITIAL,
next = CRAWL, next = CRAWL,
description = """ description = """
Validate the input and transition to CRAWL Validate the input and transition to CRAWL
@ -110,9 +110,9 @@ public class RecrawlActor extends AbstractStateGraph {
.findFirst(); .findFirst();
} }
@GraphState(name = CRAWL, @ActorState(name = CRAWL,
next = CRAWL_WAIT, next = CRAWL_WAIT,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Send a crawl request to the crawler and transition to CRAWL_WAIT. Send a crawl request to the crawler and transition to CRAWL_WAIT.
""" """
@ -125,10 +125,10 @@ public class RecrawlActor extends AbstractStateGraph {
return recrawlMessage.withCrawlerMsgId(id); return recrawlMessage.withCrawlerMsgId(id);
} }
@GraphState( @ActorState(
name = CRAWL_WAIT, name = CRAWL_WAIT,
next = END, next = END,
resume = ResumeBehavior.RETRY, resume = ActorResumeBehavior.RETRY,
description = """ description = """
Wait for the crawler to finish retrieving the data. Wait for the crawler to finish retrieving the data.
""" """

View File

@ -2,11 +2,11 @@ package nu.marginalia.control.actor.task;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessService; import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -15,7 +15,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@Singleton @Singleton
public class TriggerAdjacencyCalculationActor extends AbstractStateGraph { public class TriggerAdjacencyCalculationActor extends AbstractActorPrototype {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES // STATES
@ -26,7 +26,7 @@ public class TriggerAdjacencyCalculationActor extends AbstractStateGraph {
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ExecutorService executor = Executors.newSingleThreadExecutor();
@Inject @Inject
public TriggerAdjacencyCalculationActor(StateFactory stateFactory, public TriggerAdjacencyCalculationActor(ActorStateFactory stateFactory,
ProcessService processService) { ProcessService processService) {
super(stateFactory); super(stateFactory);
this.processService = processService; this.processService = processService;
@ -37,8 +37,8 @@ public class TriggerAdjacencyCalculationActor extends AbstractStateGraph {
return "Calculate website similarities"; return "Calculate website similarities";
} }
@GraphState(name = INITIAL, next = END, @ActorState(name = INITIAL, next = END,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Spawns a WebsitesAdjacenciesCalculator process and waits for it to finish. Spawns a WebsitesAdjacenciesCalculator process and waits for it to finish.
""" """

View File

@ -6,18 +6,18 @@ import com.zaxxer.hikari.HikariDataSource;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.With; import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.mqsm.StateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.sql.SQLException; import java.sql.SQLException;
@Singleton @Singleton
public class TruncateLinkDatabase extends AbstractStateGraph { public class TruncateLinkDatabase extends AbstractActorPrototype {
// STATES // STATES
@ -39,14 +39,14 @@ public class TruncateLinkDatabase extends AbstractStateGraph {
} }
@Inject @Inject
public TruncateLinkDatabase(StateFactory stateFactory, public TruncateLinkDatabase(ActorStateFactory stateFactory,
HikariDataSource dataSource) HikariDataSource dataSource)
{ {
super(stateFactory); super(stateFactory);
this.dataSource = dataSource; this.dataSource = dataSource;
} }
@GraphState(name = INITIAL, @ActorState(name = INITIAL,
next = FLUSH_DATABASE, next = FLUSH_DATABASE,
description = """ description = """
Initial stage Initial stage
@ -55,9 +55,9 @@ public class TruncateLinkDatabase extends AbstractStateGraph {
} }
@GraphState(name = FLUSH_DATABASE, @ActorState(name = FLUSH_DATABASE,
next = END, next = END,
resume = ResumeBehavior.ERROR, resume = ActorResumeBehavior.ERROR,
description = """ description = """
Truncate the domain and link tables. Truncate the domain and link tables.
""" """

View File

@ -1,7 +1,5 @@
package nu.marginalia.control.model; package nu.marginalia.control.model;
import nu.marginalia.mqsm.graph.GraphState;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -10,7 +8,7 @@ public record ActorState(String name,
boolean current, boolean current,
List<String> transitions, List<String> transitions,
String description) { String description) {
public ActorState(GraphState gs, boolean current) { public ActorState(nu.marginalia.actor.state.ActorState gs, boolean current) {
this(gs.name(), current, toTransitions(gs.next(), gs.transitions()), gs.description()); this(gs.name(), current, toTransitions(gs.next(), gs.transitions()), gs.description());
} }
private static List<String> toTransitions(String next, String[] transitions) { private static List<String> toTransitions(String next, String[] transitions) {

View File

@ -1,27 +1,26 @@
package nu.marginalia.control.model; package nu.marginalia.control.model;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.state.MachineState; import nu.marginalia.actor.state.ActorStateInstance;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
public record ActorStateGraph(String description, List<ActorState> states) { public record ActorStateGraph(String description, List<nu.marginalia.control.model.ActorState> states) {
public ActorStateGraph(AbstractStateGraph graph, MachineState currentState) { public ActorStateGraph(AbstractActorPrototype graph, ActorStateInstance currentState) {
this(graph.describe(), getStateList(graph, currentState)); this(graph.describe(), getStateList(graph, currentState));
} }
private static List<ActorState> getStateList( private static List<nu.marginalia.control.model.ActorState> getStateList(
AbstractStateGraph graph, AbstractActorPrototype graph,
MachineState currentState) ActorStateInstance currentState)
{ {
Map<String, GraphState> declaredStates = graph.declaredStates(); Map<String, ActorState> declaredStates = graph.declaredStates();
Set<GraphState> seenStates = new HashSet<>(declaredStates.size()); Set<ActorState> seenStates = new HashSet<>(declaredStates.size());
LinkedList<GraphState> edge = new LinkedList<>(); LinkedList<ActorState> edge = new LinkedList<>();
List<ActorState> statesList = new ArrayList<>(declaredStates.size()); List<nu.marginalia.control.model.ActorState> statesList = new ArrayList<>(declaredStates.size());
edge.add(declaredStates.get("INITIAL")); edge.add(declaredStates.get("INITIAL"));
@ -30,7 +29,7 @@ public record ActorStateGraph(String description, List<ActorState> states) {
if (first == null || !seenStates.add(first)) { if (first == null || !seenStates.add(first)) {
continue; continue;
} }
statesList.add(new ActorState(first, currentState.name().equals(first.name()))); statesList.add(new nu.marginalia.control.model.ActorState(first, currentState.name().equals(first.name())));
edge.add(declaredStates.get(first.next())); edge.add(declaredStates.get(first.next()));
@ -40,10 +39,10 @@ public record ActorStateGraph(String description, List<ActorState> states) {
} }
if (!declaredStates.containsKey("ERROR")) { if (!declaredStates.containsKey("ERROR")) {
statesList.add(new ActorState("ERROR", currentState.name().equals("ERROR"), List.of(), "Terminal error state")); statesList.add(new nu.marginalia.control.model.ActorState("ERROR", currentState.name().equals("ERROR"), List.of(), "Terminal error state"));
} }
if (!declaredStates.containsKey("END")) { if (!declaredStates.containsKey("END")) {
statesList.add(new ActorState("END", currentState.name().equals("END"), List.of(), "The machine terminated successfully")); statesList.add(new nu.marginalia.control.model.ActorState("END", currentState.name().equals("END"), List.of(), "The machine terminated successfully"));
} }
return statesList; return statesList;

View File

@ -11,8 +11,8 @@ import nu.marginalia.control.actor.Actor;
import nu.marginalia.control.model.ActorRunState; import nu.marginalia.control.model.ActorRunState;
import nu.marginalia.control.model.ActorStateGraph; import nu.marginalia.control.model.ActorStateGraph;
import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mqsm.state.MachineState; import nu.marginalia.actor.state.ActorStateInstance;
import spark.Request; import spark.Request;
import spark.Response; import spark.Response;
@ -105,7 +105,7 @@ public class ControlActorService {
final var stateGraph = controlActors.getActorDefinition(e.getKey()); final var stateGraph = controlActors.getActorDefinition(e.getKey());
final MachineState state = e.getValue(); final ActorStateInstance state = e.getValue();
final String actorDescription = stateGraph.describe(); final String actorDescription = stateGraph.describe();
final String machineName = e.getKey().name(); final String machineName = e.getKey().name();
@ -114,7 +114,7 @@ public class ControlActorService {
final String stateDescription = actorStateDescriptions.computeIfAbsent( final String stateDescription = actorStateDescriptions.computeIfAbsent(
(machineName + "." + stateName), (machineName + "." + stateName),
k -> Optional.ofNullable(stateGraph.declaredStates().get(stateName)) k -> Optional.ofNullable(stateGraph.declaredStates().get(stateName))
.map(GraphState::description) .map(ActorState::description)
.orElse("Description missing for " + stateName) .orElse("Description missing for " + stateName)
); );

View File

@ -7,7 +7,6 @@ import lombok.SneakyThrows;
import nu.marginalia.control.model.*; import nu.marginalia.control.model.*;
import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.*; import nu.marginalia.db.storage.model.*;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import spark.Request; import spark.Request;

View File

@ -20,6 +20,8 @@ include 'code:libraries:braille-block-punch-cards'
include 'code:libraries:language-processing' include 'code:libraries:language-processing'
include 'code:libraries:term-frequency-dict' include 'code:libraries:term-frequency-dict'
include 'code:libraries:message-queue'
include 'code:features-search:screenshots' include 'code:features-search:screenshots'
include 'code:features-search:random-websites' include 'code:features-search:random-websites'
include 'code:features-search:query-parser' include 'code:features-search:query-parser'
@ -49,7 +51,6 @@ include 'code:api:process-mqapi'
include 'code:common:service-discovery' include 'code:common:service-discovery'
include 'code:common:service-client' include 'code:common:service-client'
include 'code:common:db' include 'code:common:db'
include 'code:common:message-queue'
include 'code:common:service' include 'code:common:service'
include 'code:common:config' include 'code:common:config'
include 'code:common:model' include 'code:common:model'