From f0a8ca440fac57f60bb69e16c0809589e6421a3e Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Thu, 6 Jul 2023 13:33:11 +0200 Subject: [PATCH] MQFSM Usability WIP --- .../java/nu/marginalia/mqsm/StateFactory.java | 50 ++++++++++++++++++- .../java/nu/marginalia/mqsm/StateMachine.java | 36 ++++++------- ...tateGraph.java => AbstractStateGraph.java} | 25 +++++++--- .../nu/marginalia/mqsm/graph/GraphState.java | 2 - .../marginalia/mqsm/graph/ResumeBehavior.java | 8 +++ .../nu/marginalia/mqsm/state/ErrorState.java | 17 ------- .../nu/marginalia/mqsm/state/FinalState.java | 17 ------- .../marginalia/mqsm/state/MachineState.java | 4 ++ .../marginalia/mqsm/state/ResumeBehavior.java | 6 --- .../marginalia/mqsm/state/ResumingState.java | 17 ------- .../mqsm/StateMachineErrorTest.java | 13 ++--- .../mqsm/StateMachineResumeTest.java | 26 +++------- .../nu/marginalia/mqsm/StateMachineTest.java | 39 ++++----------- 13 files changed, 119 insertions(+), 141 deletions(-) rename code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/{StateGraph.java => AbstractStateGraph.java} (88%) create mode 100644 code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java delete mode 100644 code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ErrorState.java delete mode 100644 code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/FinalState.java delete mode 100644 code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumeBehavior.java delete mode 100644 code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumingState.java diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java index 09c02ea7..6a143157 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java @@ -3,8 +3,8 @@ package nu.marginalia.mqsm; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.mqsm.state.MachineState; -import nu.marginalia.mqsm.state.ResumeBehavior; import nu.marginalia.mqsm.state.StateTransition; import java.util.function.Function; @@ -74,4 +74,52 @@ public class StateFactory { public StateTransition transition(String state, Object message) { return StateTransition.to(state, gson.toJson(message)); } + + public static class ErrorState implements MachineState { + @Override + public String name() { return "ERROR"; } + + @Override + public StateTransition next(String message) { + throw new UnsupportedOperationException(); + } + + @Override + public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } + + @Override + public boolean isFinal() { return true; } + } + + public static class FinalState implements MachineState { + @Override + public String name() { return "END"; } + + @Override + public StateTransition next(String message) { + throw new UnsupportedOperationException(); + } + + @Override + public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } + + @Override + public boolean isFinal() { return true; } + } + + public static class ResumingState implements MachineState { + @Override + public String name() { return "RESUMING"; } + + @Override + public StateTransition next(String message) { + throw new UnsupportedOperationException(); + } + + @Override + public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } + + @Override + public boolean isFinal() { return false; } + } } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java index 827005ed..e54b48f7 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java @@ -7,7 +7,8 @@ import nu.marginalia.mq.inbox.MqInboxResponse; import nu.marginalia.mq.inbox.MqSubscription; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqsm.graph.StateGraph; +import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,13 +31,16 @@ public class StateMachine { private final String queueName; private MachineState state; - private final MachineState errorState = new ErrorState(); - private final MachineState finalState = new FinalState(); - private final MachineState resumingState = new ResumingState(); + private final MachineState errorState = new StateFactory.ErrorState(); + private final MachineState finalState = new StateFactory.FinalState(); + private final MachineState resumingState = new StateFactory.ResumingState(); private final Map allStates = new HashMap<>(); - public StateMachine(MqPersistence persistence, String queueName, UUID instanceUUID) { + public StateMachine(MqPersistence persistence, + String queueName, + UUID instanceUUID, + AbstractStateGraph stateGraph) { this.queueName = queueName; smInbox = new MqInbox(persistence, queueName, instanceUUID, Executors.newSingleThreadExecutor()); @@ -45,28 +49,24 @@ public class StateMachine { smInbox.subscribe(new StateEventSubscription()); registerStates(List.of(errorState, finalState, resumingState)); + registerStates(stateGraph); + + for (var declaredState : stateGraph.declaredStates()) { + if (!allStates.containsKey(declaredState)) { + throw new IllegalArgumentException("State " + declaredState + " is not defined in the state graph"); + } + } } /** Register the state graph */ - public void registerStates(MachineState... states) { - if (state != null) { - throw new IllegalStateException("Cannot register states after state machine has been initialized"); - } - + void registerStates(List states) { for (var state : states) { allStates.put(state.name(), state); } } /** Register the state graph */ - public void registerStates(List states) { - for (var state : states) { - allStates.put(state.name(), state); - } - } - - /** Register the state graph */ - public void registerStates(StateGraph states) { + void registerStates(AbstractStateGraph states) { registerStates(states.asStateList()); } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/StateGraph.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/AbstractStateGraph.java similarity index 88% rename from code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/StateGraph.java rename to code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/AbstractStateGraph.java index df8f4318..10aca984 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/StateGraph.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/AbstractStateGraph.java @@ -1,22 +1,20 @@ package nu.marginalia.mqsm.graph; -import nu.marginalia.mqsm.StateFactory; import nu.marginalia.mqsm.state.MachineState; +import nu.marginalia.mqsm.StateFactory; import nu.marginalia.mqsm.state.StateTransition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; -public abstract class StateGraph { +public abstract class AbstractStateGraph { private final StateFactory stateFactory; - private static final Logger logger = LoggerFactory.getLogger(StateGraph.class); + private static final Logger logger = LoggerFactory.getLogger(AbstractStateGraph.class); - public StateGraph(StateFactory stateFactory) { + public AbstractStateGraph(StateFactory stateFactory) { this.stateFactory = stateFactory; } @@ -38,6 +36,19 @@ public abstract class StateGraph { throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage()); } + public Set declaredStates() { + Set ret = new HashSet<>(); + + for (var method : getClass().getMethods()) { + var gs = method.getAnnotation(GraphState.class); + if (gs != null) { + ret.add(gs.name()); + ret.add(gs.next()); + } + } + + return ret; + } public List asStateList() { List ret = new ArrayList<>(); diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/GraphState.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/GraphState.java index b79b71aa..62183637 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/GraphState.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/GraphState.java @@ -1,8 +1,6 @@ package nu.marginalia.mqsm.graph; -import nu.marginalia.mqsm.state.ResumeBehavior; - import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java new file mode 100644 index 00000000..2e275cb5 --- /dev/null +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java @@ -0,0 +1,8 @@ +package nu.marginalia.mqsm.graph; + +public enum ResumeBehavior { + /** Retry the state on resume */ + RETRY, + /** Jump to ERROR on resume if the message has been acknowledged */ + ERROR +} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ErrorState.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ErrorState.java deleted file mode 100644 index dcb19125..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ErrorState.java +++ /dev/null @@ -1,17 +0,0 @@ -package nu.marginalia.mqsm.state; - -public class ErrorState implements MachineState { - @Override - public String name() { return "ERROR"; } - - @Override - public StateTransition next(String message) { - throw new UnsupportedOperationException(); - } - - @Override - public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } - - @Override - public boolean isFinal() { return true; } -} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/FinalState.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/FinalState.java deleted file mode 100644 index dc2362fe..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/FinalState.java +++ /dev/null @@ -1,17 +0,0 @@ -package nu.marginalia.mqsm.state; - -public class FinalState implements MachineState { - @Override - public String name() { return "END"; } - - @Override - public StateTransition next(String message) { - throw new UnsupportedOperationException(); - } - - @Override - public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } - - @Override - public boolean isFinal() { return true; } -} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/MachineState.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/MachineState.java index 11efc7c5..ec3c26ff 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/MachineState.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/MachineState.java @@ -1,9 +1,13 @@ package nu.marginalia.mqsm.state; +import nu.marginalia.mqsm.graph.ResumeBehavior; + public interface MachineState { String name(); + StateTransition next(String message); ResumeBehavior resumeBehavior(); + boolean isFinal(); } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumeBehavior.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumeBehavior.java deleted file mode 100644 index a82446f8..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumeBehavior.java +++ /dev/null @@ -1,6 +0,0 @@ -package nu.marginalia.mqsm.state; - -public enum ResumeBehavior { - RETRY, - ERROR -} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumingState.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumingState.java deleted file mode 100644 index ce01bb79..00000000 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/state/ResumingState.java +++ /dev/null @@ -1,17 +0,0 @@ -package nu.marginalia.mqsm.state; - -public class ResumingState implements MachineState { - @Override - public String name() { return "RESUMING"; } - - @Override - public StateTransition next(String message) { - throw new UnsupportedOperationException(); - } - - @Override - public ResumeBehavior resumeBehavior() { return ResumeBehavior.RETRY; } - - @Override - public boolean isFinal() { return false; } -} diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java index 6c6298eb..06279f34 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java +++ b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java @@ -4,12 +4,11 @@ import com.google.gson.GsonBuilder; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import nu.marginalia.mq.MqMessageRow; -import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.StateGraph; -import nu.marginalia.mqsm.state.ResumeBehavior; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.ResumeBehavior; import org.junit.jupiter.api.*; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.junit.jupiter.Container; @@ -55,7 +54,7 @@ public class StateMachineErrorTest { dataSource.close(); } - public static class ErrorHurdles extends StateGraph { + public static class ErrorHurdles extends AbstractStateGraph { public ErrorHurdles(StateFactory stateFactory) { super(stateFactory); @@ -71,17 +70,15 @@ public class StateMachineErrorTest { } @GraphState(name = "OK", next = "END") public void ok() { - + } } @Test public void smResumeResumableFromNew() throws Exception { - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); var stateFactory = new StateFactory(new GsonBuilder().create()); - - sm.registerStates(new ErrorHurdles(stateFactory).asStateList()); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory)); sm.init(); diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java index 6913e13a..654e3623 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java +++ b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java @@ -8,8 +8,8 @@ import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.StateGraph; -import nu.marginalia.mqsm.state.ResumeBehavior; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.ResumeBehavior; import org.junit.jupiter.api.*; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.junit.jupiter.Container; @@ -55,7 +55,7 @@ public class StateMachineResumeTest { dataSource.close(); } - public static class ResumeTrialsGraph extends StateGraph { + public static class ResumeTrialsGraph extends AbstractStateGraph { public ResumeTrialsGraph(StateFactory stateFactory) { super(stateFactory); @@ -75,10 +75,8 @@ public class StateMachineResumeTest { @Test public void smResumeResumableFromNew() throws Exception { - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); var stateFactory = new StateFactory(new GsonBuilder().create()); - - sm.registerStates(new ResumeTrialsGraph(stateFactory).asStateList()); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); persistence.sendNewMessage(inboxId, null,"RESUMABLE", "", null); @@ -98,10 +96,8 @@ public class StateMachineResumeTest { @Test public void smResumeFromAck() throws Exception { - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); var stateFactory = new StateFactory(new GsonBuilder().create()); - - sm.registerStates(new ResumeTrialsGraph(stateFactory)); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); long id = persistence.sendNewMessage(inboxId, null,"RESUMABLE", "", null); persistence.updateMessageState(id, MqMessageState.ACK); @@ -123,10 +119,8 @@ public class StateMachineResumeTest { @Test public void smResumeNonResumableFromNew() throws Exception { - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); var stateFactory = new StateFactory(new GsonBuilder().create()); - - sm.registerStates(new ResumeTrialsGraph(stateFactory)); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); persistence.sendNewMessage(inboxId, null,"NON-RESUMABLE", "", null); @@ -146,10 +140,8 @@ public class StateMachineResumeTest { @Test public void smResumeNonResumableFromAck() throws Exception { - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); var stateFactory = new StateFactory(new GsonBuilder().create()); - - sm.registerStates(new ResumeTrialsGraph(stateFactory)); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); long id = persistence.sendNewMessage(inboxId, null,"NON-RESUMABLE", "", null); persistence.updateMessageState(id, MqMessageState.ACK); @@ -170,10 +162,8 @@ public class StateMachineResumeTest { @Test public void smResumeEmptyQueue() throws Exception { - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); var stateFactory = new StateFactory(new GsonBuilder().create()); - - sm.registerStates(new ResumeTrialsGraph(stateFactory)); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory)); sm.resume(); diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java index 789b13ad..a6adfa4c 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java +++ b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java @@ -3,19 +3,15 @@ package nu.marginalia.mqsm; import com.google.gson.GsonBuilder; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.mq.MqMessageRow; -import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqTestUtil; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqsm.graph.GraphState; -import nu.marginalia.mqsm.graph.StateGraph; -import nu.marginalia.mqsm.state.ResumeBehavior; +import nu.marginalia.mqsm.graph.AbstractStateGraph; import org.junit.jupiter.api.*; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import java.util.List; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -55,7 +51,7 @@ public class StateMachineTest { dataSource.close(); } - public static class TestGraph extends StateGraph { + public static class TestGraph extends AbstractStateGraph { public TestGraph(StateFactory stateFactory) { super(stateFactory); } @@ -87,8 +83,8 @@ public class StateMachineTest { var graph = new TestGraph(stateFactory); - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); - sm.registerStates(graph.asStateList()); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), graph); + sm.registerStates(graph); sm.init(); @@ -101,34 +97,17 @@ public class StateMachineTest { @Test public void testStartStopStartStop() throws Exception { - var sm = new StateMachine(persistence, inboxId, UUID.randomUUID()); var stateFactory = new StateFactory(new GsonBuilder().create()); - - var initial = stateFactory.create("INITIAL", ResumeBehavior.RETRY, () -> stateFactory.transition("GREET", "World")); - - var greet = stateFactory.create("GREET", ResumeBehavior.RETRY, String.class, (String message) -> { - System.out.println("Hello, " + message + "!"); - return stateFactory.transition("COUNT-TO-FIVE", 0); - }); - - var ctf = stateFactory.create("COUNT-TO-FIVE", ResumeBehavior.RETRY, Integer.class, (Integer count) -> { - System.out.println(count); - if (count < 5) { - return stateFactory.transition("COUNT-TO-FIVE", count + 1); - } else { - return stateFactory.transition("END"); - } - }); - - sm.registerStates(initial, greet, ctf); + var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); sm.init(); - Thread.sleep(300); + Thread.sleep(150); sm.stop(); - var sm2 = new StateMachine(persistence, inboxId, UUID.randomUUID()); - sm2.registerStates(initial, greet, ctf); + System.out.println("-------------------- "); + + var sm2 = new StateMachine(persistence, inboxId, UUID.randomUUID(), new TestGraph(stateFactory)); sm2.resume(); sm2.join(); sm2.stop();