(control) Actor terminations work better

Improves jank in the abort actor action, which would sometimes cause actors to hang or restart.
This commit is contained in:
Viktor Lofgren 2024-01-10 14:18:49 +01:00
parent d56b394bcc
commit f310ad8d98
5 changed files with 36 additions and 11 deletions

View File

@ -6,6 +6,12 @@ public record ActorRunState(String name,
String stateDescription,
boolean terminal,
boolean canStart) {
public boolean isDaemon() {
return name.startsWith("PROC_")
|| name.startsWith("MONITOR_");
}
public String stateIcon() {
if (terminal) {
return "\uD83D\uDE34";

View File

@ -269,7 +269,6 @@ public class ActorStateMachine {
}
if (!state.isFinal()) {
logger.info("Transitioning from state {}", state.name());
var transition = state.next(msg.payload());
if (!expectedMessage.isExpected(msg)) {
@ -321,7 +320,8 @@ public class ActorStateMachine {
expectedMessage = ExpectedMessage.expectId(abortMsgId);
// Add a state transition to the final state
// Add a state transition to the monitor state, causing it to reset the state machine to the initial state
// (or if no monitor state is defined, set it to the final state)
smOutbox.sendNotice(abortMsgId, finalState.name(), "");
// Dislodge the current task with an interrupt.

View File

@ -17,7 +17,8 @@
onsubmit="return toggleActorSwitch('{{name}}')">
<input
type="submit"
value="On"
{{#if daemon}}value="Disable"{{/if}}
{{#unless daemon}}value="Terminate"{{/unless}}
class="toggle-switch-on"
id="toggle-{{name}}-button"
title="Terminate the actor"
@ -33,11 +34,12 @@
type="submit"
{{#unless canStart}}
disabled
value="Stop"
value="Enable"
title="This actor cannot be started here"
{{/unless}}
{{#if canStart}}
value="Off"
{{#if daemon}}value="Enable"{{/if}}
{{#unless daemon}}value="Start"{{/unless}}
title="Start the actor"
{{/if}}
class="toggle-switch-off"

View File

@ -3,9 +3,13 @@ package nu.marginalia.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.state.ActorControlFlowException;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.outbox.MqOutbox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@ -14,10 +18,14 @@ import java.util.concurrent.TimeoutException;
@Singleton
public class ActorProcessWatcher {
private static final Logger logger = LoggerFactory.getLogger(ActorProcessWatcher.class);
private final MqPersistence persistence;
private final ProcessService processService;
@Inject
public ActorProcessWatcher(ProcessService processService) {
public ActorProcessWatcher(MqPersistence persistence,
ProcessService processService) {
this.persistence = persistence;
this.processService = processService;
}
@ -42,7 +50,11 @@ public class ActorProcessWatcher {
for (;;) {
try {
return outbox.waitResponse(msgId, 5, TimeUnit.SECONDS);
// Check for interruption before waiting for response
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
return outbox.waitResponse(msgId, 1, TimeUnit.SECONDS);
}
catch (InterruptedException ex) {
// Here we mark the message as dead, as it's the user that has aborted the process
@ -51,9 +63,14 @@ public class ActorProcessWatcher {
outbox.flagAsDead(msgId);
processService.kill(processId);
throw ex;
logger.info("Process {} killed due to interrupt", processId);
}
catch (TimeoutException ex) {
var state = persistence.getMessage(msgId).state();
if (state == MqMessageState.ERR || state == MqMessageState.DEAD) {
throw new ActorControlFlowException("Process " + processId + " marked message as " + state);
}
// Maybe the process died, wait a moment for it to restart
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
throw new ActorControlFlowException("Process " + processId + " died and did not re-launch");

View File

@ -146,7 +146,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
}
case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD));
case ReindexFwd(long id) -> {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
if (rsp.state() != MqMessageState.OK)
yield new Error("Repartition failed");
@ -155,7 +155,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
}
case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL));
case ReindexFull(long id) -> {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
if (rsp.state() != MqMessageState.OK)
yield new Error("Repartition failed");
@ -164,7 +164,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
}
case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO));
case ReindexPrio(long id) -> {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
var rsp = processWatcher.waitResponse(mqIndexConstructorOutbox, ProcessService.ProcessId.INDEX_CONSTRUCTOR, id);
if (rsp.state() != MqMessageState.OK)
yield new Error("Repartition failed");