Trial integration of MQ-FSM into index service.

This commit is contained in:
Viktor Lofgren 2023-07-06 18:04:16 +02:00
parent 34653f03a2
commit d9e6c4f266
18 changed files with 332 additions and 13 deletions

View File

@ -16,7 +16,7 @@ dependencies {
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:common:message-queue')
implementation project(':code:features-index:index-query') implementation project(':code:features-index:index-query')
implementation libs.lombok implementation libs.lombok

View File

@ -8,27 +8,41 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context; import nu.marginalia.client.Context;
import nu.marginalia.index.client.model.query.SearchSpecification; import nu.marginalia.index.client.model.query.SearchSpecification;
import nu.marginalia.index.client.model.results.SearchResultItem;
import nu.marginalia.index.client.model.results.SearchResultSet; import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId; import nu.marginalia.service.id.ServiceId;
import javax.annotation.CheckReturnValue; import javax.annotation.CheckReturnValue;
import java.util.List; import java.util.UUID;
@Singleton @Singleton
public class IndexClient extends AbstractDynamicClient { public class IndexClient extends AbstractDynamicClient {
private static final Summary wmsa_search_index_api_time = Summary.build().name("wmsa_search_index_api_time").help("-").register(); private static final Summary wmsa_search_index_api_time = Summary.build().name("wmsa_search_index_api_time").help("-").register();
private final MqOutbox outbox;
@Inject @Inject
public IndexClient(ServiceDescriptors descriptors) { public IndexClient(ServiceDescriptors descriptors,
MqPersistence persistence) {
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get); super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
String inboxName = ServiceId.Index.name + ":" + "0";
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID());
setTimeout(30); setTimeout(30);
} }
public MqOutbox outbox() {
return outbox;
}
@CheckReturnValue @CheckReturnValue
public SearchResultSet query(Context ctx, SearchSpecification specs) { public SearchResultSet query(Context ctx, SearchSpecification specs) {
return wmsa_search_index_api_time.time( return wmsa_search_index_api_time.time(

View File

@ -0,0 +1,8 @@
package nu.marginalia.index.client;
public class IndexMqEndpoints {
public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED";
public static final String INDEX_REPARTITION = "INDEX-REPARTITION";
public static final String INDEX_REINDEX = "INDEX-REINDEX";
}

View File

@ -6,6 +6,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -27,11 +28,12 @@ public class MqOutbox {
public MqOutbox(MqPersistence persistence, public MqOutbox(MqPersistence persistence,
String inboxName, String inboxName,
String outboxName,
UUID instanceUUID) { UUID instanceUUID) {
this.persistence = persistence; this.persistence = persistence;
this.inboxName = inboxName; this.inboxName = inboxName;
this.replyInboxName = "reply:" + inboxName; this.replyInboxName = outboxName + "//" + inboxName;
this.instanceUUID = instanceUUID.toString(); this.instanceUUID = instanceUUID.toString();
pollThread = new Thread(this::poll, "mq-outbox-poll-thread:" + inboxName); pollThread = new Thread(this::poll, "mq-outbox-poll-thread:" + inboxName);
@ -90,10 +92,26 @@ public class MqOutbox {
} }
/** Send a message and wait for a response. */
public MqMessage send(String function, String payload) throws Exception { public MqMessage send(String function, String payload) throws Exception {
final long id = sendAsync(function, payload);
return waitResponse(id);
}
/** Send a message asynchronously, without waiting for a response.
* <br>
* Use waitResponse(id) or pollResponse(id) to fetch the response. */
public long sendAsync(String function, String payload) throws Exception {
var id = persistence.sendNewMessage(inboxName, replyInboxName, function, payload, null); var id = persistence.sendNewMessage(inboxName, replyInboxName, function, payload, null);
pendingRequests.put(id, id); pendingRequests.put(id, id);
return id;
}
/** Blocks until a response arrives for the given message id. */
public MqMessage waitResponse(long id) throws Exception {
synchronized (pendingResponses) { synchronized (pendingResponses) {
while (!pendingResponses.containsKey(id)) { while (!pendingResponses.containsKey(id)) {
pendingResponses.wait(100); pendingResponses.wait(100);
@ -102,6 +120,12 @@ public class MqOutbox {
} }
} }
/** Polls for a response for the given message id. */
public Optional<MqMessage> pollResponse(long id) {
// no need to sync here if we aren't going to wait()
return Optional.ofNullable(pendingResponses.remove(id));
}
public long notify(String function, String payload) throws Exception { public long notify(String function, String payload) throws Exception {
return persistence.sendNewMessage(inboxName, null, function, payload, null); return persistence.sendNewMessage(inboxName, null, function, payload, null);
} }

View File

@ -44,7 +44,7 @@ public class StateMachine {
this.queueName = queueName; this.queueName = queueName;
smInbox = new MqInbox(persistence, queueName, instanceUUID, Executors.newSingleThreadExecutor()); smInbox = new MqInbox(persistence, queueName, instanceUUID, Executors.newSingleThreadExecutor());
smOutbox = new MqOutbox(persistence, queueName, instanceUUID); smOutbox = new MqOutbox(persistence, queueName, queueName+"//out", instanceUUID);
smInbox.subscribe(new StateEventSubscription()); smInbox.subscribe(new StateEventSubscription());
@ -144,6 +144,12 @@ public class StateMachine {
nextState, nextState,
message); message);
if (!allStates.containsKey(nextState)) {
logger.error("Unknown state {}", nextState);
setErrorState();
return;
}
synchronized (this) { synchronized (this) {
this.state = allStates.get(nextState); this.state = allStates.get(nextState);
notifyAll(); notifyAll();

View File

@ -55,13 +55,13 @@ public class MqOutboxTest {
@Test @Test
public void testOpenClose() throws InterruptedException { public void testOpenClose() throws InterruptedException {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, inboxId+"/reply", UUID.randomUUID());
outbox.stop(); outbox.stop();
} }
@Test @Test
public void testSend() throws Exception { public void testSend() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
Executors.newSingleThreadExecutor().submit(() -> outbox.send("test", "Hello World")); Executors.newSingleThreadExecutor().submit(() -> outbox.send("test", "Hello World"));
TimeUnit.MILLISECONDS.sleep(100); TimeUnit.MILLISECONDS.sleep(100);
@ -75,7 +75,7 @@ public class MqOutboxTest {
@Test @Test
public void testSendAndRespond() throws Exception { public void testSendAndRespond() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.subscribe(justRespond("Alright then")); inbox.subscribe(justRespond("Alright then"));
@ -96,7 +96,7 @@ public class MqOutboxTest {
@Test @Test
public void testSendMultiple() throws Exception { public void testSendMultiple() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.subscribe(echo()); inbox.subscribe(echo());
@ -130,7 +130,7 @@ public class MqOutboxTest {
@Test @Test
public void testSendAndRespondWithErrorHandler() throws Exception { public void testSendAndRespondWithErrorHandler() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId,inboxId+"/reply", UUID.randomUUID());
var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID()); var inbox = new MqInbox(new MqPersistence(dataSource), inboxId, UUID.randomUUID());
inbox.start(); inbox.start();

View File

@ -12,6 +12,7 @@ java {
dependencies { dependencies {
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:message-queue')
implementation project(':code:common:db') implementation project(':code:common:db')
implementation libs.lombok implementation libs.lombok

View File

@ -2,6 +2,7 @@ package nu.marginalia.service.server;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.control.ServiceHeartbeat; import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
@ -14,17 +15,19 @@ public class BaseServiceParams {
public final MetricsServer metricsServer; public final MetricsServer metricsServer;
public final ServiceHeartbeat heartbeat; public final ServiceHeartbeat heartbeat;
public final ServiceEventLog eventLog; public final ServiceEventLog eventLog;
public final MqPersistence messageQueuePersistence;
@Inject @Inject
public BaseServiceParams(ServiceConfiguration configuration, public BaseServiceParams(ServiceConfiguration configuration,
Initialization initialization, Initialization initialization,
MetricsServer metricsServer, MetricsServer metricsServer,
ServiceHeartbeat heartbeat, ServiceHeartbeat heartbeat,
ServiceEventLog eventLog) { ServiceEventLog eventLog, MqPersistence messageQueuePersistence) {
this.configuration = configuration; this.configuration = configuration;
this.initialization = initialization; this.initialization = initialization;
this.metricsServer = metricsServer; this.metricsServer = metricsServer;
this.heartbeat = heartbeat; this.heartbeat = heartbeat;
this.eventLog = eventLog; this.eventLog = eventLog;
this.messageQueuePersistence = messageQueuePersistence;
} }
} }

View File

@ -3,6 +3,9 @@ package nu.marginalia.service.server;
import io.prometheus.client.Counter; import io.prometheus.client.Counter;
import nu.marginalia.client.Context; import nu.marginalia.client.Context;
import nu.marginalia.client.exception.MessagingException; import nu.marginalia.client.exception.MessagingException;
import nu.marginalia.mq.inbox.MqInbox;
import nu.marginalia.service.server.mq.MqRequest;
import nu.marginalia.service.server.mq.ServiceMqSubscription;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker; import org.slf4j.Marker;
@ -36,14 +39,25 @@ public class Service {
private final String serviceName; private final String serviceName;
private static volatile boolean initialized = false; private static volatile boolean initialized = false;
protected final MqInbox messageQueueInbox;
public Service(BaseServiceParams params, public Service(BaseServiceParams params,
Runnable configureStaticFiles Runnable configureStaticFiles
) { ) {
this.initialization = params.initialization; this.initialization = params.initialization;
var config = params.configuration;
String inboxName = config.serviceName() + ":" + config.node();
logger.info("Inbox name: {}", inboxName);
messageQueueInbox = new MqInbox(params.messageQueuePersistence,
inboxName,
config.instanceUuid());
messageQueueInbox.subscribe(new ServiceMqSubscription(this));
serviceName = System.getProperty("service-name"); serviceName = System.getProperty("service-name");
initialization.addCallback(params.heartbeat::start); initialization.addCallback(params.heartbeat::start);
initialization.addCallback(messageQueueInbox::start);
initialization.addCallback(() -> params.eventLog.logEvent("SVC-INIT", "")); initialization.addCallback(() -> params.eventLog.logEvent("SVC-INIT", ""));
if (!initialization.isReady() && ! initialized ) { if (!initialization.isReady() && ! initialized ) {
@ -81,6 +95,16 @@ public class Service {
}); });
} }
@MqRequest(endpoint = "SVC-READY")
public boolean mqIsReady() {
return initialization.isReady();
}
@MqRequest(endpoint = "SVC-PING")
public String mqPing() {
return "pong";
}
private void filterPublicRequests(Request request, Response response) { private void filterPublicRequests(Request request, Response response) {
if (null == request.headers("X-Public")) { if (null == request.headers("X-Public")) {
return; return;

View File

@ -0,0 +1,9 @@
package nu.marginalia.service.server.mq;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
public @interface MqNotification {
String endpoint();
}

View File

@ -0,0 +1,9 @@
package nu.marginalia.service.server.mq;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
public @interface MqRequest {
String endpoint();
}

View File

@ -0,0 +1,74 @@
package nu.marginalia.service.server.mq;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSubscription;
import nu.marginalia.service.server.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class ServiceMqSubscription implements MqSubscription {
private static final Logger logger = LoggerFactory.getLogger(ServiceMqSubscription.class);
private final Map<String, Method> requests = new HashMap<>();
private final Map<String, Method> notifications = new HashMap<>();
private final Service service;
public ServiceMqSubscription(Service service) {
this.service = service;
for (var method : service.getClass().getMethods()) {
var annotation = method.getAnnotation(MqRequest.class);
if (annotation != null) {
requests.put(annotation.endpoint(), method);
}
if (method.getAnnotation(MqNotification.class) != null) {
notifications.put(method.getName(), method);
}
}
}
@Override
public boolean filter(MqMessage rawMessage) {
boolean isInteresting = requests.containsKey(rawMessage.function())
|| notifications.containsKey(rawMessage.function());
if (!isInteresting) {
logger.warn("Received message for unknown function " + rawMessage.function());
}
return isInteresting;
}
@Override
public MqInboxResponse onRequest(MqMessage msg) {
var method = requests.get(msg.function());
try {
return MqInboxResponse.ok(method.invoke(service, msg.payload()).toString());
}
catch (InvocationTargetException ex) {
logger.error("Error invoking method " + method, ex);
return MqInboxResponse.err(ex.getCause().getMessage());
}
catch (Exception ex) {
logger.error("Error invoking method " + method, ex);
return MqInboxResponse.err(ex.getMessage());
}
}
@Override
public void onNotification(MqMessage msg) {
var method = notifications.get(msg.function());
try {
method.invoke(service, msg.payload());
}
catch (Exception ex) {
logger.error("Error invoking method " + method, ex);
}
}
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.index;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Schedulers;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.index.index.SearchIndex; import nu.marginalia.index.index.SearchIndex;
import nu.marginalia.index.svc.IndexOpsService; import nu.marginalia.index.svc.IndexOpsService;
import nu.marginalia.index.svc.IndexQueryService; import nu.marginalia.index.svc.IndexQueryService;
@ -10,6 +11,7 @@ import nu.marginalia.index.svc.IndexSearchSetsService;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.*; import nu.marginalia.service.server.*;
import nu.marginalia.service.server.mq.MqRequest;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -72,6 +74,27 @@ public class IndexService extends Service {
volatile boolean initialized = false; volatile boolean initialized = false;
@MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION)
public String repartition(String message) {
if (!opsService.repartition()) {
throw new IllegalStateException("Ops lock busy");
}
return "ok";
}
@MqRequest(endpoint = IndexMqEndpoints.INDEX_REINDEX)
public String reindex(String message) throws Exception {
if (!opsService.reindex()) {
throw new IllegalStateException("Ops lock busy");
}
return "ok";
}
@MqRequest(endpoint = IndexMqEndpoints.INDEX_IS_BLOCKED)
public String isBlocked(String message) throws Exception {
return Boolean.valueOf(opsService.isBusy()).toString();
}
public void initialize() { public void initialize() {
if (!initialized) { if (!initialized) {
init.waitReady(); init.waitReady();

View File

@ -30,6 +30,13 @@ public class IndexOpsService {
return opsLock.isLocked(); return opsLock.isLocked();
} }
public boolean repartition() {
return run(searchSetService::recalculateAll);
}
public boolean reindex() throws Exception {
return run(index::switchIndex).isPresent();
}
public Object repartitionEndpoint(Request request, Response response) throws Exception { public Object repartitionEndpoint(Request request, Response response) throws Exception {
if (!run(searchSetService::recalculateAll)) { if (!run(searchSetService::recalculateAll)) {

View File

@ -31,6 +31,7 @@ dependencies {
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')
implementation project(':code:api:search-api') implementation project(':code:api:search-api')
implementation project(':code:api:index-api')
implementation libs.lombok implementation libs.lombok

View File

@ -3,6 +3,7 @@ package nu.marginalia.control;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors; import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.process.ControlProcesses;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.MustacheRenderer;
@ -33,7 +34,8 @@ public class ControlService extends Service {
HeartbeatService heartbeatService, HeartbeatService heartbeatService,
EventLogService eventLogService, EventLogService eventLogService,
RendererFactory rendererFactory, RendererFactory rendererFactory,
MqPersistence messageQueuePersistence MqPersistence messageQueuePersistence,
ControlProcesses controlProcesses
) throws IOException { ) throws IOException {
super(params); super(params);
@ -52,6 +54,10 @@ public class ControlService extends Service {
Map.of("heartbeats", heartbeatService.getHeartbeats(), Map.of("heartbeats", heartbeatService.getHeartbeats(),
"events", eventLogService.getLastEntries(100) "events", eventLogService.getLastEntries(100)
))); )));
Spark.get("/public/repartition", (req, rsp) -> {
controlProcesses.start("REPARTITION-REINDEX");
return "OK";
});
monitors.subscribe(this::logMonitorStateChange); monitors.subscribe(this::logMonitorStateChange);

View File

@ -0,0 +1,38 @@
package nu.marginalia.control.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateMachine;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Singleton
public class ControlProcesses {
private final MqPersistence persistence;
public Map<String, StateMachine> stateMachines = new HashMap<>();
@Inject
public ControlProcesses(MqPersistence persistence,
RepartitionReindexProcess repartitionReindexProcess
) {
this.persistence = persistence;
register("REPARTITION-REINDEX", repartitionReindexProcess);
}
private void register(String name, AbstractStateGraph graph) {
stateMachines.put(name, new StateMachine(persistence, name, UUID.randomUUID(), graph));
}
public void start(String name) throws Exception {
stateMachines.get(name).init();
}
public void resume(String name) throws Exception {
stateMachines.get(name).resume();
}
}

View File

@ -0,0 +1,72 @@
package nu.marginalia.control.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
@Singleton
public class RepartitionReindexProcess extends AbstractStateGraph {
private final MqOutbox indexOutbox;
// STATES
private static final String INITIAL = "INITIAL";
private static final String REPARTITION = "REPARTITION";
private static final String REPARTITION_REPLY = "REPARTITION-REPLY";
private static final String REINDEX = "REINDEX";
private static final String REINDEX_REPLY = "REINDEX-REPLY";
private static final String END = "END";
@Inject
public RepartitionReindexProcess(StateFactory stateFactory, IndexClient indexClient) {
super(stateFactory);
indexOutbox = indexClient.outbox();
}
@GraphState(name = INITIAL, next = REPARTITION)
public void init() throws Exception {
var rsp = indexOutbox.send(IndexMqEndpoints.INDEX_IS_BLOCKED, "");
if (rsp.payload().equalsIgnoreCase("true")) {
error("Index is blocked");
}
}
@GraphState(name = REPARTITION, next = REPARTITION_REPLY)
public Long repartition() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "");
}
@GraphState(name = REPARTITION_REPLY, next = REINDEX)
public void repartitionReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@GraphState(name = REINDEX, next = REINDEX_REPLY)
public Long reindex() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, "");
}
@GraphState(name = REINDEX_REPLY, next = END)
public void reindexReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
}