mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 21:18:58 +00:00
(mq) Synchronous and Asynchronous inboxes.
This commit is contained in:
parent
0ed938545b
commit
8a53e107fa
@ -10,8 +10,8 @@ import nu.marginalia.client.Context;
|
||||
import nu.marginalia.index.client.model.query.SearchSpecification;
|
||||
import nu.marginalia.index.client.model.results.SearchResultSet;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.service.descriptor.ServiceDescriptors;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
|
||||
@ -27,13 +27,13 @@ public class IndexClient extends AbstractDynamicClient {
|
||||
|
||||
@Inject
|
||||
public IndexClient(ServiceDescriptors descriptors,
|
||||
MqPersistence persistence) {
|
||||
MqFactory messageQueueFactory) {
|
||||
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
|
||||
|
||||
String inboxName = ServiceId.Index.name + ":" + "0";
|
||||
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
|
||||
|
||||
outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID());
|
||||
outbox = messageQueueFactory.createOutbox(inboxName, outboxName, UUID.randomUUID());
|
||||
|
||||
setTimeout(30);
|
||||
}
|
||||
|
@ -5,8 +5,8 @@ import com.google.inject.Singleton;
|
||||
import io.reactivex.rxjava3.core.Observable;
|
||||
import nu.marginalia.client.AbstractDynamicClient;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.search.client.model.ApiSearchResults;
|
||||
import nu.marginalia.service.descriptor.ServiceDescriptors;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
@ -28,14 +28,14 @@ public class SearchClient extends AbstractDynamicClient {
|
||||
|
||||
@Inject
|
||||
public SearchClient(ServiceDescriptors descriptors,
|
||||
MqPersistence persistence) {
|
||||
MqFactory messageQueueFactory) {
|
||||
|
||||
super(descriptors.forId(ServiceId.Search), WmsaHome.getHostsFile(), GsonFactory::get);
|
||||
|
||||
String inboxName = ServiceId.Search.name + ":" + "0";
|
||||
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
|
||||
|
||||
outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID());
|
||||
outbox = messageQueueFactory.createOutbox(inboxName, outboxName, UUID.randomUUID());
|
||||
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,42 @@
|
||||
package nu.marginalia.mq;
|
||||
|
||||
import nu.marginalia.mq.inbox.MqAsynchronousInbox;
|
||||
import nu.marginalia.mq.inbox.MqInboxIf;
|
||||
import nu.marginalia.mq.inbox.MqSingleShotInbox;
|
||||
import nu.marginalia.mq.inbox.MqSynchronousInbox;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
import java.util.UUID;
|
||||
|
||||
@Singleton
|
||||
public class MqFactory {
|
||||
private final MqPersistence persistence;
|
||||
|
||||
@Inject
|
||||
public MqFactory(MqPersistence persistence) {
|
||||
this.persistence = persistence;
|
||||
}
|
||||
|
||||
public MqInboxIf createAsynchronousInbox(String inboxName, UUID instanceUUID)
|
||||
{
|
||||
return new MqAsynchronousInbox(persistence, inboxName, instanceUUID);
|
||||
}
|
||||
|
||||
public MqInboxIf createSynchronousInbox(String inboxName, UUID instanceUUID)
|
||||
{
|
||||
return new MqSynchronousInbox(persistence, inboxName, instanceUUID);
|
||||
}
|
||||
|
||||
public MqSingleShotInbox createSingleShotInbox(String inboxName, UUID instanceUUID)
|
||||
{
|
||||
return new MqSingleShotInbox(persistence, inboxName, instanceUUID);
|
||||
}
|
||||
|
||||
public MqOutbox createOutbox(String inboxName, String outboxName, UUID instanceUUID)
|
||||
{
|
||||
return new MqOutbox(persistence, inboxName, outboxName, instanceUUID);
|
||||
}
|
||||
}
|
@ -15,11 +15,10 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/** Message queue inbox */
|
||||
public class MqInbox {
|
||||
private final Logger logger = LoggerFactory.getLogger(MqInbox.class);
|
||||
/** Message queue inbox that spawns news threads for each message */
|
||||
public class MqAsynchronousInbox implements MqInboxIf {
|
||||
private final Logger logger = LoggerFactory.getLogger(MqAsynchronousInbox.class);
|
||||
|
||||
private final String inboxName;
|
||||
private final String instanceUUID;
|
||||
@ -36,17 +35,17 @@ public class MqInbox {
|
||||
private Thread pollDbThread;
|
||||
private Thread notifyThread;
|
||||
|
||||
public MqInbox(MqPersistence persistence,
|
||||
String inboxName,
|
||||
UUID instanceUUID)
|
||||
public MqAsynchronousInbox(MqPersistence persistence,
|
||||
String inboxName,
|
||||
UUID instanceUUID)
|
||||
{
|
||||
this(persistence, inboxName, instanceUUID, Executors.newCachedThreadPool());
|
||||
}
|
||||
|
||||
public MqInbox(MqPersistence persistence,
|
||||
String inboxName,
|
||||
UUID instanceUUID,
|
||||
ExecutorService executorService)
|
||||
public MqAsynchronousInbox(MqPersistence persistence,
|
||||
String inboxName,
|
||||
UUID instanceUUID,
|
||||
ExecutorService executorService)
|
||||
{
|
||||
this.threadPool = executorService;
|
||||
this.persistence = persistence;
|
||||
@ -55,6 +54,7 @@ public class MqInbox {
|
||||
}
|
||||
|
||||
/** Subscribe to messages on this inbox. Must be run before start()! */
|
||||
@Override
|
||||
public void subscribe(MqSubscription subscription) {
|
||||
eventSubscribers.add(subscription);
|
||||
}
|
||||
@ -62,6 +62,7 @@ public class MqInbox {
|
||||
/** Start receiving messages. <p>
|
||||
* <b>Note:</b> Subscribe to messages before calling this method.
|
||||
* </p> */
|
||||
@Override
|
||||
public void start() {
|
||||
run = true;
|
||||
|
||||
@ -82,6 +83,7 @@ public class MqInbox {
|
||||
}
|
||||
|
||||
/** Stop receiving messages and shut down all threads */
|
||||
@Override
|
||||
public void stop() throws InterruptedException {
|
||||
if (!run)
|
||||
return;
|
||||
@ -185,7 +187,7 @@ public class MqInbox {
|
||||
}
|
||||
}
|
||||
|
||||
public void pollDb() {
|
||||
private void pollDb() {
|
||||
try {
|
||||
for (long tick = 1; run; tick++) {
|
||||
|
||||
@ -210,6 +212,7 @@ public class MqInbox {
|
||||
}
|
||||
|
||||
/** Retrieve the last N messages from the inbox. */
|
||||
@Override
|
||||
public List<MqMessage> replay(int lastN) {
|
||||
try {
|
||||
return persistence.lastNMessages(inboxName, lastN);
|
||||
@ -220,23 +223,4 @@ public class MqInbox {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class MqInboxShredder implements MqSubscription {
|
||||
|
||||
@Override
|
||||
public boolean filter(MqMessage rawMessage) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MqInboxResponse onRequest(MqMessage msg) {
|
||||
logger.warn("Unhandled message {}", msg.msgId());
|
||||
return MqInboxResponse.err();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNotification(MqMessage msg) {
|
||||
logger.warn("Unhandled message {}", msg.msgId());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package nu.marginalia.mq.inbox;
|
||||
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface MqInboxIf {
|
||||
void subscribe(MqSubscription subscription);
|
||||
|
||||
void start();
|
||||
|
||||
void stop() throws InterruptedException;
|
||||
|
||||
List<MqMessage> replay(int lastN);
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package nu.marginalia.mq.inbox;
|
||||
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
class MqInboxShredder implements MqSubscription {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
public MqInboxShredder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(MqMessage rawMessage) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MqInboxResponse onRequest(MqMessage msg) {
|
||||
logger.warn("Unhandled message {}", msg.msgId());
|
||||
return MqInboxResponse.err();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNotification(MqMessage msg) {
|
||||
logger.warn("Unhandled message {}", msg.msgId());
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@ import nu.marginalia.mq.persistence.MqPersistence;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/** A single-shot inbox that can be used to wait for a single message
|
||||
@ -16,11 +17,12 @@ public class MqSingleShotInbox {
|
||||
private final String instanceUUID;
|
||||
private final MqPersistence persistence;
|
||||
|
||||
public MqSingleShotInbox(String inboxName,
|
||||
String instanceUUID,
|
||||
MqPersistence persistence) {
|
||||
public MqSingleShotInbox(MqPersistence persistence,
|
||||
String inboxName,
|
||||
UUID instanceUUID
|
||||
) {
|
||||
this.inboxName = inboxName;
|
||||
this.instanceUUID = instanceUUID;
|
||||
this.instanceUUID = instanceUUID.toString();
|
||||
this.persistence = persistence;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,197 @@
|
||||
package nu.marginalia.mq.inbox;
|
||||
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/** Message queue inbox that responds to a single message at a time
|
||||
* within the polling thread
|
||||
*/
|
||||
public class MqSynchronousInbox implements MqInboxIf {
|
||||
private final Logger logger = LoggerFactory.getLogger(MqSynchronousInbox.class);
|
||||
|
||||
private final String inboxName;
|
||||
private final String instanceUUID;
|
||||
private final MqPersistence persistence;
|
||||
|
||||
private volatile boolean run = true;
|
||||
|
||||
private final int pollIntervalMs = Integer.getInteger("mq.inbox.poll-interval-ms", 100);
|
||||
private final List<MqSubscription> eventSubscribers = new ArrayList<>();
|
||||
|
||||
private Thread pollDbThread;
|
||||
|
||||
public MqSynchronousInbox(MqPersistence persistence,
|
||||
String inboxName,
|
||||
UUID instanceUUID)
|
||||
{
|
||||
this.persistence = persistence;
|
||||
this.inboxName = inboxName;
|
||||
this.instanceUUID = instanceUUID.toString();
|
||||
}
|
||||
|
||||
/** Subscribe to messages on this inbox. Must be run before start()! */
|
||||
@Override
|
||||
public void subscribe(MqSubscription subscription) {
|
||||
eventSubscribers.add(subscription);
|
||||
}
|
||||
|
||||
/** Start receiving messages. <p>
|
||||
* <b>Note:</b> Subscribe to messages before calling this method.
|
||||
* </p> */
|
||||
@Override
|
||||
public void start() {
|
||||
run = true;
|
||||
|
||||
if (eventSubscribers.isEmpty()) {
|
||||
logger.error("No subscribers for inbox {}, registering shredder", inboxName);
|
||||
}
|
||||
|
||||
// Add a final handler that fails any message that is not handled
|
||||
eventSubscribers.add(new MqInboxShredder());
|
||||
|
||||
pollDbThread = new Thread(this::pollDb, "mq-inbox-update-thread:"+inboxName);
|
||||
pollDbThread.setDaemon(true);
|
||||
pollDbThread.start();
|
||||
}
|
||||
|
||||
/** Stop receiving messages and shut down all threads */
|
||||
@Override
|
||||
public void stop() throws InterruptedException {
|
||||
if (!run)
|
||||
return;
|
||||
|
||||
logger.info("Shutting down inbox {}", inboxName);
|
||||
|
||||
run = false;
|
||||
pollDbThread.join();
|
||||
|
||||
}
|
||||
|
||||
private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) {
|
||||
|
||||
if (msg.expectsResponse()) {
|
||||
respondToMessage(subscriber, msg);
|
||||
}
|
||||
else {
|
||||
acknowledgeNotification(subscriber, msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void respondToMessage(MqSubscription subscriber, MqMessage msg) {
|
||||
try {
|
||||
final var rsp = subscriber.onRequest(msg);
|
||||
sendResponse(msg, rsp.state(), rsp.message());
|
||||
} catch (Exception ex) {
|
||||
logger.error("Message Queue subscriber threw exception", ex);
|
||||
sendResponse(msg, MqMessageState.ERR);
|
||||
}
|
||||
}
|
||||
|
||||
private void acknowledgeNotification(MqSubscription subscriber, MqMessage msg) {
|
||||
try {
|
||||
subscriber.onNotification(msg);
|
||||
updateMessageState(msg, MqMessageState.OK);
|
||||
} catch (Exception ex) {
|
||||
logger.error("Message Queue subscriber threw exception", ex);
|
||||
updateMessageState(msg, MqMessageState.ERR);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendResponse(MqMessage msg, MqMessageState state) {
|
||||
try {
|
||||
persistence.updateMessageState(msg.msgId(), state);
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
logger.error("Failed to update message state", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateMessageState(MqMessage msg, MqMessageState state) {
|
||||
try {
|
||||
persistence.updateMessageState(msg.msgId(), state);
|
||||
}
|
||||
catch (SQLException ex2) {
|
||||
logger.error("Failed to update message state", ex2);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendResponse(MqMessage msg, MqMessageState mqMessageState, String response) {
|
||||
try {
|
||||
persistence.sendResponse(msg.msgId(), mqMessageState, response);
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
logger.error("Failed to update message state", ex);
|
||||
}
|
||||
}
|
||||
|
||||
public void pollDb() {
|
||||
try {
|
||||
for (long tick = 1; run; tick++) {
|
||||
|
||||
var messages = pollInbox(tick);
|
||||
|
||||
for (var msg : messages) {
|
||||
handleMessage(msg);
|
||||
}
|
||||
|
||||
if (messages.isEmpty()) {
|
||||
TimeUnit.MILLISECONDS.sleep(pollIntervalMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InterruptedException ex) {
|
||||
logger.error("MQ inbox update thread interrupted", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleMessage(MqMessage msg) {
|
||||
logger.info("Notifying subscribers of msg {}", msg.msgId());
|
||||
|
||||
boolean handled = false;
|
||||
|
||||
for (var eventSubscriber : eventSubscribers) {
|
||||
if (eventSubscriber.filter(msg)) {
|
||||
handleMessageWithSubscriber(eventSubscriber, msg);
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!handled) {
|
||||
logger.error("No subscriber wanted to handle msg {}", msg.msgId());
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<MqMessage> pollInbox(long tick) {
|
||||
try {
|
||||
return persistence.pollInbox(inboxName, instanceUUID, tick, 1);
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
logger.error("Failed to poll inbox", ex);
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
|
||||
/** Retrieve the last N messages from the inbox. */
|
||||
@Override
|
||||
public List<MqMessage> replay(int lastN) {
|
||||
try {
|
||||
return persistence.lastNMessages(inboxName, lastN);
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
logger.error("Failed to replay inbox", ex);
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -174,6 +174,7 @@ public class MqPersistence {
|
||||
SET OWNER_INSTANCE=?, OWNER_TICK=?, UPDATED_TIME=CURRENT_TIMESTAMP(6), STATE='ACK'
|
||||
WHERE RECIPIENT_INBOX=?
|
||||
AND OWNER_INSTANCE IS NULL AND STATE='NEW'
|
||||
ORDER BY ID ASC
|
||||
LIMIT ?
|
||||
""");
|
||||
) {
|
||||
|
@ -1,12 +1,12 @@
|
||||
package nu.marginalia.mqsm;
|
||||
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.inbox.MqInbox;
|
||||
import nu.marginalia.mq.inbox.MqInboxIf;
|
||||
import nu.marginalia.mq.inbox.MqInboxResponse;
|
||||
import nu.marginalia.mq.inbox.MqSubscription;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.mqsm.graph.ResumeBehavior;
|
||||
import nu.marginalia.mqsm.graph.AbstractStateGraph;
|
||||
import nu.marginalia.mqsm.state.*;
|
||||
@ -14,7 +14,6 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/** A state machine that can be used to implement a finite state machine
|
||||
@ -24,7 +23,7 @@ import java.util.function.BiConsumer;
|
||||
public class StateMachine {
|
||||
private final Logger logger = LoggerFactory.getLogger(StateMachine.class);
|
||||
|
||||
private final MqInbox smInbox;
|
||||
private final MqInboxIf smInbox;
|
||||
private final MqOutbox smOutbox;
|
||||
private final String queueName;
|
||||
private MachineState state;
|
||||
@ -37,14 +36,14 @@ public class StateMachine {
|
||||
|
||||
private final Map<String, MachineState> allStates = new HashMap<>();
|
||||
|
||||
public StateMachine(MqPersistence persistence,
|
||||
public StateMachine(MqFactory messageQueueFactory,
|
||||
String queueName,
|
||||
UUID instanceUUID,
|
||||
AbstractStateGraph stateGraph) {
|
||||
this.queueName = queueName;
|
||||
|
||||
smInbox = new MqInbox(persistence, queueName, instanceUUID, Executors.newSingleThreadExecutor());
|
||||
smOutbox = new MqOutbox(persistence, queueName, queueName+"//out", instanceUUID);
|
||||
smInbox = messageQueueFactory.createSynchronousInbox(queueName, instanceUUID);
|
||||
smOutbox = messageQueueFactory.createOutbox(queueName, queueName+"//out", instanceUUID);
|
||||
|
||||
smInbox.subscribe(new StateEventSubscription());
|
||||
|
||||
|
@ -5,10 +5,7 @@ import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.MqTestUtil;
|
||||
import nu.marginalia.mq.inbox.MqInboxResponse;
|
||||
import nu.marginalia.mq.inbox.MqInbox;
|
||||
import nu.marginalia.mq.inbox.MqSingleShotInbox;
|
||||
import nu.marginalia.mq.inbox.MqSubscription;
|
||||
import nu.marginalia.mq.inbox.*;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.testcontainers.containers.MariaDBContainer;
|
||||
@ -63,7 +60,7 @@ public class MqOutboxTest {
|
||||
|
||||
@Test
|
||||
public void testSingleShotInboxTimeout() throws Exception {
|
||||
var inbox = new MqSingleShotInbox(inboxId, UUID.randomUUID().toString(), new MqPersistence(dataSource));
|
||||
var inbox = new MqSingleShotInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
var message = inbox.waitForMessage(100, TimeUnit.MILLISECONDS);
|
||||
assertTrue(message.isEmpty());
|
||||
}
|
||||
@ -91,7 +88,7 @@ public class MqOutboxTest {
|
||||
long id = outbox.sendAsync("test", "Hello World");
|
||||
|
||||
// Create a single-shot inbox
|
||||
var inbox = new MqSingleShotInbox(inboxId, UUID.randomUUID().toString(), new MqPersistence(dataSource));
|
||||
var inbox = new MqSingleShotInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
|
||||
// Wait for the message to arrive
|
||||
var message = inbox.waitForMessage(1, TimeUnit.SECONDS);
|
||||
@ -125,11 +122,12 @@ public class MqOutboxTest {
|
||||
outbox.stop();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSendAndRespond() throws Exception {
|
||||
public void testSendAndRespondAsyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
|
||||
|
||||
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
inbox.subscribe(justRespond("Alright then"));
|
||||
inbox.start();
|
||||
|
||||
@ -147,10 +145,31 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMultiple() throws Exception {
|
||||
public void testSendAndRespondSyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
|
||||
|
||||
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
inbox.subscribe(justRespond("Alright then"));
|
||||
inbox.start();
|
||||
|
||||
var rsp = outbox.send("test", "Hello World");
|
||||
|
||||
assertEquals(MqMessageState.OK, rsp.state());
|
||||
assertEquals("Alright then", rsp.payload());
|
||||
|
||||
var messages = MqTestUtil.getMessages(dataSource, inboxId);
|
||||
assertEquals(1, messages.size());
|
||||
assertEquals(MqMessageState.OK, messages.get(0).state());
|
||||
|
||||
outbox.stop();
|
||||
inbox.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMultipleAsyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
|
||||
|
||||
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
inbox.subscribe(echo());
|
||||
inbox.start();
|
||||
|
||||
@ -181,9 +200,62 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAndRespondWithErrorHandler() throws Exception {
|
||||
public void testSendMultipleSyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
|
||||
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
|
||||
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
inbox.subscribe(echo());
|
||||
inbox.start();
|
||||
|
||||
var rsp1 = outbox.send("test", "one");
|
||||
var rsp2 = outbox.send("test", "two");
|
||||
var rsp3 = outbox.send("test", "three");
|
||||
var rsp4 = outbox.send("test", "four");
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
assertEquals(MqMessageState.OK, rsp1.state());
|
||||
assertEquals("one", rsp1.payload());
|
||||
assertEquals(MqMessageState.OK, rsp2.state());
|
||||
assertEquals("two", rsp2.payload());
|
||||
assertEquals(MqMessageState.OK, rsp3.state());
|
||||
assertEquals("three", rsp3.payload());
|
||||
assertEquals(MqMessageState.OK, rsp4.state());
|
||||
assertEquals("four", rsp4.payload());
|
||||
|
||||
var messages = MqTestUtil.getMessages(dataSource, inboxId);
|
||||
assertEquals(4, messages.size());
|
||||
for (var message : messages) {
|
||||
assertEquals(MqMessageState.OK, message.state());
|
||||
}
|
||||
|
||||
outbox.stop();
|
||||
inbox.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAndRespondWithErrorHandlerAsyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
|
||||
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
|
||||
inbox.start();
|
||||
|
||||
var rsp = outbox.send("test", "Hello World");
|
||||
|
||||
assertEquals(MqMessageState.ERR, rsp.state());
|
||||
|
||||
var messages = MqTestUtil.getMessages(dataSource, inboxId);
|
||||
assertEquals(1, messages.size());
|
||||
assertEquals(MqMessageState.ERR, messages.get(0).state());
|
||||
|
||||
outbox.stop();
|
||||
inbox.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAndRespondWithErrorHandlerSyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
|
||||
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
|
||||
|
||||
inbox.start();
|
||||
|
||||
|
@ -3,6 +3,7 @@ package nu.marginalia.mqsm;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.mq.MqMessageRow;
|
||||
import nu.marginalia.mq.MqTestUtil;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
@ -32,6 +33,7 @@ public class StateMachineErrorTest {
|
||||
|
||||
static HikariDataSource dataSource;
|
||||
static MqPersistence persistence;
|
||||
static MqFactory messageQueueFactory;
|
||||
private String inboxId;
|
||||
|
||||
@BeforeEach
|
||||
@ -47,6 +49,7 @@ public class StateMachineErrorTest {
|
||||
|
||||
dataSource = new HikariDataSource(config);
|
||||
persistence = new MqPersistence(dataSource);
|
||||
messageQueueFactory = new MqFactory(persistence);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
@ -78,7 +81,7 @@ public class StateMachineErrorTest {
|
||||
@Test
|
||||
public void smResumeResumableFromNew() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory));
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ErrorHurdles(stateFactory));
|
||||
|
||||
sm.init();
|
||||
|
||||
|
@ -3,6 +3,7 @@ package nu.marginalia.mqsm;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.mq.MqMessageRow;
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.MqTestUtil;
|
||||
@ -33,6 +34,7 @@ public class StateMachineResumeTest {
|
||||
|
||||
static HikariDataSource dataSource;
|
||||
static MqPersistence persistence;
|
||||
static MqFactory messageQueueFactory;
|
||||
private String inboxId;
|
||||
|
||||
@BeforeEach
|
||||
@ -48,6 +50,7 @@ public class StateMachineResumeTest {
|
||||
|
||||
dataSource = new HikariDataSource(config);
|
||||
persistence = new MqPersistence(dataSource);
|
||||
messageQueueFactory = new MqFactory(persistence);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
@ -76,7 +79,7 @@ public class StateMachineResumeTest {
|
||||
@Test
|
||||
public void smResumeResumableFromNew() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
|
||||
persistence.sendNewMessage(inboxId, null,"RESUMABLE", "", null);
|
||||
|
||||
@ -97,7 +100,7 @@ public class StateMachineResumeTest {
|
||||
@Test
|
||||
public void smResumeFromAck() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
|
||||
long id = persistence.sendNewMessage(inboxId, null,"RESUMABLE", "", null);
|
||||
persistence.updateMessageState(id, MqMessageState.ACK);
|
||||
@ -120,7 +123,7 @@ public class StateMachineResumeTest {
|
||||
@Test
|
||||
public void smResumeNonResumableFromNew() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
|
||||
persistence.sendNewMessage(inboxId, null,"NON-RESUMABLE", "", null);
|
||||
|
||||
@ -141,7 +144,7 @@ public class StateMachineResumeTest {
|
||||
@Test
|
||||
public void smResumeNonResumableFromAck() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
|
||||
long id = persistence.sendNewMessage(inboxId, null,"NON-RESUMABLE", "", null);
|
||||
persistence.updateMessageState(id, MqMessageState.ACK);
|
||||
@ -163,7 +166,7 @@ public class StateMachineResumeTest {
|
||||
@Test
|
||||
public void smResumeEmptyQueue() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new ResumeTrialsGraph(stateFactory));
|
||||
|
||||
sm.resume();
|
||||
|
||||
|
@ -3,6 +3,7 @@ package nu.marginalia.mqsm;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.mq.MqTestUtil;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.mqsm.graph.GraphState;
|
||||
@ -29,6 +30,7 @@ public class StateMachineTest {
|
||||
|
||||
static HikariDataSource dataSource;
|
||||
static MqPersistence persistence;
|
||||
static MqFactory messageQueueFactory;
|
||||
private String inboxId;
|
||||
|
||||
@BeforeEach
|
||||
@ -44,6 +46,7 @@ public class StateMachineTest {
|
||||
|
||||
dataSource = new HikariDataSource(config);
|
||||
persistence = new MqPersistence(dataSource);
|
||||
messageQueueFactory = new MqFactory(persistence);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
@ -83,7 +86,7 @@ public class StateMachineTest {
|
||||
var graph = new TestGraph(stateFactory);
|
||||
|
||||
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), graph);
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph);
|
||||
sm.registerStates(graph);
|
||||
|
||||
sm.init();
|
||||
@ -98,7 +101,7 @@ public class StateMachineTest {
|
||||
@Test
|
||||
public void testStartStopStartStop() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var sm = new StateMachine(persistence, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
|
||||
|
||||
sm.init();
|
||||
|
||||
@ -107,7 +110,7 @@ public class StateMachineTest {
|
||||
|
||||
System.out.println("-------------------- ");
|
||||
|
||||
var sm2 = new StateMachine(persistence, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
|
||||
var sm2 = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), new TestGraph(stateFactory));
|
||||
sm2.resume();
|
||||
sm2.join();
|
||||
sm2.stop();
|
||||
|
@ -2,7 +2,7 @@ package nu.marginalia.service.server;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.service.control.ServiceEventLog;
|
||||
import nu.marginalia.service.control.ServiceHeartbeat;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
@ -15,19 +15,19 @@ public class BaseServiceParams {
|
||||
public final MetricsServer metricsServer;
|
||||
public final ServiceHeartbeat heartbeat;
|
||||
public final ServiceEventLog eventLog;
|
||||
public final MqPersistence messageQueuePersistence;
|
||||
|
||||
public final MqFactory messageQueueInboxFactory;
|
||||
@Inject
|
||||
public BaseServiceParams(ServiceConfiguration configuration,
|
||||
Initialization initialization,
|
||||
MetricsServer metricsServer,
|
||||
ServiceHeartbeat heartbeat,
|
||||
ServiceEventLog eventLog, MqPersistence messageQueuePersistence) {
|
||||
ServiceEventLog eventLog,
|
||||
MqFactory messageQueueInboxFactory) {
|
||||
this.configuration = configuration;
|
||||
this.initialization = initialization;
|
||||
this.metricsServer = metricsServer;
|
||||
this.heartbeat = heartbeat;
|
||||
this.eventLog = eventLog;
|
||||
this.messageQueuePersistence = messageQueuePersistence;
|
||||
this.messageQueueInboxFactory = messageQueueInboxFactory;
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@ package nu.marginalia.service.server;
|
||||
import io.prometheus.client.Counter;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.client.exception.MessagingException;
|
||||
import nu.marginalia.mq.inbox.MqInbox;
|
||||
import nu.marginalia.mq.inbox.*;
|
||||
import nu.marginalia.service.server.mq.MqRequest;
|
||||
import nu.marginalia.service.server.mq.ServiceMqSubscription;
|
||||
import org.slf4j.Logger;
|
||||
@ -39,7 +39,7 @@ public class Service {
|
||||
private final String serviceName;
|
||||
private static volatile boolean initialized = false;
|
||||
|
||||
protected final MqInbox messageQueueInbox;
|
||||
protected final MqInboxIf messageQueueInbox;
|
||||
|
||||
public Service(BaseServiceParams params,
|
||||
Runnable configureStaticFiles
|
||||
@ -49,9 +49,9 @@ public class Service {
|
||||
|
||||
String inboxName = config.serviceName() + ":" + config.node();
|
||||
logger.info("Inbox name: {}", inboxName);
|
||||
messageQueueInbox = new MqInbox(params.messageQueuePersistence,
|
||||
inboxName,
|
||||
config.instanceUuid());
|
||||
|
||||
var mqInboxFactory = params.messageQueueInboxFactory;
|
||||
messageQueueInbox = mqInboxFactory.createAsynchronousInbox(inboxName, config.instanceUuid());
|
||||
messageQueueInbox.subscribe(new ServiceMqSubscription(this));
|
||||
|
||||
serviceName = System.getProperty("service-name");
|
||||
|
@ -5,7 +5,7 @@ import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.control.model.ControlProcess;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.mq.MqFactory;
|
||||
import nu.marginalia.mqsm.StateMachine;
|
||||
import nu.marginalia.mqsm.graph.AbstractStateGraph;
|
||||
import nu.marginalia.service.control.ServiceEventLog;
|
||||
@ -17,19 +17,19 @@ import java.util.UUID;
|
||||
|
||||
@Singleton
|
||||
public class ControlProcesses {
|
||||
private final MqPersistence persistence;
|
||||
private final ServiceEventLog eventLog;
|
||||
private final Gson gson;
|
||||
private final MqFactory messageQueueFactory;
|
||||
public Map<ControlProcess, StateMachine> stateMachines = new HashMap<>();
|
||||
|
||||
@Inject
|
||||
public ControlProcesses(MqPersistence persistence,
|
||||
public ControlProcesses(MqFactory messageQueueFactory,
|
||||
GsonFactory gsonFactory,
|
||||
BaseServiceParams baseServiceParams,
|
||||
RepartitionReindexProcess repartitionReindexProcess,
|
||||
ReconvertAndLoadProcess reconvertAndLoadProcess
|
||||
) {
|
||||
this.persistence = persistence;
|
||||
this.messageQueueFactory = messageQueueFactory;
|
||||
this.eventLog = baseServiceParams.eventLog;
|
||||
this.gson = gsonFactory.get();
|
||||
register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess);
|
||||
@ -37,7 +37,7 @@ public class ControlProcesses {
|
||||
}
|
||||
|
||||
private void register(ControlProcess process, AbstractStateGraph graph) {
|
||||
var sm = new StateMachine(persistence, process.id(), UUID.randomUUID(), graph);
|
||||
var sm = new StateMachine(messageQueueFactory, process.id(), UUID.randomUUID(), graph);
|
||||
|
||||
sm.listen((function, param) -> logStateChange(process, function));
|
||||
|
||||
|
@ -2,7 +2,6 @@ package nu.marginalia.control.svc;
|
||||
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.service.control.ServiceEventLog;
|
||||
import nu.marginalia.service.server.BaseServiceParams;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -18,9 +17,9 @@ public class MessageQueueMonitorService {
|
||||
private final ServiceEventLog eventLog;
|
||||
|
||||
@Inject
|
||||
public MessageQueueMonitorService(BaseServiceParams params) {
|
||||
this.persistence = params.messageQueuePersistence;
|
||||
this.eventLog = params.eventLog;
|
||||
public MessageQueueMonitorService(ServiceEventLog eventLog, MqPersistence persistence) {
|
||||
this.eventLog = eventLog;
|
||||
this.persistence = persistence;
|
||||
|
||||
Thread reaperThread = new Thread(this::run, "message-queue-reaper");
|
||||
reaperThread.setDaemon(true);
|
||||
|
Loading…
Reference in New Issue
Block a user