From d9e6c4f2667ddeae6a479fa50c594641f5d8f0f9 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Thu, 6 Jul 2023 18:04:16 +0200 Subject: [PATCH] Trial integration of MQ-FSM into index service. --- code/api/index-api/build.gradle | 2 +- .../marginalia/index/client/IndexClient.java | 20 ++++- .../index/client/IndexMqEndpoints.java | 8 ++ .../nu/marginalia/mq/outbox/MqOutbox.java | 26 ++++++- .../java/nu/marginalia/mqsm/StateMachine.java | 8 +- .../nu/marginalia/mq/outbox/MqOutboxTest.java | 10 +-- code/common/service/build.gradle | 1 + .../service/server/BaseServiceParams.java | 5 +- .../nu/marginalia/service/server/Service.java | 24 ++++++ .../service/server/mq/MqNotification.java | 9 +++ .../service/server/mq/MqRequest.java | 9 +++ .../server/mq/ServiceMqSubscription.java | 74 +++++++++++++++++++ .../nu/marginalia/index/IndexService.java | 23 ++++++ .../marginalia/index/svc/IndexOpsService.java | 7 ++ .../control-service/build.gradle | 1 + .../nu/marginalia/control/ControlService.java | 8 +- .../control/process/ControlProcesses.java | 38 ++++++++++ .../process/RepartitionReindexProcess.java | 72 ++++++++++++++++++ 18 files changed, 332 insertions(+), 13 deletions(-) create mode 100644 code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java create mode 100644 code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java create mode 100644 code/common/service/src/main/java/nu/marginalia/service/server/mq/MqRequest.java create mode 100644 code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java create mode 100644 code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java create mode 100644 code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java diff --git a/code/api/index-api/build.gradle b/code/api/index-api/build.gradle index 6dbcd98f..edb6056d 100644 --- a/code/api/index-api/build.gradle +++ b/code/api/index-api/build.gradle @@ -16,7 +16,7 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') - + implementation project(':code:common:message-queue') implementation project(':code:features-index:index-query') implementation libs.lombok diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 8db8772f..b8d2e683 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -8,27 +8,41 @@ import nu.marginalia.WmsaHome; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; import nu.marginalia.index.client.model.query.SearchSpecification; -import nu.marginalia.index.client.model.results.SearchResultItem; import nu.marginalia.index.client.model.results.SearchResultSet; import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; import javax.annotation.CheckReturnValue; -import java.util.List; +import java.util.UUID; @Singleton public class IndexClient extends AbstractDynamicClient { private static final Summary wmsa_search_index_api_time = Summary.build().name("wmsa_search_index_api_time").help("-").register(); + private final MqOutbox outbox; + @Inject - public IndexClient(ServiceDescriptors descriptors) { + public IndexClient(ServiceDescriptors descriptors, + MqPersistence persistence) { super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get); + String inboxName = ServiceId.Index.name + ":" + "0"; + String outboxName = System.getProperty("service-name", UUID.randomUUID().toString()); + + outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID()); + setTimeout(30); } + + public MqOutbox outbox() { + return outbox; + } + @CheckReturnValue public SearchResultSet query(Context ctx, SearchSpecification specs) { return wmsa_search_index_api_time.time( diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java new file mode 100644 index 00000000..9d2476f8 --- /dev/null +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java @@ -0,0 +1,8 @@ +package nu.marginalia.index.client; + +public class IndexMqEndpoints { + public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED"; + public static final String INDEX_REPARTITION = "INDEX-REPARTITION"; + public static final String INDEX_REINDEX = "INDEX-REINDEX"; + +} diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java index e8faa0ab..75fb8fcd 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java @@ -6,6 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.SQLException; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -27,11 +28,12 @@ public class MqOutbox { public MqOutbox(MqPersistence persistence, String inboxName, + String outboxName, UUID instanceUUID) { this.persistence = persistence; this.inboxName = inboxName; - this.replyInboxName = "reply:" + inboxName; + this.replyInboxName = outboxName + "//" + inboxName; this.instanceUUID = instanceUUID.toString(); pollThread = new Thread(this::poll, "mq-outbox-poll-thread:" + inboxName); @@ -90,10 +92,26 @@ public class MqOutbox { } + /** Send a message and wait for a response. */ public MqMessage send(String function, String payload) throws Exception { + final long id = sendAsync(function, payload); + + return waitResponse(id); + } + + /** Send a message asynchronously, without waiting for a response. + *
+ * Use waitResponse(id) or pollResponse(id) to fetch the response. */ + public long sendAsync(String function, String payload) throws Exception { var id = persistence.sendNewMessage(inboxName, replyInboxName, function, payload, null); + pendingRequests.put(id, id); + return id; + } + + /** Blocks until a response arrives for the given message id. */ + public MqMessage waitResponse(long id) throws Exception { synchronized (pendingResponses) { while (!pendingResponses.containsKey(id)) { pendingResponses.wait(100); @@ -102,6 +120,12 @@ public class MqOutbox { } } + /** Polls for a response for the given message id. */ + public Optional pollResponse(long id) { + // no need to sync here if we aren't going to wait() + return Optional.ofNullable(pendingResponses.remove(id)); + } + public long notify(String function, String payload) throws Exception { return persistence.sendNewMessage(inboxName, null, function, payload, null); } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java index e54b48f7..3518e9e5 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java @@ -44,7 +44,7 @@ public class StateMachine { this.queueName = queueName; smInbox = new MqInbox(persistence, queueName, instanceUUID, Executors.newSingleThreadExecutor()); - smOutbox = new MqOutbox(persistence, queueName, instanceUUID); + smOutbox = new MqOutbox(persistence, queueName, queueName+"//out", instanceUUID); smInbox.subscribe(new StateEventSubscription()); @@ -144,6 +144,12 @@ public class StateMachine { nextState, message); + if (!allStates.containsKey(nextState)) { + logger.error("Unknown state {}", nextState); + setErrorState(); + return; + } + synchronized (this) { this.state = allStates.get(nextState); notifyAll(); diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java index 6dc51f2d..3b7996f1 100644 --- a/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java +++ b/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java @@ -55,13 +55,13 @@ public class MqOutboxTest { @Test public void testOpenClose() throws InterruptedException { - var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); + var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, inboxId+"/reply", UUID.randomUUID()); outbox.stop(); } @Test public void testSend() throws Exception { - var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); + var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID()); Executors.newSingleThreadExecutor().submit(() -> outbox.send("test", "Hello World")); TimeUnit.MILLISECONDS.sleep(100); @@ -75,7 +75,7 @@ public class MqOutboxTest { @Test public void testSendAndRespond() throws Exception { - var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); + var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID()); var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); inbox.subscribe(justRespond("Alright then")); @@ -96,7 +96,7 @@ public class MqOutboxTest { @Test public void testSendMultiple() throws Exception { - var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); + var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID()); var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); inbox.subscribe(echo()); @@ -130,7 +130,7 @@ public class MqOutboxTest { @Test public void testSendAndRespondWithErrorHandler() throws Exception { - var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); + var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID()); var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); inbox.start(); diff --git a/code/common/service/build.gradle b/code/common/service/build.gradle index f153500b..156b826f 100644 --- a/code/common/service/build.gradle +++ b/code/common/service/build.gradle @@ -12,6 +12,7 @@ java { dependencies { implementation project(':code:common:service-client') implementation project(':code:common:service-discovery') + implementation project(':code:common:message-queue') implementation project(':code:common:db') implementation libs.lombok diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java b/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java index 1cd94b6c..abec5e55 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java +++ b/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java @@ -2,6 +2,7 @@ package nu.marginalia.service.server; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceHeartbeat; import nu.marginalia.service.module.ServiceConfiguration; @@ -14,17 +15,19 @@ public class BaseServiceParams { public final MetricsServer metricsServer; public final ServiceHeartbeat heartbeat; public final ServiceEventLog eventLog; + public final MqPersistence messageQueuePersistence; @Inject public BaseServiceParams(ServiceConfiguration configuration, Initialization initialization, MetricsServer metricsServer, ServiceHeartbeat heartbeat, - ServiceEventLog eventLog) { + ServiceEventLog eventLog, MqPersistence messageQueuePersistence) { this.configuration = configuration; this.initialization = initialization; this.metricsServer = metricsServer; this.heartbeat = heartbeat; this.eventLog = eventLog; + this.messageQueuePersistence = messageQueuePersistence; } } diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java index 5a287c99..e8386fb8 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java +++ b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java @@ -3,6 +3,9 @@ package nu.marginalia.service.server; import io.prometheus.client.Counter; import nu.marginalia.client.Context; import nu.marginalia.client.exception.MessagingException; +import nu.marginalia.mq.inbox.MqInbox; +import nu.marginalia.service.server.mq.MqRequest; +import nu.marginalia.service.server.mq.ServiceMqSubscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -36,14 +39,25 @@ public class Service { private final String serviceName; private static volatile boolean initialized = false; + protected final MqInbox messageQueueInbox; + public Service(BaseServiceParams params, Runnable configureStaticFiles ) { this.initialization = params.initialization; + var config = params.configuration; + + String inboxName = config.serviceName() + ":" + config.node(); + logger.info("Inbox name: {}", inboxName); + messageQueueInbox = new MqInbox(params.messageQueuePersistence, + inboxName, + config.instanceUuid()); + messageQueueInbox.subscribe(new ServiceMqSubscription(this)); serviceName = System.getProperty("service-name"); initialization.addCallback(params.heartbeat::start); + initialization.addCallback(messageQueueInbox::start); initialization.addCallback(() -> params.eventLog.logEvent("SVC-INIT", "")); if (!initialization.isReady() && ! initialized ) { @@ -81,6 +95,16 @@ public class Service { }); } + @MqRequest(endpoint = "SVC-READY") + public boolean mqIsReady() { + return initialization.isReady(); + } + + @MqRequest(endpoint = "SVC-PING") + public String mqPing() { + return "pong"; + } + private void filterPublicRequests(Request request, Response response) { if (null == request.headers("X-Public")) { return; diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java new file mode 100644 index 00000000..20586f3e --- /dev/null +++ b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java @@ -0,0 +1,9 @@ +package nu.marginalia.service.server.mq; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) +public @interface MqNotification { + String endpoint(); +} diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqRequest.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqRequest.java new file mode 100644 index 00000000..60b7ebd8 --- /dev/null +++ b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqRequest.java @@ -0,0 +1,9 @@ +package nu.marginalia.service.server.mq; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) +public @interface MqRequest { + String endpoint(); +} diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java new file mode 100644 index 00000000..d344d928 --- /dev/null +++ b/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java @@ -0,0 +1,74 @@ +package nu.marginalia.service.server.mq; + +import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.inbox.MqInboxResponse; +import nu.marginalia.mq.inbox.MqSubscription; +import nu.marginalia.service.server.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class ServiceMqSubscription implements MqSubscription { + private static final Logger logger = LoggerFactory.getLogger(ServiceMqSubscription.class); + private final Map requests = new HashMap<>(); + private final Map notifications = new HashMap<>(); + private final Service service; + + public ServiceMqSubscription(Service service) { + this.service = service; + for (var method : service.getClass().getMethods()) { + var annotation = method.getAnnotation(MqRequest.class); + if (annotation != null) { + requests.put(annotation.endpoint(), method); + } + if (method.getAnnotation(MqNotification.class) != null) { + notifications.put(method.getName(), method); + } + } + } + + @Override + public boolean filter(MqMessage rawMessage) { + boolean isInteresting = requests.containsKey(rawMessage.function()) + || notifications.containsKey(rawMessage.function()); + + if (!isInteresting) { + logger.warn("Received message for unknown function " + rawMessage.function()); + } + + return isInteresting; + } + + @Override + public MqInboxResponse onRequest(MqMessage msg) { + var method = requests.get(msg.function()); + + try { + return MqInboxResponse.ok(method.invoke(service, msg.payload()).toString()); + } + catch (InvocationTargetException ex) { + logger.error("Error invoking method " + method, ex); + return MqInboxResponse.err(ex.getCause().getMessage()); + } + catch (Exception ex) { + logger.error("Error invoking method " + method, ex); + return MqInboxResponse.err(ex.getMessage()); + } + } + + @Override + public void onNotification(MqMessage msg) { + var method = notifications.get(msg.function()); + + try { + method.invoke(service, msg.payload()); + } + catch (Exception ex) { + logger.error("Error invoking method " + method, ex); + } + } +} diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java index 369e8309..82ed2617 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java @@ -3,6 +3,7 @@ package nu.marginalia.index; import com.google.gson.Gson; import com.google.inject.Inject; import io.reactivex.rxjava3.schedulers.Schedulers; +import nu.marginalia.index.client.IndexMqEndpoints; import nu.marginalia.index.index.SearchIndex; import nu.marginalia.index.svc.IndexOpsService; import nu.marginalia.index.svc.IndexQueryService; @@ -10,6 +11,7 @@ import nu.marginalia.index.svc.IndexSearchSetsService; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.*; +import nu.marginalia.service.server.mq.MqRequest; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +74,27 @@ public class IndexService extends Service { volatile boolean initialized = false; + @MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION) + public String repartition(String message) { + if (!opsService.repartition()) { + throw new IllegalStateException("Ops lock busy"); + } + return "ok"; + } + + @MqRequest(endpoint = IndexMqEndpoints.INDEX_REINDEX) + public String reindex(String message) throws Exception { + if (!opsService.reindex()) { + throw new IllegalStateException("Ops lock busy"); + } + + return "ok"; + } + @MqRequest(endpoint = IndexMqEndpoints.INDEX_IS_BLOCKED) + public String isBlocked(String message) throws Exception { + return Boolean.valueOf(opsService.isBusy()).toString(); + } + public void initialize() { if (!initialized) { init.waitReady(); diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java index 34ed2927..36377c7c 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java @@ -30,6 +30,13 @@ public class IndexOpsService { return opsLock.isLocked(); } + public boolean repartition() { + return run(searchSetService::recalculateAll); + } + public boolean reindex() throws Exception { + return run(index::switchIndex).isPresent(); + } + public Object repartitionEndpoint(Request request, Response response) throws Exception { if (!run(searchSetService::recalculateAll)) { diff --git a/code/services-satellite/control-service/build.gradle b/code/services-satellite/control-service/build.gradle index 6bfffcaa..fac386e2 100644 --- a/code/services-satellite/control-service/build.gradle +++ b/code/services-satellite/control-service/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') implementation project(':code:api:search-api') + implementation project(':code:api:index-api') implementation libs.lombok diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index 40559746..93873abb 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -3,6 +3,7 @@ package nu.marginalia.control; import com.google.gson.Gson; import com.google.inject.Inject; import nu.marginalia.client.ServiceMonitors; +import nu.marginalia.control.process.ControlProcesses; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.renderer.MustacheRenderer; @@ -33,7 +34,8 @@ public class ControlService extends Service { HeartbeatService heartbeatService, EventLogService eventLogService, RendererFactory rendererFactory, - MqPersistence messageQueuePersistence + MqPersistence messageQueuePersistence, + ControlProcesses controlProcesses ) throws IOException { super(params); @@ -52,6 +54,10 @@ public class ControlService extends Service { Map.of("heartbeats", heartbeatService.getHeartbeats(), "events", eventLogService.getLastEntries(100) ))); + Spark.get("/public/repartition", (req, rsp) -> { + controlProcesses.start("REPARTITION-REINDEX"); + return "OK"; + }); monitors.subscribe(this::logMonitorStateChange); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java new file mode 100644 index 00000000..7a70eb83 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java @@ -0,0 +1,38 @@ +package nu.marginalia.control.process; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqsm.StateMachine; +import nu.marginalia.mqsm.graph.AbstractStateGraph; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +@Singleton +public class ControlProcesses { + private final MqPersistence persistence; + public Map stateMachines = new HashMap<>(); + + @Inject + public ControlProcesses(MqPersistence persistence, + RepartitionReindexProcess repartitionReindexProcess + ) { + this.persistence = persistence; + + register("REPARTITION-REINDEX", repartitionReindexProcess); + } + + private void register(String name, AbstractStateGraph graph) { + stateMachines.put(name, new StateMachine(persistence, name, UUID.randomUUID(), graph)); + } + + public void start(String name) throws Exception { + stateMachines.get(name).init(); + } + + public void resume(String name) throws Exception { + stateMachines.get(name).resume(); + } +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java new file mode 100644 index 00000000..ef76e654 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java @@ -0,0 +1,72 @@ +package nu.marginalia.control.process; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.index.client.IndexClient; +import nu.marginalia.index.client.IndexMqEndpoints; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; + +@Singleton +public class RepartitionReindexProcess extends AbstractStateGraph { + + private final MqOutbox indexOutbox; + + // STATES + + private static final String INITIAL = "INITIAL"; + private static final String REPARTITION = "REPARTITION"; + private static final String REPARTITION_REPLY = "REPARTITION-REPLY"; + private static final String REINDEX = "REINDEX"; + private static final String REINDEX_REPLY = "REINDEX-REPLY"; + private static final String END = "END"; + + + @Inject + public RepartitionReindexProcess(StateFactory stateFactory, IndexClient indexClient) { + super(stateFactory); + + indexOutbox = indexClient.outbox(); + } + + @GraphState(name = INITIAL, next = REPARTITION) + public void init() throws Exception { + var rsp = indexOutbox.send(IndexMqEndpoints.INDEX_IS_BLOCKED, ""); + + if (rsp.payload().equalsIgnoreCase("true")) { + error("Index is blocked"); + } + } + + @GraphState(name = REPARTITION, next = REPARTITION_REPLY) + public Long repartition() throws Exception { + return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""); + } + + @GraphState(name = REPARTITION_REPLY, next = REINDEX) + public void repartitionReply(Long id) throws Exception { + var rsp = indexOutbox.waitResponse(id); + + if (rsp.state() != MqMessageState.OK) { + error("Repartition failed"); + } + } + + @GraphState(name = REINDEX, next = REINDEX_REPLY) + public Long reindex() throws Exception { + return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, ""); + } + + @GraphState(name = REINDEX_REPLY, next = END) + public void reindexReply(Long id) throws Exception { + var rsp = indexOutbox.waitResponse(id); + + if (rsp.state() != MqMessageState.OK) { + error("Repartition failed"); + } + } + +}