diff --git a/code/api/index-api/build.gradle b/code/api/index-api/build.gradle
index 6dbcd98f..edb6056d 100644
--- a/code/api/index-api/build.gradle
+++ b/code/api/index-api/build.gradle
@@ -16,7 +16,7 @@ dependencies {
implementation project(':code:common:config')
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')
-
+ implementation project(':code:common:message-queue')
implementation project(':code:features-index:index-query')
implementation libs.lombok
diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
index 8db8772f..b8d2e683 100644
--- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
+++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
@@ -8,27 +8,41 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.index.client.model.query.SearchSpecification;
-import nu.marginalia.index.client.model.results.SearchResultItem;
import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.model.gson.GsonFactory;
+import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import javax.annotation.CheckReturnValue;
-import java.util.List;
+import java.util.UUID;
@Singleton
public class IndexClient extends AbstractDynamicClient {
private static final Summary wmsa_search_index_api_time = Summary.build().name("wmsa_search_index_api_time").help("-").register();
+ private final MqOutbox outbox;
+
@Inject
- public IndexClient(ServiceDescriptors descriptors) {
+ public IndexClient(ServiceDescriptors descriptors,
+ MqPersistence persistence) {
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
+ String inboxName = ServiceId.Index.name + ":" + "0";
+ String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
+
+ outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID());
+
setTimeout(30);
}
+
+ public MqOutbox outbox() {
+ return outbox;
+ }
+
@CheckReturnValue
public SearchResultSet query(Context ctx, SearchSpecification specs) {
return wmsa_search_index_api_time.time(
diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java
new file mode 100644
index 00000000..9d2476f8
--- /dev/null
+++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java
@@ -0,0 +1,8 @@
+package nu.marginalia.index.client;
+
+public class IndexMqEndpoints {
+ public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED";
+ public static final String INDEX_REPARTITION = "INDEX-REPARTITION";
+ public static final String INDEX_REINDEX = "INDEX-REINDEX";
+
+}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java
index e8faa0ab..75fb8fcd 100644
--- a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java
@@ -6,6 +6,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -27,11 +28,12 @@ public class MqOutbox {
public MqOutbox(MqPersistence persistence,
String inboxName,
+ String outboxName,
UUID instanceUUID) {
this.persistence = persistence;
this.inboxName = inboxName;
- this.replyInboxName = "reply:" + inboxName;
+ this.replyInboxName = outboxName + "//" + inboxName;
this.instanceUUID = instanceUUID.toString();
pollThread = new Thread(this::poll, "mq-outbox-poll-thread:" + inboxName);
@@ -90,10 +92,26 @@ public class MqOutbox {
}
+ /** Send a message and wait for a response. */
public MqMessage send(String function, String payload) throws Exception {
+ final long id = sendAsync(function, payload);
+
+ return waitResponse(id);
+ }
+
+ /** Send a message asynchronously, without waiting for a response.
+ *
+ * Use waitResponse(id) or pollResponse(id) to fetch the response. */
+ public long sendAsync(String function, String payload) throws Exception {
var id = persistence.sendNewMessage(inboxName, replyInboxName, function, payload, null);
+
pendingRequests.put(id, id);
+ return id;
+ }
+
+ /** Blocks until a response arrives for the given message id. */
+ public MqMessage waitResponse(long id) throws Exception {
synchronized (pendingResponses) {
while (!pendingResponses.containsKey(id)) {
pendingResponses.wait(100);
@@ -102,6 +120,12 @@ public class MqOutbox {
}
}
+ /** Polls for a response for the given message id. */
+ public Optional pollResponse(long id) {
+ // no need to sync here if we aren't going to wait()
+ return Optional.ofNullable(pendingResponses.remove(id));
+ }
+
public long notify(String function, String payload) throws Exception {
return persistence.sendNewMessage(inboxName, null, function, payload, null);
}
diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java
index e54b48f7..3518e9e5 100644
--- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java
+++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java
@@ -44,7 +44,7 @@ public class StateMachine {
this.queueName = queueName;
smInbox = new MqInbox(persistence, queueName, instanceUUID, Executors.newSingleThreadExecutor());
- smOutbox = new MqOutbox(persistence, queueName, instanceUUID);
+ smOutbox = new MqOutbox(persistence, queueName, queueName+"//out", instanceUUID);
smInbox.subscribe(new StateEventSubscription());
@@ -144,6 +144,12 @@ public class StateMachine {
nextState,
message);
+ if (!allStates.containsKey(nextState)) {
+ logger.error("Unknown state {}", nextState);
+ setErrorState();
+ return;
+ }
+
synchronized (this) {
this.state = allStates.get(nextState);
notifyAll();
diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java
index 6dc51f2d..3b7996f1 100644
--- a/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java
+++ b/code/common/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java
@@ -55,13 +55,13 @@ public class MqOutboxTest {
@Test
public void testOpenClose() throws InterruptedException {
- var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, inboxId+"/reply", UUID.randomUUID());
outbox.stop();
}
@Test
public void testSend() throws Exception {
- var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
Executors.newSingleThreadExecutor().submit(() -> outbox.send("test", "Hello World"));
TimeUnit.MILLISECONDS.sleep(100);
@@ -75,7 +75,7 @@ public class MqOutboxTest {
@Test
public void testSendAndRespond() throws Exception {
- var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.subscribe(justRespond("Alright then"));
@@ -96,7 +96,7 @@ public class MqOutboxTest {
@Test
public void testSendMultiple() throws Exception {
- var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.subscribe(echo());
@@ -130,7 +130,7 @@ public class MqOutboxTest {
@Test
public void testSendAndRespondWithErrorHandler() throws Exception {
- var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
+ var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.start();
diff --git a/code/common/service/build.gradle b/code/common/service/build.gradle
index f153500b..156b826f 100644
--- a/code/common/service/build.gradle
+++ b/code/common/service/build.gradle
@@ -12,6 +12,7 @@ java {
dependencies {
implementation project(':code:common:service-client')
implementation project(':code:common:service-discovery')
+ implementation project(':code:common:message-queue')
implementation project(':code:common:db')
implementation libs.lombok
diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java b/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java
index 1cd94b6c..abec5e55 100644
--- a/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java
+++ b/code/common/service/src/main/java/nu/marginalia/service/server/BaseServiceParams.java
@@ -2,6 +2,7 @@ package nu.marginalia.service.server;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.module.ServiceConfiguration;
@@ -14,17 +15,19 @@ public class BaseServiceParams {
public final MetricsServer metricsServer;
public final ServiceHeartbeat heartbeat;
public final ServiceEventLog eventLog;
+ public final MqPersistence messageQueuePersistence;
@Inject
public BaseServiceParams(ServiceConfiguration configuration,
Initialization initialization,
MetricsServer metricsServer,
ServiceHeartbeat heartbeat,
- ServiceEventLog eventLog) {
+ ServiceEventLog eventLog, MqPersistence messageQueuePersistence) {
this.configuration = configuration;
this.initialization = initialization;
this.metricsServer = metricsServer;
this.heartbeat = heartbeat;
this.eventLog = eventLog;
+ this.messageQueuePersistence = messageQueuePersistence;
}
}
diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java
index 5a287c99..e8386fb8 100644
--- a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java
+++ b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java
@@ -3,6 +3,9 @@ package nu.marginalia.service.server;
import io.prometheus.client.Counter;
import nu.marginalia.client.Context;
import nu.marginalia.client.exception.MessagingException;
+import nu.marginalia.mq.inbox.MqInbox;
+import nu.marginalia.service.server.mq.MqRequest;
+import nu.marginalia.service.server.mq.ServiceMqSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@@ -36,14 +39,25 @@ public class Service {
private final String serviceName;
private static volatile boolean initialized = false;
+ protected final MqInbox messageQueueInbox;
+
public Service(BaseServiceParams params,
Runnable configureStaticFiles
) {
this.initialization = params.initialization;
+ var config = params.configuration;
+
+ String inboxName = config.serviceName() + ":" + config.node();
+ logger.info("Inbox name: {}", inboxName);
+ messageQueueInbox = new MqInbox(params.messageQueuePersistence,
+ inboxName,
+ config.instanceUuid());
+ messageQueueInbox.subscribe(new ServiceMqSubscription(this));
serviceName = System.getProperty("service-name");
initialization.addCallback(params.heartbeat::start);
+ initialization.addCallback(messageQueueInbox::start);
initialization.addCallback(() -> params.eventLog.logEvent("SVC-INIT", ""));
if (!initialization.isReady() && ! initialized ) {
@@ -81,6 +95,16 @@ public class Service {
});
}
+ @MqRequest(endpoint = "SVC-READY")
+ public boolean mqIsReady() {
+ return initialization.isReady();
+ }
+
+ @MqRequest(endpoint = "SVC-PING")
+ public String mqPing() {
+ return "pong";
+ }
+
private void filterPublicRequests(Request request, Response response) {
if (null == request.headers("X-Public")) {
return;
diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java
new file mode 100644
index 00000000..20586f3e
--- /dev/null
+++ b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java
@@ -0,0 +1,9 @@
+package nu.marginalia.service.server.mq;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface MqNotification {
+ String endpoint();
+}
diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqRequest.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqRequest.java
new file mode 100644
index 00000000..60b7ebd8
--- /dev/null
+++ b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqRequest.java
@@ -0,0 +1,9 @@
+package nu.marginalia.service.server.mq;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface MqRequest {
+ String endpoint();
+}
diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java
new file mode 100644
index 00000000..d344d928
--- /dev/null
+++ b/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java
@@ -0,0 +1,74 @@
+package nu.marginalia.service.server.mq;
+
+import nu.marginalia.mq.MqMessage;
+import nu.marginalia.mq.inbox.MqInboxResponse;
+import nu.marginalia.mq.inbox.MqSubscription;
+import nu.marginalia.service.server.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ServiceMqSubscription implements MqSubscription {
+ private static final Logger logger = LoggerFactory.getLogger(ServiceMqSubscription.class);
+ private final Map requests = new HashMap<>();
+ private final Map notifications = new HashMap<>();
+ private final Service service;
+
+ public ServiceMqSubscription(Service service) {
+ this.service = service;
+ for (var method : service.getClass().getMethods()) {
+ var annotation = method.getAnnotation(MqRequest.class);
+ if (annotation != null) {
+ requests.put(annotation.endpoint(), method);
+ }
+ if (method.getAnnotation(MqNotification.class) != null) {
+ notifications.put(method.getName(), method);
+ }
+ }
+ }
+
+ @Override
+ public boolean filter(MqMessage rawMessage) {
+ boolean isInteresting = requests.containsKey(rawMessage.function())
+ || notifications.containsKey(rawMessage.function());
+
+ if (!isInteresting) {
+ logger.warn("Received message for unknown function " + rawMessage.function());
+ }
+
+ return isInteresting;
+ }
+
+ @Override
+ public MqInboxResponse onRequest(MqMessage msg) {
+ var method = requests.get(msg.function());
+
+ try {
+ return MqInboxResponse.ok(method.invoke(service, msg.payload()).toString());
+ }
+ catch (InvocationTargetException ex) {
+ logger.error("Error invoking method " + method, ex);
+ return MqInboxResponse.err(ex.getCause().getMessage());
+ }
+ catch (Exception ex) {
+ logger.error("Error invoking method " + method, ex);
+ return MqInboxResponse.err(ex.getMessage());
+ }
+ }
+
+ @Override
+ public void onNotification(MqMessage msg) {
+ var method = notifications.get(msg.function());
+
+ try {
+ method.invoke(service, msg.payload());
+ }
+ catch (Exception ex) {
+ logger.error("Error invoking method " + method, ex);
+ }
+ }
+}
diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java
index 369e8309..82ed2617 100644
--- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java
+++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java
@@ -3,6 +3,7 @@ package nu.marginalia.index;
import com.google.gson.Gson;
import com.google.inject.Inject;
import io.reactivex.rxjava3.schedulers.Schedulers;
+import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.index.index.SearchIndex;
import nu.marginalia.index.svc.IndexOpsService;
import nu.marginalia.index.svc.IndexQueryService;
@@ -10,6 +11,7 @@ import nu.marginalia.index.svc.IndexSearchSetsService;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.*;
+import nu.marginalia.service.server.mq.MqRequest;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +74,27 @@ public class IndexService extends Service {
volatile boolean initialized = false;
+ @MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION)
+ public String repartition(String message) {
+ if (!opsService.repartition()) {
+ throw new IllegalStateException("Ops lock busy");
+ }
+ return "ok";
+ }
+
+ @MqRequest(endpoint = IndexMqEndpoints.INDEX_REINDEX)
+ public String reindex(String message) throws Exception {
+ if (!opsService.reindex()) {
+ throw new IllegalStateException("Ops lock busy");
+ }
+
+ return "ok";
+ }
+ @MqRequest(endpoint = IndexMqEndpoints.INDEX_IS_BLOCKED)
+ public String isBlocked(String message) throws Exception {
+ return Boolean.valueOf(opsService.isBusy()).toString();
+ }
+
public void initialize() {
if (!initialized) {
init.waitReady();
diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java
index 34ed2927..36377c7c 100644
--- a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java
+++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java
@@ -30,6 +30,13 @@ public class IndexOpsService {
return opsLock.isLocked();
}
+ public boolean repartition() {
+ return run(searchSetService::recalculateAll);
+ }
+ public boolean reindex() throws Exception {
+ return run(index::switchIndex).isPresent();
+ }
+
public Object repartitionEndpoint(Request request, Response response) throws Exception {
if (!run(searchSetService::recalculateAll)) {
diff --git a/code/services-satellite/control-service/build.gradle b/code/services-satellite/control-service/build.gradle
index 6bfffcaa..fac386e2 100644
--- a/code/services-satellite/control-service/build.gradle
+++ b/code/services-satellite/control-service/build.gradle
@@ -31,6 +31,7 @@ dependencies {
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')
implementation project(':code:api:search-api')
+ implementation project(':code:api:index-api')
implementation libs.lombok
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java
index 40559746..93873abb 100644
--- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java
@@ -3,6 +3,7 @@ package nu.marginalia.control;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors;
+import nu.marginalia.control.process.ControlProcesses;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.renderer.MustacheRenderer;
@@ -33,7 +34,8 @@ public class ControlService extends Service {
HeartbeatService heartbeatService,
EventLogService eventLogService,
RendererFactory rendererFactory,
- MqPersistence messageQueuePersistence
+ MqPersistence messageQueuePersistence,
+ ControlProcesses controlProcesses
) throws IOException {
super(params);
@@ -52,6 +54,10 @@ public class ControlService extends Service {
Map.of("heartbeats", heartbeatService.getHeartbeats(),
"events", eventLogService.getLastEntries(100)
)));
+ Spark.get("/public/repartition", (req, rsp) -> {
+ controlProcesses.start("REPARTITION-REINDEX");
+ return "OK";
+ });
monitors.subscribe(this::logMonitorStateChange);
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java
new file mode 100644
index 00000000..7a70eb83
--- /dev/null
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java
@@ -0,0 +1,38 @@
+package nu.marginalia.control.process;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import nu.marginalia.mq.persistence.MqPersistence;
+import nu.marginalia.mqsm.StateMachine;
+import nu.marginalia.mqsm.graph.AbstractStateGraph;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+@Singleton
+public class ControlProcesses {
+ private final MqPersistence persistence;
+ public Map stateMachines = new HashMap<>();
+
+ @Inject
+ public ControlProcesses(MqPersistence persistence,
+ RepartitionReindexProcess repartitionReindexProcess
+ ) {
+ this.persistence = persistence;
+
+ register("REPARTITION-REINDEX", repartitionReindexProcess);
+ }
+
+ private void register(String name, AbstractStateGraph graph) {
+ stateMachines.put(name, new StateMachine(persistence, name, UUID.randomUUID(), graph));
+ }
+
+ public void start(String name) throws Exception {
+ stateMachines.get(name).init();
+ }
+
+ public void resume(String name) throws Exception {
+ stateMachines.get(name).resume();
+ }
+}
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java
new file mode 100644
index 00000000..ef76e654
--- /dev/null
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/RepartitionReindexProcess.java
@@ -0,0 +1,72 @@
+package nu.marginalia.control.process;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import nu.marginalia.index.client.IndexClient;
+import nu.marginalia.index.client.IndexMqEndpoints;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mqsm.StateFactory;
+import nu.marginalia.mqsm.graph.AbstractStateGraph;
+import nu.marginalia.mqsm.graph.GraphState;
+
+@Singleton
+public class RepartitionReindexProcess extends AbstractStateGraph {
+
+ private final MqOutbox indexOutbox;
+
+ // STATES
+
+ private static final String INITIAL = "INITIAL";
+ private static final String REPARTITION = "REPARTITION";
+ private static final String REPARTITION_REPLY = "REPARTITION-REPLY";
+ private static final String REINDEX = "REINDEX";
+ private static final String REINDEX_REPLY = "REINDEX-REPLY";
+ private static final String END = "END";
+
+
+ @Inject
+ public RepartitionReindexProcess(StateFactory stateFactory, IndexClient indexClient) {
+ super(stateFactory);
+
+ indexOutbox = indexClient.outbox();
+ }
+
+ @GraphState(name = INITIAL, next = REPARTITION)
+ public void init() throws Exception {
+ var rsp = indexOutbox.send(IndexMqEndpoints.INDEX_IS_BLOCKED, "");
+
+ if (rsp.payload().equalsIgnoreCase("true")) {
+ error("Index is blocked");
+ }
+ }
+
+ @GraphState(name = REPARTITION, next = REPARTITION_REPLY)
+ public Long repartition() throws Exception {
+ return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "");
+ }
+
+ @GraphState(name = REPARTITION_REPLY, next = REINDEX)
+ public void repartitionReply(Long id) throws Exception {
+ var rsp = indexOutbox.waitResponse(id);
+
+ if (rsp.state() != MqMessageState.OK) {
+ error("Repartition failed");
+ }
+ }
+
+ @GraphState(name = REINDEX, next = REINDEX_REPLY)
+ public Long reindex() throws Exception {
+ return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, "");
+ }
+
+ @GraphState(name = REINDEX_REPLY, next = END)
+ public void reindexReply(Long id) throws Exception {
+ var rsp = indexOutbox.waitResponse(id);
+
+ if (rsp.state() != MqMessageState.OK) {
+ error("Repartition failed");
+ }
+ }
+
+}