diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java index c0662937..c8e2acb9 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java @@ -42,18 +42,11 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { public static final String RECONVERT = "RECONVERT"; public static final String RECONVERT_WAIT = "RECONVERT-WAIT"; public static final String LOAD = "LOAD"; - public static final String LOAD_WAIT = "LOAD-WAIT"; - public static final String SWAP_LEXICON = "SWAP-LEXICON"; - public static final String REPARTITION = "REPARTITION"; - public static final String REPARTITION_WAIT = "REPARTITION-WAIT"; public static final String REINDEX_FWD = "REINDEX_FWD"; - public static final String REINDEX_FWD_WAIT = "REINDEX-FWD-WAIT"; public static final String REINDEX_FULL = "REINDEX_FULL"; - public static final String REINDEX_FULL_WAIT = "REINDEX-FULL-WAIT"; public static final String REINDEX_PRIO = "REINDEX_PRIO"; - public static final String REINDEX_PRIO_WAIT = "REINDEX-PRIO-WAIT"; - public static final String SWITCH_OVER = "SWITCH-LINKDB"; + public static final String SWITCH_OVER = "SWITCH-OVER"; public static final String END = "END"; private final ActorProcessWatcher processWatcher; @@ -170,81 +163,39 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { @ActorState( name = LOAD, - next = LOAD_WAIT, - resume = ActorResumeBehavior.ERROR, - description = """ - Send a load request to the loader and transition to LOAD_WAIT. - """) - public Message load(Message message) throws Exception { - - var request = new LoadRequest(message.processedStorageId); - long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request)); - - return message.withLoaderMsgId(id); - - } - - @ActorState( - name = LOAD_WAIT, - next = SWAP_LEXICON, + next = REPARTITION, resume = ActorResumeBehavior.RETRY, description = """ - Wait for the loader to finish loading the data. - """ - ) - public void loadWait(Message message) throws Exception { + Instruct the loader to process the data + """) + public void load(Message message) throws Exception { + if (message.loaderMsgId <= 0) { + var request = new LoadRequest(message.processedStorageId); + long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request)); + + transition(LOAD, message.withLoaderMsgId(id)); + } var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId); if (rsp.state() != MqMessageState.OK) error("Loader failed"); + } - - - @ActorState( - name = SWAP_LEXICON, - next = REPARTITION, - resume = ActorResumeBehavior.RETRY, - description = """ - Move the lexicon from the LEXICON_STAGING area to the LEXICON_LIVE area, - so that the index service can load it after repartitioning. - """ - ) - public void swapLexicon(Message message) throws Exception { - var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE); - - var staging = storageService.getStorageByType(FileStorageType.LEXICON_STAGING); - var fromSource = staging.asPath().resolve("dictionary.dat"); - var liveDest = live.asPath().resolve("dictionary.dat"); - - // Swap in new lexicon - logger.info("Moving " + fromSource + " to " + liveDest); - Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); - } - - @ActorState( name = REPARTITION, - next = REPARTITION_WAIT, - description = """ - Instruct the index-service to repartition the index then transition to REPARTITION_WAIT. - """ - ) - public Long repartition() throws Exception { - return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""); - } - - @ActorState( - name = REPARTITION_WAIT, next = REINDEX_FWD, resume = ActorResumeBehavior.RETRY, description = """ - Wait for the index-service to finish repartitioning the index. + Instruct the index-service to repartition. """ ) - public void repartitionReply(Long id) throws Exception { - var rsp = indexOutbox.waitResponse(id); + public void repartition(Long id) throws Exception { + if (id == null) { + transition(REPARTITION, indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "")); + } + var rsp = indexOutbox.waitResponse(id); if (rsp.state() != MqMessageState.OK) { error("Repartition failed"); } @@ -252,25 +203,18 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { @ActorState( name = REINDEX_FWD, - next = REINDEX_FWD_WAIT, + next = REINDEX_FULL, + resume = ActorResumeBehavior.RETRY, description = """ Reconstruct the fwd index """ ) - public Long reindexFwd() throws Exception { - var request = new CreateIndexRequest(IndexName.FORWARD); - return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)); - } + public void reindexFwd(Long id) throws Exception { + if (id == null) { + var request = new CreateIndexRequest(IndexName.FORWARD); + transition(REINDEX_FWD, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request))); + } - @ActorState( - name = REINDEX_FWD_WAIT, - next = REINDEX_FULL, - resume = ActorResumeBehavior.RETRY, - description = """ - Wait for the reindex job to finish. - """ - ) - public void reindexFwdWait(Long id) throws Exception { var rsp = mqIndexConstructorOutbox.waitResponse(id); if (rsp.state() != MqMessageState.OK) { @@ -280,25 +224,18 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { @ActorState( name = REINDEX_FULL, - next = REINDEX_FULL_WAIT, - description = """ - Reconstruct the full index - """ - ) - public Long reindexFull() throws Exception { - var request = new CreateIndexRequest(IndexName.REVERSE_FULL); - return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)); - } - - @ActorState( - name = REINDEX_FULL_WAIT, next = REINDEX_PRIO, resume = ActorResumeBehavior.RETRY, description = """ - Wait for the reindex job to finish. + Reconstruct the reverse full index """ ) - public void reindexFullWait(Long id) throws Exception { + public void reindexFull(Long id) throws Exception { + if (id == null) { + var request = new CreateIndexRequest(IndexName.REVERSE_FULL); + transition(REINDEX_FULL, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request))); + } + var rsp = mqIndexConstructorOutbox.waitResponse(id); if (rsp.state() != MqMessageState.OK) { @@ -308,25 +245,18 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { @ActorState( name = REINDEX_PRIO, - next = REINDEX_PRIO_WAIT, - resume = ActorResumeBehavior.RETRY, - description = """ - Reconstruct the prio index - """ - ) - public long reindexPrio() throws Exception { - var request = new CreateIndexRequest(IndexName.REVERSE_PRIO); - return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)); - } - @ActorState( - name = REINDEX_PRIO_WAIT, next = SWITCH_OVER, resume = ActorResumeBehavior.RETRY, description = """ - Wait for the reindex job to finish. + Reconstruct the reverse prio index """ ) - public void reindexPrioWait(Long id) throws Exception { + public void reindexPrio(Long id) throws Exception { + if (id == null) { + var request = new CreateIndexRequest(IndexName.REVERSE_PRIO); + transition(REINDEX_PRIO, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request))); + } + var rsp = mqIndexConstructorOutbox.waitResponse(id); if (rsp.state() != MqMessageState.OK) { @@ -334,17 +264,26 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { } } - @ActorState( name = SWITCH_OVER, next = END, resume = ActorResumeBehavior.RETRY, description = """ - Instruct the search service to switch to the new linkdb, - and the index service to switch over to the new index. + Move the new lexicon into place, instruct the search service to + switch to the new linkdb, and the index service to switch over to the new index. """ ) public void switchOver(Long id) throws Exception { + var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE); + var staging = storageService.getStorageByType(FileStorageType.LEXICON_STAGING); + var fromSource = staging.asPath().resolve("dictionary.dat"); + var liveDest = live.asPath().resolve("dictionary.dat"); + + // Swap in new lexicon + logger.info("Moving " + fromSource + " to " + liveDest); + Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); + + // Notify services to switch over searchOutbox.sendNotice(SearchMqEndpoints.SWITCH_LINKDB, ":-)"); indexOutbox.sendNotice(IndexMqEndpoints.INDEX_REINDEX, ":^D"); }