diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
index b8d2e683..7ea9d6c9 100644
--- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
+++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
@@ -10,8 +10,8 @@ import nu.marginalia.client.Context;
import nu.marginalia.index.client.model.query.SearchSpecification;
import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.model.gson.GsonFactory;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.mq.outbox.MqOutbox;
-import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
@@ -27,13 +27,13 @@ public class IndexClient extends AbstractDynamicClient {
@Inject
public IndexClient(ServiceDescriptors descriptors,
- MqPersistence persistence) {
+ MqFactory messageQueueFactory) {
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
String inboxName = ServiceId.Index.name + ":" + "0";
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
- outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID());
+ outbox = messageQueueFactory.createOutbox(inboxName, outboxName, UUID.randomUUID());
setTimeout(30);
}
diff --git a/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java b/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java
index 69e011bd..6a4f2c4d 100644
--- a/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java
+++ b/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java
@@ -5,8 +5,8 @@ import com.google.inject.Singleton;
import io.reactivex.rxjava3.core.Observable;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.model.gson.GsonFactory;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.mq.outbox.MqOutbox;
-import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.search.client.model.ApiSearchResults;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
@@ -28,14 +28,14 @@ public class SearchClient extends AbstractDynamicClient {
@Inject
public SearchClient(ServiceDescriptors descriptors,
- MqPersistence persistence) {
+ MqFactory messageQueueFactory) {
super(descriptors.forId(ServiceId.Search), WmsaHome.getHostsFile(), GsonFactory::get);
String inboxName = ServiceId.Search.name + ":" + "0";
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
- outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID());
+ outbox = messageQueueFactory.createOutbox(inboxName, outboxName, UUID.randomUUID());
}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/MqFactory.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/MqFactory.java
new file mode 100644
index 00000000..792d0bd8
--- /dev/null
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/MqFactory.java
@@ -0,0 +1,42 @@
+package nu.marginalia.mq;
+
+import nu.marginalia.mq.inbox.MqAsynchronousInbox;
+import nu.marginalia.mq.inbox.MqInboxIf;
+import nu.marginalia.mq.inbox.MqSingleShotInbox;
+import nu.marginalia.mq.inbox.MqSynchronousInbox;
+import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mq.persistence.MqPersistence;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.util.UUID;
+
+@Singleton
+public class MqFactory {
+ private final MqPersistence persistence;
+
+ @Inject
+ public MqFactory(MqPersistence persistence) {
+ this.persistence = persistence;
+ }
+
+ public MqInboxIf createAsynchronousInbox(String inboxName, UUID instanceUUID)
+ {
+ return new MqAsynchronousInbox(persistence, inboxName, instanceUUID);
+ }
+
+ public MqInboxIf createSynchronousInbox(String inboxName, UUID instanceUUID)
+ {
+ return new MqSynchronousInbox(persistence, inboxName, instanceUUID);
+ }
+
+ public MqSingleShotInbox createSingleShotInbox(String inboxName, UUID instanceUUID)
+ {
+ return new MqSingleShotInbox(persistence, inboxName, instanceUUID);
+ }
+
+ public MqOutbox createOutbox(String inboxName, String outboxName, UUID instanceUUID)
+ {
+ return new MqOutbox(persistence, inboxName, outboxName, instanceUUID);
+ }
+}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java
similarity index 86%
rename from code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInbox.java
rename to code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java
index 49f34feb..94fa82f6 100644
--- a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInbox.java
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java
@@ -15,11 +15,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-/** Message queue inbox */
-public class MqInbox {
- private final Logger logger = LoggerFactory.getLogger(MqInbox.class);
+/** Message queue inbox that spawns news threads for each message */
+public class MqAsynchronousInbox implements MqInboxIf {
+ private final Logger logger = LoggerFactory.getLogger(MqAsynchronousInbox.class);
private final String inboxName;
private final String instanceUUID;
@@ -36,17 +35,17 @@ public class MqInbox {
private Thread pollDbThread;
private Thread notifyThread;
- public MqInbox(MqPersistence persistence,
- String inboxName,
- UUID instanceUUID)
+ public MqAsynchronousInbox(MqPersistence persistence,
+ String inboxName,
+ UUID instanceUUID)
{
this(persistence, inboxName, instanceUUID, Executors.newCachedThreadPool());
}
- public MqInbox(MqPersistence persistence,
- String inboxName,
- UUID instanceUUID,
- ExecutorService executorService)
+ public MqAsynchronousInbox(MqPersistence persistence,
+ String inboxName,
+ UUID instanceUUID,
+ ExecutorService executorService)
{
this.threadPool = executorService;
this.persistence = persistence;
@@ -55,6 +54,7 @@ public class MqInbox {
}
/** Subscribe to messages on this inbox. Must be run before start()! */
+ @Override
public void subscribe(MqSubscription subscription) {
eventSubscribers.add(subscription);
}
@@ -62,6 +62,7 @@ public class MqInbox {
/** Start receiving messages.
* Note: Subscribe to messages before calling this method.
*
*/
+ @Override
public void start() {
run = true;
@@ -82,6 +83,7 @@ public class MqInbox {
}
/** Stop receiving messages and shut down all threads */
+ @Override
public void stop() throws InterruptedException {
if (!run)
return;
@@ -185,7 +187,7 @@ public class MqInbox {
}
}
- public void pollDb() {
+ private void pollDb() {
try {
for (long tick = 1; run; tick++) {
@@ -210,6 +212,7 @@ public class MqInbox {
}
/** Retrieve the last N messages from the inbox. */
+ @Override
public List replay(int lastN) {
try {
return persistence.lastNMessages(inboxName, lastN);
@@ -220,23 +223,4 @@ public class MqInbox {
}
}
-
- private class MqInboxShredder implements MqSubscription {
-
- @Override
- public boolean filter(MqMessage rawMessage) {
- return true;
- }
-
- @Override
- public MqInboxResponse onRequest(MqMessage msg) {
- logger.warn("Unhandled message {}", msg.msgId());
- return MqInboxResponse.err();
- }
-
- @Override
- public void onNotification(MqMessage msg) {
- logger.warn("Unhandled message {}", msg.msgId());
- }
- }
}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxIf.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxIf.java
new file mode 100644
index 00000000..b317a1c5
--- /dev/null
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxIf.java
@@ -0,0 +1,15 @@
+package nu.marginalia.mq.inbox;
+
+import nu.marginalia.mq.MqMessage;
+
+import java.util.List;
+
+public interface MqInboxIf {
+ void subscribe(MqSubscription subscription);
+
+ void start();
+
+ void stop() throws InterruptedException;
+
+ List replay(int lastN);
+}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java
new file mode 100644
index 00000000..18c346f2
--- /dev/null
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java
@@ -0,0 +1,29 @@
+package nu.marginalia.mq.inbox;
+
+import nu.marginalia.mq.MqMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class MqInboxShredder implements MqSubscription {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ public MqInboxShredder() {
+ }
+
+ @Override
+ public boolean filter(MqMessage rawMessage) {
+ return true;
+ }
+
+ @Override
+ public MqInboxResponse onRequest(MqMessage msg) {
+ logger.warn("Unhandled message {}", msg.msgId());
+ return MqInboxResponse.err();
+ }
+
+ @Override
+ public void onNotification(MqMessage msg) {
+ logger.warn("Unhandled message {}", msg.msgId());
+ }
+}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java
index 68fca86e..791a195c 100644
--- a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java
@@ -5,6 +5,7 @@ import nu.marginalia.mq.persistence.MqPersistence;
import java.sql.SQLException;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
/** A single-shot inbox that can be used to wait for a single message
@@ -16,11 +17,12 @@ public class MqSingleShotInbox {
private final String instanceUUID;
private final MqPersistence persistence;
- public MqSingleShotInbox(String inboxName,
- String instanceUUID,
- MqPersistence persistence) {
+ public MqSingleShotInbox(MqPersistence persistence,
+ String inboxName,
+ UUID instanceUUID
+ ) {
this.inboxName = inboxName;
- this.instanceUUID = instanceUUID;
+ this.instanceUUID = instanceUUID.toString();
this.persistence = persistence;
}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java
new file mode 100644
index 00000000..a150a239
--- /dev/null
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java
@@ -0,0 +1,197 @@
+package nu.marginalia.mq.inbox;
+
+import nu.marginalia.mq.MqMessage;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/** Message queue inbox that responds to a single message at a time
+ * within the polling thread
+ */
+public class MqSynchronousInbox implements MqInboxIf {
+ private final Logger logger = LoggerFactory.getLogger(MqSynchronousInbox.class);
+
+ private final String inboxName;
+ private final String instanceUUID;
+ private final MqPersistence persistence;
+
+ private volatile boolean run = true;
+
+ private final int pollIntervalMs = Integer.getInteger("mq.inbox.poll-interval-ms", 100);
+ private final List eventSubscribers = new ArrayList<>();
+
+ private Thread pollDbThread;
+
+ public MqSynchronousInbox(MqPersistence persistence,
+ String inboxName,
+ UUID instanceUUID)
+ {
+ this.persistence = persistence;
+ this.inboxName = inboxName;
+ this.instanceUUID = instanceUUID.toString();
+ }
+
+ /** Subscribe to messages on this inbox. Must be run before start()! */
+ @Override
+ public void subscribe(MqSubscription subscription) {
+ eventSubscribers.add(subscription);
+ }
+
+ /** Start receiving messages.
+ * Note: Subscribe to messages before calling this method.
+ *
*/
+ @Override
+ public void start() {
+ run = true;
+
+ if (eventSubscribers.isEmpty()) {
+ logger.error("No subscribers for inbox {}, registering shredder", inboxName);
+ }
+
+ // Add a final handler that fails any message that is not handled
+ eventSubscribers.add(new MqInboxShredder());
+
+ pollDbThread = new Thread(this::pollDb, "mq-inbox-update-thread:"+inboxName);
+ pollDbThread.setDaemon(true);
+ pollDbThread.start();
+ }
+
+ /** Stop receiving messages and shut down all threads */
+ @Override
+ public void stop() throws InterruptedException {
+ if (!run)
+ return;
+
+ logger.info("Shutting down inbox {}", inboxName);
+
+ run = false;
+ pollDbThread.join();
+
+ }
+
+ private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) {
+
+ if (msg.expectsResponse()) {
+ respondToMessage(subscriber, msg);
+ }
+ else {
+ acknowledgeNotification(subscriber, msg);
+ }
+ }
+
+ private void respondToMessage(MqSubscription subscriber, MqMessage msg) {
+ try {
+ final var rsp = subscriber.onRequest(msg);
+ sendResponse(msg, rsp.state(), rsp.message());
+ } catch (Exception ex) {
+ logger.error("Message Queue subscriber threw exception", ex);
+ sendResponse(msg, MqMessageState.ERR);
+ }
+ }
+
+ private void acknowledgeNotification(MqSubscription subscriber, MqMessage msg) {
+ try {
+ subscriber.onNotification(msg);
+ updateMessageState(msg, MqMessageState.OK);
+ } catch (Exception ex) {
+ logger.error("Message Queue subscriber threw exception", ex);
+ updateMessageState(msg, MqMessageState.ERR);
+ }
+ }
+
+ private void sendResponse(MqMessage msg, MqMessageState state) {
+ try {
+ persistence.updateMessageState(msg.msgId(), state);
+ }
+ catch (SQLException ex) {
+ logger.error("Failed to update message state", ex);
+ }
+ }
+
+ private void updateMessageState(MqMessage msg, MqMessageState state) {
+ try {
+ persistence.updateMessageState(msg.msgId(), state);
+ }
+ catch (SQLException ex2) {
+ logger.error("Failed to update message state", ex2);
+ }
+ }
+
+ private void sendResponse(MqMessage msg, MqMessageState mqMessageState, String response) {
+ try {
+ persistence.sendResponse(msg.msgId(), mqMessageState, response);
+ }
+ catch (SQLException ex) {
+ logger.error("Failed to update message state", ex);
+ }
+ }
+
+ public void pollDb() {
+ try {
+ for (long tick = 1; run; tick++) {
+
+ var messages = pollInbox(tick);
+
+ for (var msg : messages) {
+ handleMessage(msg);
+ }
+
+ if (messages.isEmpty()) {
+ TimeUnit.MILLISECONDS.sleep(pollIntervalMs);
+ }
+ }
+ }
+ catch (InterruptedException ex) {
+ logger.error("MQ inbox update thread interrupted", ex);
+ }
+ }
+
+ private void handleMessage(MqMessage msg) {
+ logger.info("Notifying subscribers of msg {}", msg.msgId());
+
+ boolean handled = false;
+
+ for (var eventSubscriber : eventSubscribers) {
+ if (eventSubscriber.filter(msg)) {
+ handleMessageWithSubscriber(eventSubscriber, msg);
+ handled = true;
+ break;
+ }
+ }
+
+ if (!handled) {
+ logger.error("No subscriber wanted to handle msg {}", msg.msgId());
+ }
+ }
+
+ private Collection pollInbox(long tick) {
+ try {
+ return persistence.pollInbox(inboxName, instanceUUID, tick, 1);
+ }
+ catch (SQLException ex) {
+ logger.error("Failed to poll inbox", ex);
+ return List.of();
+ }
+ }
+
+ /** Retrieve the last N messages from the inbox. */
+ @Override
+ public List replay(int lastN) {
+ try {
+ return persistence.lastNMessages(inboxName, lastN);
+ }
+ catch (SQLException ex) {
+ logger.error("Failed to replay inbox", ex);
+ return List.of();
+ }
+ }
+
+}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java
index 4e1f3843..198914b3 100644
--- a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java
@@ -174,6 +174,7 @@ public class MqPersistence {
SET OWNER_INSTANCE=?, OWNER_TICK=?, UPDATED_TIME=CURRENT_TIMESTAMP(6), STATE='ACK'
WHERE RECIPIENT_INBOX=?
AND OWNER_INSTANCE IS NULL AND STATE='NEW'
+ ORDER BY ID ASC
LIMIT ?
""");
) {
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java
index b8ffc739..9b7d2cfa 100644
--- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java
@@ -1,12 +1,12 @@
package nu.marginalia.mqsm;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
-import nu.marginalia.mq.inbox.MqInbox;
+import nu.marginalia.mq.inbox.MqInboxIf;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSubscription;
import nu.marginalia.mq.outbox.MqOutbox;
-import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.state.*;
@@ -14,7 +14,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
-import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
/** A state machine that can be used to implement a finite state machine
@@ -24,7 +23,7 @@ import java.util.function.BiConsumer;
public class StateMachine {
private final Logger logger = LoggerFactory.getLogger(StateMachine.class);
- private final MqInbox smInbox;
+ private final MqInboxIf smInbox;
private final MqOutbox smOutbox;
private final String queueName;
private MachineState state;
@@ -37,14 +36,14 @@ public class StateMachine {
private final Map allStates = new HashMap<>();
- public StateMachine(MqPersistence persistence,
+ public StateMachine(MqFactory messageQueueFactory,
String queueName,
UUID instanceUUID,
AbstractStateGraph stateGraph) {
this.queueName = queueName;
- smInbox = new MqInbox(persistence, queueName, instanceUUID, Executors.newSingleThreadExecutor());
- smOutbox = new MqOutbox(persistence, queueName, queueName+"//out", instanceUUID);
+ smInbox = messageQueueFactory.createSynchronousInbox(queueName, instanceUUID);
+ smOutbox = messageQueueFactory.createOutbox(queueName, queueName+"//out", instanceUUID);
smInbox.subscribe(new StateEventSubscription());
diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java
index cb866b52..4411df25 100644
--- a/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java
+++ b/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java
@@ -5,10 +5,7 @@ import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.MqTestUtil;
-import nu.marginalia.mq.inbox.MqInboxResponse;
-import nu.marginalia.mq.inbox.MqInbox;
-import nu.marginalia.mq.inbox.MqSingleShotInbox;
-import nu.marginalia.mq.inbox.MqSubscription;
+import nu.marginalia.mq.inbox.*;
import nu.marginalia.mq.persistence.MqPersistence;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.MariaDBContainer;
@@ -63,7 +60,7 @@ public class MqOutboxTest {
@Test
public void testSingleShotInboxTimeout() throws Exception {
- var inbox = new MqSingleShotInbox(inboxId, UUID.randomUUID().toString(), new MqPersistence(dataSource));
+ var inbox = new MqSingleShotInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
var message = inbox.waitForMessage(100, TimeUnit.MILLISECONDS);
assertTrue(message.isEmpty());
}
@@ -91,7 +88,7 @@ public class MqOutboxTest {
long id = outbox.sendAsync("test", "Hello World");
// Create a single-shot inbox
- var inbox = new MqSingleShotInbox(inboxId, UUID.randomUUID().toString(), new MqPersistence(dataSource));
+ var inbox = new MqSingleShotInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
// Wait for the message to arrive
var message = inbox.waitForMessage(1, TimeUnit.SECONDS);
@@ -125,11 +122,12 @@ public class MqOutboxTest {
outbox.stop();
}
+
@Test
- public void testSendAndRespond() throws Exception {
+ public void testSendAndRespondAsyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
- var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.subscribe(justRespond("Alright then"));
inbox.start();
@@ -147,10 +145,31 @@ public class MqOutboxTest {
}
@Test
- public void testSendMultiple() throws Exception {
+ public void testSendAndRespondSyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
- var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ inbox.subscribe(justRespond("Alright then"));
+ inbox.start();
+
+ var rsp = outbox.send("test", "Hello World");
+
+ assertEquals(MqMessageState.OK, rsp.state());
+ assertEquals("Alright then", rsp.payload());
+
+ var messages = MqTestUtil.getMessages(dataSource, inboxId);
+ assertEquals(1, messages.size());
+ assertEquals(MqMessageState.OK, messages.get(0).state());
+
+ outbox.stop();
+ inbox.stop();
+ }
+
+ @Test
+ public void testSendMultipleAsyncInbox() throws Exception {
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
+
+ var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.subscribe(echo());
inbox.start();
@@ -181,9 +200,62 @@ public class MqOutboxTest {
}
@Test
- public void testSendAndRespondWithErrorHandler() throws Exception {
+ public void testSendMultipleSyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
- var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+
+ var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ inbox.subscribe(echo());
+ inbox.start();
+
+ var rsp1 = outbox.send("test", "one");
+ var rsp2 = outbox.send("test", "two");
+ var rsp3 = outbox.send("test", "three");
+ var rsp4 = outbox.send("test", "four");
+
+ Thread.sleep(500);
+
+ assertEquals(MqMessageState.OK, rsp1.state());
+ assertEquals("one", rsp1.payload());
+ assertEquals(MqMessageState.OK, rsp2.state());
+ assertEquals("two", rsp2.payload());
+ assertEquals(MqMessageState.OK, rsp3.state());
+ assertEquals("three", rsp3.payload());
+ assertEquals(MqMessageState.OK, rsp4.state());
+ assertEquals("four", rsp4.payload());
+
+ var messages = MqTestUtil.getMessages(dataSource, inboxId);
+ assertEquals(4, messages.size());
+ for (var message : messages) {
+ assertEquals(MqMessageState.OK, message.state());
+ }
+
+ outbox.stop();
+ inbox.stop();
+ }
+
+ @Test
+ public void testSendAndRespondWithErrorHandlerAsyncInbox() throws Exception {
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
+ var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+
+ inbox.start();
+
+ var rsp = outbox.send("test", "Hello World");
+
+ assertEquals(MqMessageState.ERR, rsp.state());
+
+ var messages = MqTestUtil.getMessages(dataSource, inboxId);
+ assertEquals(1, messages.size());
+ assertEquals(MqMessageState.ERR, messages.get(0).state());
+
+ outbox.stop();
+ inbox.stop();
+ }
+
+ @Test
+ public void testSendAndRespondWithErrorHandlerSyncInbox() throws Exception {
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
+ var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.start();
diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java
index 9d7306c2..f41a7dbd 100644
--- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java
+++ b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineErrorTest.java
@@ -3,6 +3,7 @@ package nu.marginalia.mqsm;
import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.mq.MqMessageRow;
import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence;
@@ -32,6 +33,7 @@ public class StateMachineErrorTest {
static HikariDataSource dataSource;
static MqPersistence persistence;
+ static MqFactory messageQueueFactory;
private String inboxId;
@BeforeEach
@@ -47,6 +49,7 @@ public class StateMachineErrorTest {
dataSource = new HikariDataSource(config);
persistence = new MqPersistence(dataSource);
+ messageQueueFactory = new MqFactory(persistence);
}
@AfterAll
@@ -78,7 +81,7 @@ public class StateMachineErrorTest {
@Test
public void smResumeResumableFromNew() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create());
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory));
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory));
sm.init();
diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java
index f3524968..79af8d07 100644
--- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java
+++ b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineResumeTest.java
@@ -3,6 +3,7 @@ package nu.marginalia.mqsm;
import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.mq.MqMessageRow;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.MqTestUtil;
@@ -33,6 +34,7 @@ public class StateMachineResumeTest {
static HikariDataSource dataSource;
static MqPersistence persistence;
+ static MqFactory messageQueueFactory;
private String inboxId;
@BeforeEach
@@ -48,6 +50,7 @@ public class StateMachineResumeTest {
dataSource = new HikariDataSource(config);
persistence = new MqPersistence(dataSource);
+ messageQueueFactory = new MqFactory(persistence);
}
@AfterAll
@@ -76,7 +79,7 @@ public class StateMachineResumeTest {
@Test
public void smResumeResumableFromNew() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create());
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
persistence.sendNewMessage(inboxId, null,"RESUMABLE", "", null);
@@ -97,7 +100,7 @@ public class StateMachineResumeTest {
@Test
public void smResumeFromAck() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create());
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
long id = persistence.sendNewMessage(inboxId, null,"RESUMABLE", "", null);
persistence.updateMessageState(id, MqMessageState.ACK);
@@ -120,7 +123,7 @@ public class StateMachineResumeTest {
@Test
public void smResumeNonResumableFromNew() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create());
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
persistence.sendNewMessage(inboxId, null,"NON-RESUMABLE", "", null);
@@ -141,7 +144,7 @@ public class StateMachineResumeTest {
@Test
public void smResumeNonResumableFromAck() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create());
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
long id = persistence.sendNewMessage(inboxId, null,"NON-RESUMABLE", "", null);
persistence.updateMessageState(id, MqMessageState.ACK);
@@ -163,7 +166,7 @@ public class StateMachineResumeTest {
@Test
public void smResumeEmptyQueue() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create());
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
sm.resume();
diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java
index 1130fe04..27ae869e 100644
--- a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java
+++ b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineTest.java
@@ -3,6 +3,7 @@ package nu.marginalia.mqsm;
import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.graph.GraphState;
@@ -29,6 +30,7 @@ public class StateMachineTest {
static HikariDataSource dataSource;
static MqPersistence persistence;
+ static MqFactory messageQueueFactory;
private String inboxId;
@BeforeEach
@@ -44,6 +46,7 @@ public class StateMachineTest {
dataSource = new HikariDataSource(config);
persistence = new MqPersistence(dataSource);
+ messageQueueFactory = new MqFactory(persistence);
}
@AfterAll
@@ -83,7 +86,7 @@ public class StateMachineTest {
var graph = new TestGraph(stateFactory);
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), graph);
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph);
sm.registerStates(graph);
sm.init();
@@ -98,7 +101,7 @@ public class StateMachineTest {
@Test
public void testStartStopStartStop() throws Exception {
var stateFactory = new StateFactory(new GsonBuilder().create());
- var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
+ var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
sm.init();
@@ -107,7 +110,7 @@ public class StateMachineTest {
System.out.println("-------------------- ");
- var sm2 = new StateMachine(persistence, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
+ var sm2 = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
sm2.resume();
sm2.join();
sm2.stop();
diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java b/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java
index abec5e55..2ff07b55 100644
--- a/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java
+++ b/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java
@@ -2,7 +2,7 @@ package nu.marginalia.service.server;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.module.ServiceConfiguration;
@@ -15,19 +15,19 @@ public class BaseServiceParams {
public final MetricsServer metricsServer;
public final ServiceHeartbeat heartbeat;
public final ServiceEventLog eventLog;
- public final MqPersistence messageQueuePersistence;
-
+ public final MqFactory messageQueueInboxFactory;
@Inject
public BaseServiceParams(ServiceConfiguration configuration,
Initialization initialization,
MetricsServer metricsServer,
ServiceHeartbeat heartbeat,
- ServiceEventLog eventLog, MqPersistence messageQueuePersistence) {
+ ServiceEventLog eventLog,
+ MqFactory messageQueueInboxFactory) {
this.configuration = configuration;
this.initialization = initialization;
this.metricsServer = metricsServer;
this.heartbeat = heartbeat;
this.eventLog = eventLog;
- this.messageQueuePersistence = messageQueuePersistence;
+ this.messageQueueInboxFactory = messageQueueInboxFactory;
}
}
diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java
index e8386fb8..ebd75753 100644
--- a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java
+++ b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java
@@ -3,7 +3,7 @@ package nu.marginalia.service.server;
import io.prometheus.client.Counter;
import nu.marginalia.client.Context;
import nu.marginalia.client.exception.MessagingException;
-import nu.marginalia.mq.inbox.MqInbox;
+import nu.marginalia.mq.inbox.*;
import nu.marginalia.service.server.mq.MqRequest;
import nu.marginalia.service.server.mq.ServiceMqSubscription;
import org.slf4j.Logger;
@@ -39,7 +39,7 @@ public class Service {
private final String serviceName;
private static volatile boolean initialized = false;
- protected final MqInbox messageQueueInbox;
+ protected final MqInboxIf messageQueueInbox;
public Service(BaseServiceParams params,
Runnable configureStaticFiles
@@ -49,9 +49,9 @@ public class Service {
String inboxName = config.serviceName() + ":" + config.node();
logger.info("Inbox name: {}", inboxName);
- messageQueueInbox = new MqInbox(params.messageQueuePersistence,
- inboxName,
- config.instanceUuid());
+
+ var mqInboxFactory = params.messageQueueInboxFactory;
+ messageQueueInbox = mqInboxFactory.createAsynchronousInbox(inboxName, config.instanceUuid());
messageQueueInbox.subscribe(new ServiceMqSubscription(this));
serviceName = System.getProperty("service-name");
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java
index 6b8a64eb..e553d7df 100644
--- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java
@@ -5,7 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.model.gson.GsonFactory;
-import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.mq.MqFactory;
import nu.marginalia.mqsm.StateMachine;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.service.control.ServiceEventLog;
@@ -17,19 +17,19 @@ import java.util.UUID;
@Singleton
public class ControlProcesses {
- private final MqPersistence persistence;
private final ServiceEventLog eventLog;
private final Gson gson;
+ private final MqFactory messageQueueFactory;
public Map stateMachines = new HashMap<>();
@Inject
- public ControlProcesses(MqPersistence persistence,
+ public ControlProcesses(MqFactory messageQueueFactory,
GsonFactory gsonFactory,
BaseServiceParams baseServiceParams,
RepartitionReindexProcess repartitionReindexProcess,
ReconvertAndLoadProcess reconvertAndLoadProcess
) {
- this.persistence = persistence;
+ this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.gson = gsonFactory.get();
register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess);
@@ -37,7 +37,7 @@ public class ControlProcesses {
}
private void register(ControlProcess process, AbstractStateGraph graph) {
- var sm = new StateMachine(persistence, process.id(), UUID.randomUUID(), graph);
+ var sm = new StateMachine(messageQueueFactory, process.id(), UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java
index a5200275..4ba2585c 100644
--- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/MessageQueueMonitorService.java
@@ -2,7 +2,6 @@ package nu.marginalia.control.svc;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.control.ServiceEventLog;
-import nu.marginalia.service.server.BaseServiceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,9 +17,9 @@ public class MessageQueueMonitorService {
private final ServiceEventLog eventLog;
@Inject
- public MessageQueueMonitorService(BaseServiceParams params) {
- this.persistence = params.messageQueuePersistence;
- this.eventLog = params.eventLog;
+ public MessageQueueMonitorService(ServiceEventLog eventLog, MqPersistence persistence) {
+ this.eventLog = eventLog;
+ this.persistence = persistence;
Thread reaperThread = new Thread(this::run, "message-queue-reaper");
reaperThread.setDaemon(true);