(control) Simplify ConvertAndLoadActor

This commit is contained in:
Viktor Lofgren 2023-08-25 13:30:20 +02:00
parent 70a5df96c8
commit 28188a6e59

View File

@ -42,18 +42,11 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
public static final String RECONVERT = "RECONVERT";
public static final String RECONVERT_WAIT = "RECONVERT-WAIT";
public static final String LOAD = "LOAD";
public static final String LOAD_WAIT = "LOAD-WAIT";
public static final String SWAP_LEXICON = "SWAP-LEXICON";
public static final String REPARTITION = "REPARTITION";
public static final String REPARTITION_WAIT = "REPARTITION-WAIT";
public static final String REINDEX_FWD = "REINDEX_FWD";
public static final String REINDEX_FWD_WAIT = "REINDEX-FWD-WAIT";
public static final String REINDEX_FULL = "REINDEX_FULL";
public static final String REINDEX_FULL_WAIT = "REINDEX-FULL-WAIT";
public static final String REINDEX_PRIO = "REINDEX_PRIO";
public static final String REINDEX_PRIO_WAIT = "REINDEX-PRIO-WAIT";
public static final String SWITCH_OVER = "SWITCH-LINKDB";
public static final String SWITCH_OVER = "SWITCH-OVER";
public static final String END = "END";
private final ActorProcessWatcher processWatcher;
@ -170,81 +163,39 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
@ActorState(
name = LOAD,
next = LOAD_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Send a load request to the loader and transition to LOAD_WAIT.
""")
public Message load(Message message) throws Exception {
var request = new LoadRequest(message.processedStorageId);
long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request));
return message.withLoaderMsgId(id);
}
@ActorState(
name = LOAD_WAIT,
next = SWAP_LEXICON,
next = REPARTITION,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the loader to finish loading the data.
"""
)
public void loadWait(Message message) throws Exception {
Instruct the loader to process the data
""")
public void load(Message message) throws Exception {
if (message.loaderMsgId <= 0) {
var request = new LoadRequest(message.processedStorageId);
long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request));
transition(LOAD, message.withLoaderMsgId(id));
}
var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId);
if (rsp.state() != MqMessageState.OK)
error("Loader failed");
}
@ActorState(
name = SWAP_LEXICON,
next = REPARTITION,
resume = ActorResumeBehavior.RETRY,
description = """
Move the lexicon from the LEXICON_STAGING area to the LEXICON_LIVE area,
so that the index service can load it after repartitioning.
"""
)
public void swapLexicon(Message message) throws Exception {
var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE);
var staging = storageService.getStorageByType(FileStorageType.LEXICON_STAGING);
var fromSource = staging.asPath().resolve("dictionary.dat");
var liveDest = live.asPath().resolve("dictionary.dat");
// Swap in new lexicon
logger.info("Moving " + fromSource + " to " + liveDest);
Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
}
@ActorState(
name = REPARTITION,
next = REPARTITION_WAIT,
description = """
Instruct the index-service to repartition the index then transition to REPARTITION_WAIT.
"""
)
public Long repartition() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "");
}
@ActorState(
name = REPARTITION_WAIT,
next = REINDEX_FWD,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the index-service to finish repartitioning the index.
Instruct the index-service to repartition.
"""
)
public void repartitionReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
public void repartition(Long id) throws Exception {
if (id == null) {
transition(REPARTITION, indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""));
}
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
@ -252,25 +203,18 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
@ActorState(
name = REINDEX_FWD,
next = REINDEX_FWD_WAIT,
next = REINDEX_FULL,
resume = ActorResumeBehavior.RETRY,
description = """
Reconstruct the fwd index
"""
)
public Long reindexFwd() throws Exception {
var request = new CreateIndexRequest(IndexName.FORWARD);
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request));
}
public void reindexFwd(Long id) throws Exception {
if (id == null) {
var request = new CreateIndexRequest(IndexName.FORWARD);
transition(REINDEX_FWD, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)));
}
@ActorState(
name = REINDEX_FWD_WAIT,
next = REINDEX_FULL,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the reindex job to finish.
"""
)
public void reindexFwdWait(Long id) throws Exception {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
@ -280,25 +224,18 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
@ActorState(
name = REINDEX_FULL,
next = REINDEX_FULL_WAIT,
description = """
Reconstruct the full index
"""
)
public Long reindexFull() throws Exception {
var request = new CreateIndexRequest(IndexName.REVERSE_FULL);
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(
name = REINDEX_FULL_WAIT,
next = REINDEX_PRIO,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the reindex job to finish.
Reconstruct the reverse full index
"""
)
public void reindexFullWait(Long id) throws Exception {
public void reindexFull(Long id) throws Exception {
if (id == null) {
var request = new CreateIndexRequest(IndexName.REVERSE_FULL);
transition(REINDEX_FULL, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)));
}
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
@ -308,25 +245,18 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
@ActorState(
name = REINDEX_PRIO,
next = REINDEX_PRIO_WAIT,
resume = ActorResumeBehavior.RETRY,
description = """
Reconstruct the prio index
"""
)
public long reindexPrio() throws Exception {
var request = new CreateIndexRequest(IndexName.REVERSE_PRIO);
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(
name = REINDEX_PRIO_WAIT,
next = SWITCH_OVER,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the reindex job to finish.
Reconstruct the reverse prio index
"""
)
public void reindexPrioWait(Long id) throws Exception {
public void reindexPrio(Long id) throws Exception {
if (id == null) {
var request = new CreateIndexRequest(IndexName.REVERSE_PRIO);
transition(REINDEX_PRIO, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)));
}
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
@ -334,17 +264,26 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
}
}
@ActorState(
name = SWITCH_OVER,
next = END,
resume = ActorResumeBehavior.RETRY,
description = """
Instruct the search service to switch to the new linkdb,
and the index service to switch over to the new index.
Move the new lexicon into place, instruct the search service to
switch to the new linkdb, and the index service to switch over to the new index.
"""
)
public void switchOver(Long id) throws Exception {
var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE);
var staging = storageService.getStorageByType(FileStorageType.LEXICON_STAGING);
var fromSource = staging.asPath().resolve("dictionary.dat");
var liveDest = live.asPath().resolve("dictionary.dat");
// Swap in new lexicon
logger.info("Moving " + fromSource + " to " + liveDest);
Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
// Notify services to switch over
searchOutbox.sendNotice(SearchMqEndpoints.SWITCH_LINKDB, ":-)");
indexOutbox.sendNotice(IndexMqEndpoints.INDEX_REINDEX, ":^D");
}