From 1dafa0c74d42195c00f9aa6daaee9e263938f49c Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 27 Nov 2023 16:01:12 +0100 Subject: [PATCH] (mqapi/control) Repair repartition endpoint, deprecate notify endpoints. The repartition endpoint was mis-addressing its mqapi notifications, omitting the proper nodeId. In fixing this, it became apparent that having both @MqRequest and @MqNotification is a serious footgun, and the two should be unified into a single API where the caller isn't burdened with knowledge of the remote end's implementation specifics. --- .../marginalia/index/client/IndexClient.java | 10 ++++- .../nu/marginalia/service/id/ServiceId.java | 4 ++ .../service/server/mq/MqNotification.java | 9 ---- .../server/mq/ServiceMqSubscription.java | 37 +++++++---------- .../marginalia/actor/ActorStateMachine.java | 6 +-- .../nu/marginalia/mq/MessageQueueFactory.java | 9 +++- .../mq/inbox/MqAsynchronousInbox.java | 38 ++++------------- .../marginalia/mq/inbox/MqInboxShredder.java | 4 -- .../marginalia/mq/inbox/MqSubscription.java | 2 - .../mq/inbox/MqSynchronousInbox.java | 41 ++++--------------- .../nu/marginalia/mq/outbox/MqOutboxTest.java | 4 -- .../links/DomainLinksLoaderServiceTest.java | 14 ++++--- .../java/nu/marginalia/api/ApiService.java | 4 +- .../node/svc/ControlNodeActionsService.java | 12 +----- .../nu/marginalia/executor/ExecutorSvc.java | 3 +- .../nu/marginalia/index/IndexService.java | 5 +-- 16 files changed, 68 insertions(+), 134 deletions(-) delete mode 100644 code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 9f232cce..82624c0f 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -17,7 +17,6 @@ import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; -import java.util.ArrayList; import java.util.List; import javax.annotation.CheckReturnValue; @@ -27,6 +26,7 @@ import java.util.UUID; public class IndexClient extends AbstractDynamicClient { private static final Summary wmsa_search_index_api_time = Summary.build().name("wmsa_search_index_api_time").help("-").register(); + private final MessageQueueFactory messageQueueFactory; MqOutbox outbox; @@ -36,6 +36,7 @@ public class IndexClient extends AbstractDynamicClient { @Named("wmsa-system-node") Integer nodeId) { super(descriptors.forId(ServiceId.Index), GsonFactory::get); + this.messageQueueFactory = messageQueueFactory; String inboxName = ServiceId.Index.name; String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString()); @@ -76,4 +77,11 @@ public class IndexClient extends AbstractDynamicClient { return super.get(ctx, node, "/is-blocked", Boolean.class); } + public void triggerRepartition(int node) throws Exception { + messageQueueFactory.sendSingleShotRequest( + ServiceId.Index.withNode(node), + IndexMqEndpoints.INDEX_REPARTITION, + null + ); + } } diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java index 5d1bfd0c..94aeb10b 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java @@ -19,6 +19,10 @@ public enum ServiceId { this.name = name; } + public String withNode(int node) { + return name + ":" + node; + } + public static ServiceId byName(String name) { for (ServiceId id : values()) { if (id.name.equals(name)) { diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java deleted file mode 100644 index 20586f3e..00000000 --- a/code/common/service/src/main/java/nu/marginalia/service/server/mq/MqNotification.java +++ /dev/null @@ -1,9 +0,0 @@ -package nu.marginalia.service.server.mq; - -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; - -@Retention(RetentionPolicy.RUNTIME) -public @interface MqNotification { - String endpoint(); -} diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java b/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java index 61a024f5..868a545e 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java +++ b/code/common/service/src/main/java/nu/marginalia/service/server/mq/ServiceMqSubscription.java @@ -15,7 +15,6 @@ import java.util.Map; public class ServiceMqSubscription implements MqSubscription { private static final Logger logger = LoggerFactory.getLogger(ServiceMqSubscription.class); private final Map requests = new HashMap<>(); - private final Map notifications = new HashMap<>(); private final Service service; @@ -31,13 +30,6 @@ public class ServiceMqSubscription implements MqSubscription { requests.put(annotation.endpoint(), method); } } - - for (var method : service.getClass().getMethods()) { - var annotation = method.getAnnotation(MqNotification.class); - if (annotation != null) { - notifications.put(annotation.endpoint(), method); - } - } } @Override @@ -45,9 +37,6 @@ public class ServiceMqSubscription implements MqSubscription { if (requests.containsKey(rawMessage.function())) { return true; } - if (notifications.containsKey(rawMessage.function())) { - return true; - } logger.warn("Received message for unknown function " + rawMessage.function()); @@ -58,8 +47,21 @@ public class ServiceMqSubscription implements MqSubscription { public MqInboxResponse onRequest(MqMessage msg) { var method = requests.get(msg.function()); + if (null == method) { + logger.error("Received message for unregistered function handler " + msg.function()); + return MqInboxResponse.err("[No handler]"); + } + try { - return MqInboxResponse.ok(method.invoke(service, msg.payload()).toString()); + if (method.getReturnType() == void.class) { + method.invoke(service, msg.payload()); + return MqInboxResponse.ok(); + } + else { + // Do we want to just toString() here? Gson? Something else? + String rv = method.invoke(service, msg.payload()).toString(); + return MqInboxResponse.ok(rv); + } } catch (InvocationTargetException ex) { logger.error("Error invoking method " + method, ex); @@ -71,15 +73,4 @@ public class ServiceMqSubscription implements MqSubscription { } } - @Override - public void onNotification(MqMessage msg) { - var method = notifications.get(msg.function()); - - try { - method.invoke(service, msg.payload()); - } - catch (Exception ex) { - logger.error("Error invoking method " + method, ex); - } - } } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java index c33b2ef8..ce39e73a 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java @@ -344,11 +344,6 @@ public class ActorStateMachine { @Override public MqInboxResponse onRequest(MqMessage msg) { - return null; - } - - @Override - public void onNotification(MqMessage msg) { onStateTransition(msg); try { stateChangeListeners.forEach(l -> l.accept(msg.function(), msg.payload())); @@ -357,6 +352,7 @@ public class ActorStateMachine { // Rethrowing this will flag the message as an error in the message queue throw new RuntimeException("Error in state change listener", ex); } + return MqInboxResponse.ok(); } } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java index 672556ea..c0f2ed0e 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/MessageQueueFactory.java @@ -3,12 +3,12 @@ package nu.marginalia.mq; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.mq.inbox.MqAsynchronousInbox; -import nu.marginalia.mq.inbox.MqInboxIf; import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.mq.inbox.MqSynchronousInbox; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; +import javax.annotation.Nullable; import java.util.UUID; @Singleton @@ -41,4 +41,11 @@ public class MessageQueueFactory { { return new MqOutbox(persistence, inboxName, inboxNode, outboxName, outboxNode, instanceUUID); } + + /** Send a request to the specified inbox with a dummy reply inbox, + * do not wait for a response. + */ + public void sendSingleShotRequest(String inboxName, String function, @Nullable String payload) throws Exception { + persistence.sendNewMessage(inboxName, null, null, function, payload, null); + } } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java index 1eb45e6e..7411b6e2 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java @@ -131,36 +131,25 @@ public class MqAsynchronousInbox implements MqInboxIf { } private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) { - - if (msg.expectsResponse()) { - threadPool.execute(() -> respondToMessage(subscriber, msg)); - } - else { - threadPool.execute(() -> acknowledgeNotification(subscriber, msg)); - } + threadPool.execute(() -> respondToMessage(subscriber, msg)); } private void respondToMessage(MqSubscription subscriber, MqMessage msg) { try { final var rsp = subscriber.onRequest(msg); - sendResponse(msg, rsp.state(), rsp.message()); + if (msg.expectsResponse()) { + sendResponse(msg, rsp.state(), rsp.message()); + } + else { + registerResponse(msg, rsp.state()); + } } catch (Exception ex) { logger.error("Message Queue subscriber threw exception", ex); - sendResponse(msg, MqMessageState.ERR); + registerResponse(msg, MqMessageState.ERR); } } - private void acknowledgeNotification(MqSubscription subscriber, MqMessage msg) { - try { - subscriber.onNotification(msg); - updateMessageState(msg, MqMessageState.OK); - } catch (Exception ex) { - logger.error("Message Queue subscriber threw exception", ex); - updateMessageState(msg, MqMessageState.ERR); - } - } - - private void sendResponse(MqMessage msg, MqMessageState state) { + private void registerResponse(MqMessage msg, MqMessageState state) { try { persistence.updateMessageState(msg.msgId(), state); } @@ -169,15 +158,6 @@ public class MqAsynchronousInbox implements MqInboxIf { } } - private void updateMessageState(MqMessage msg, MqMessageState state) { - try { - persistence.updateMessageState(msg.msgId(), state); - } - catch (SQLException ex2) { - logger.error("Failed to update message state", ex2); - } - } - private void sendResponse(MqMessage msg, MqMessageState mqMessageState, String response) { try { persistence.sendResponse(msg.msgId(), mqMessageState, response); diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java index 18c346f2..9be3fe4a 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInboxShredder.java @@ -22,8 +22,4 @@ class MqInboxShredder implements MqSubscription { return MqInboxResponse.err(); } - @Override - public void onNotification(MqMessage msg) { - logger.warn("Unhandled message {}", msg.msgId()); - } } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java index 417b7b35..688737cd 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSubscription.java @@ -9,6 +9,4 @@ public interface MqSubscription { /** Handle the message and return a response. */ MqInboxResponse onRequest(MqMessage msg); - /** Handle a message with no reply address */ - void onNotification(MqMessage msg); } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java index f698b882..9fa0ef4d 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java @@ -80,37 +80,21 @@ public class MqSynchronousInbox implements MqInboxIf { } private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) { - - if (msg.expectsResponse()) { - respondToMessage(subscriber, msg); - } - else { - acknowledgeNotification(subscriber, msg); - } - } - - private void respondToMessage(MqSubscription subscriber, MqMessage msg) { try { final var rsp = subscriber.onRequest(msg); - sendResponse(msg, rsp.state(), rsp.message()); + if (msg.expectsResponse()) { + sendResponse(msg, rsp.state(), rsp.message()); + } + else { + registerResponse(msg, rsp.state()); + } } catch (Exception ex) { logger.error("Message Queue subscriber threw exception", ex); - sendResponse(msg, MqMessageState.ERR); + registerResponse(msg, MqMessageState.ERR); } } - private void acknowledgeNotification(MqSubscription subscriber, MqMessage msg) { - try { - subscriber.onNotification(msg); - updateMessageState(msg, MqMessageState.OK); - } - catch (Exception ex) { - logger.error("Message Queue subscriber threw exception", ex); - updateMessageState(msg, MqMessageState.ERR); - } - } - - private void sendResponse(MqMessage msg, MqMessageState state) { + private void registerResponse(MqMessage msg, MqMessageState state) { try { persistence.updateMessageState(msg.msgId(), state); } @@ -119,15 +103,6 @@ public class MqSynchronousInbox implements MqInboxIf { } } - private void updateMessageState(MqMessage msg, MqMessageState state) { - try { - persistence.updateMessageState(msg.msgId(), state); - } - catch (SQLException ex2) { - logger.error("Failed to update message state", ex2); - } - } - private void sendResponse(MqMessage msg, MqMessageState mqMessageState, String response) { try { persistence.sendResponse(msg.msgId(), mqMessageState, response); diff --git a/code/libraries/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java b/code/libraries/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java index 52f44659..537a0368 100644 --- a/code/libraries/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java +++ b/code/libraries/message-queue/src/test/java/nu/marginalia/mq/outbox/MqOutboxTest.java @@ -283,8 +283,6 @@ public class MqOutboxTest { return MqInboxResponse.ok(response); } - @Override - public void onNotification(MqMessage msg) { } }; } @@ -300,8 +298,6 @@ public class MqOutboxTest { return MqInboxResponse.ok(msg.payload()); } - @Override - public void onNotification(MqMessage msg) {} }; } diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java index 3b27f2fb..9852b630 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java @@ -14,10 +14,7 @@ import nu.marginalia.model.processed.DomainLinkRecord; import nu.marginalia.model.processed.DomainRecord; import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.mockito.Mockito; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.junit.jupiter.Container; @@ -33,6 +30,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Tag("slow") @Testcontainers +@Disabled // Error in the SQL loading mechanism, we don't deal with DELIMITER correctly + // which means we can't get around flyway's bugs necessitating DELIMITER. class DomainLinksLoaderServiceTest { List toDelete = new ArrayList<>(); ProcessHeartbeat heartbeat; @@ -57,7 +56,10 @@ class DomainLinksLoaderServiceTest { dataSource = new HikariDataSource(config); - List migrations = List.of("db/migration/V23_11_0_007__domain_node_affinity.sql"); + List migrations = List.of( + "db/migration/V23_11_0_007__domain_node_affinity.sql", + "db/migration/V23_11_0_008__purge_procedure.sql" + ); for (String migration : migrations) { try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration), "Could not load migration script " + migration); @@ -136,7 +138,7 @@ class DomainLinksLoaderServiceTest { var input = new LoaderInputData(workDir, 2); var domainRegistry = domainService.getOrCreateDomainIds(input); - var dls = new DomainLinksLoaderService(dataSource); + var dls = new DomainLinksLoaderService(dataSource, new ProcessConfiguration("test", 1, UUID.randomUUID())); dls.loadLinks(domainRegistry, heartbeat, input); Map> expected = new HashMap<>(); diff --git a/code/services-application/api-service/src/main/java/nu/marginalia/api/ApiService.java b/code/services-application/api-service/src/main/java/nu/marginalia/api/ApiService.java index 6d0225e9..8c1d303d 100644 --- a/code/services-application/api-service/src/main/java/nu/marginalia/api/ApiService.java +++ b/code/services-application/api-service/src/main/java/nu/marginalia/api/ApiService.java @@ -11,7 +11,7 @@ import nu.marginalia.client.Context; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.query.client.QueryClient; import nu.marginalia.service.server.*; -import nu.marginalia.service.server.mq.MqNotification; +import nu.marginalia.service.server.mq.MqRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -62,7 +62,7 @@ public class ApiService extends Service { Spark.get("/public/api/:key/search/*", this::search, gson::toJson); } - @MqNotification(endpoint = "FLUSH_CACHES") + @MqRequest(endpoint = "FLUSH_CACHES") public void flushCaches(String unusedArg) { logger.info("Flushing caches"); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java index bf59fa41..9cd11fb4 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java @@ -4,24 +4,15 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.client.Context; import nu.marginalia.control.RedirectControl; -import nu.marginalia.control.Redirects; -import nu.marginalia.db.DomainTypes; import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.index.client.IndexClient; -import nu.marginalia.index.client.IndexMqEndpoints; -import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.outbox.MqOutbox; -import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.control.ServiceEventLog; -import nu.marginalia.service.id.ServiceId; -import nu.marginalia.service.module.ServiceConfiguration; import spark.Request; import spark.Response; import spark.Spark; import java.nio.file.Files; import java.nio.file.Path; -import java.util.UUID; @Singleton public class ControlNodeActionsService { @@ -112,7 +103,8 @@ public class ControlNodeActionsService { } public Object triggerRepartition(Request request, Response response) throws Exception { - indexClient.outbox().sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""); + indexClient.triggerRepartition(Integer.parseInt(request.params("node"))); + return ""; } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java index 7fe08e5d..68921066 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java @@ -11,7 +11,6 @@ import nu.marginalia.executor.model.ActorRunStates; import nu.marginalia.executor.svc.*; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; -import nu.marginalia.service.server.mq.MqNotification; import nu.marginalia.service.server.mq.MqRequest; import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; @@ -82,7 +81,7 @@ public class ExecutorSvc extends Service { Spark.post("/transfer/yield", transferService::yieldDomain); } - @MqNotification(endpoint="FIRST-BOOT") + @MqRequest(endpoint="FIRST-BOOT") public void setUpDefaultActors(String message) throws Exception { logger.info("Initializing default actors"); actorControlService.start(ExecutorActor.MONITOR_PROCESS_LIVENESS); diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java index ddfd43c9..c1027ad9 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java @@ -15,7 +15,6 @@ import nu.marginalia.linkdb.LinkdbReader; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.*; -import nu.marginalia.service.server.mq.MqNotification; import nu.marginalia.service.server.mq.MqRequest; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -98,7 +97,7 @@ public class IndexService extends Service { } @SneakyThrows - @MqNotification(endpoint = IndexMqEndpoints.SWITCH_LINKDB) + @MqRequest(endpoint = IndexMqEndpoints.SWITCH_LINKDB) public void switchLinkdb(String unusedArg) { logger.info("Switching link database"); @@ -112,7 +111,7 @@ public class IndexService extends Service { } } - @MqNotification(endpoint = IndexMqEndpoints.SWITCH_INDEX) + @MqRequest(endpoint = IndexMqEndpoints.SWITCH_INDEX) public String switchIndex(String message) throws Exception { if (!opsService.switchIndex()) { throw new IllegalStateException("Ops lock busy");