(control) Fix spurious state detection in control-side actors

A race condition was found where precession actors would sometimes skip a step, because when invoking ExecutorRemoteActor.getState(), it would get the last 'OK' actor state from a previous run of the actor!

To avoid this, the trigger method was changed from returning a boolean to the message ID, negative if an error occurred, to be passed to getState to select only messages that pertain to the present or future runs.
This commit is contained in:
Viktor Lofgren 2023-12-09 12:50:05 +01:00
parent d0982e7ba5
commit eccb12b366
4 changed files with 44 additions and 24 deletions

View File

@ -34,8 +34,23 @@ public class ExecutorRemoteActorFactory {
}
public interface ExecutorRemoteActorIf<T> {
boolean trigger(T object) throws Exception;
String getState();
/** Trigger the remote actor with the given object. The object will be serialized to JSON and sent to the
* remote actor. If the remote actor does not respond after a time period, a timeout will occur and a negative
* message id will be returned.
*
* @param object The message to send to the remote actot
* @return The message id of the response message, or a negative number if the remote actor did not respond
* within a reasonable timeout seconds.
*/
long trigger(T object) throws Exception;
/** Get the last finished state of the actor.
* <p>
* The message id of the request initiating the actor must be provided to ensure that
* we don't get a state from a previous run.
*/
String getState(long fromMsgId);
}
public record CrawlData(FileStorageId storageId, boolean cascadeLoad) {}
@ -58,11 +73,11 @@ class ExecutorRemoteActor<T> implements ExecutorRemoteActorFactory.ExecutorRemot
this.triggerFunction = triggerFunction;
}
public boolean trigger(T object) throws Exception {
public long trigger(T object) throws Exception {
return trigger(gson.toJson(object));
}
public boolean trigger(String payload) throws Exception {
public long trigger(String payload) throws Exception {
long id = persistence.sendNewMessage(inboxName, null, null, triggerFunction, payload, null);
// Wait for the remote actor to respond to the message
@ -70,19 +85,19 @@ class ExecutorRemoteActor<T> implements ExecutorRemoteActorFactory.ExecutorRemot
for (int i = 0; i < 120; i++) {
var msg = persistence.getMessage(id);
if (msg.state() == MqMessageState.ACK || msg.state() == MqMessageState.OK)
return true;
return id;
if (msg.state() == MqMessageState.ERR || msg.state() == MqMessageState.DEAD)
return false;
return -id;
TimeUnit.SECONDS.sleep(1);
}
return false; // Timeout
return -1; // Timeout
}
public String getState() {
public String getState(long fromMsgId) {
return persistence
.getHeadMessage(inboxName)
.getHeadMessage(inboxName, fromMsgId)
.map(MqMessage::function)
.orElse("INITIAL");
}

View File

@ -496,17 +496,21 @@ public class MqPersistence {
return gson;
}
/** Returns the last message sent to this inbox with a state of 'OK' */
public Optional<MqMessage> getHeadMessage(String inboxName) {
/** Returns the last message sent to this inbox with a state of 'OK'
* with an id greater than or equal to fromMsgId
*/
public Optional<MqMessage> getHeadMessage(String inboxName, long fromMsgId) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX
FROM MESSAGE_QUEUE
WHERE RECIPIENT_INBOX = ? AND STATE='OK'
WHERE RECIPIENT_INBOX = ? AND STATE='OK' AND ID >= ?
ORDER BY ID DESC LIMIT 1
"""))
{
query.setString(1, inboxName);
query.setLong(2, fromMsgId);
var rs = query.executeQuery();
if (rs.next()) {
long msgId = rs.getLong(1);

View File

@ -25,7 +25,7 @@ public class RecrawlAllActor extends RecordActorPrototype {
public record Initial() implements ActorStep {}
public record WaitFinished(int node) implements ActorStep {}
public record WaitFinished(int node, long msgId) implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RETRY)
public record Trigger(int node) implements ActorStep {}
public record AdvanceNode(int node) implements ActorStep {}
@ -49,17 +49,18 @@ public class RecrawlAllActor extends RecordActorPrototype {
var data = new ExecutorRemoteActorFactory.CrawlData(activeFileStorage.get(0), true);
if (remoteActorFactory.createCrawlRemote(node).trigger(data)) {
yield new WaitFinished(node);
long msgId = remoteActorFactory.createCrawlRemote(node).trigger(data);
if (msgId >= 0) {
yield new WaitFinished(node, msgId);
}
else {
yield new AdvanceNode(node);
}
}
case WaitFinished(int node) -> {
case WaitFinished(int node, long msgId) -> {
var remoteActor = remoteActorFactory.createCrawlRemote(node);
for (;;) {
var state = remoteActor.getState();
var state = remoteActor.getState(msgId);
if ("END".equals(state) || "ERROR".equals(state)) {
break;
}
@ -80,8 +81,7 @@ public class RecrawlAllActor extends RecordActorPrototype {
public RecrawlAllActor(Gson gson,
ExecutorRemoteActorFactory remoteActorFactory,
FileStorageService fileStorageService,
PrecessionNodes precessionNodes,
NodeConfigurationService nodeConfigurationService)
PrecessionNodes precessionNodes)
{
super(gson);
this.remoteActorFactory = remoteActorFactory;

View File

@ -24,7 +24,7 @@ public class ReprocessAllActor extends RecordActorPrototype {
public record Initial() implements ActorStep {}
public record WaitFinished(int node) implements ActorStep {}
public record WaitFinished(int node, long msgId) implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RETRY)
public record Trigger(int node) implements ActorStep {}
public record AdvanceNode(int node) implements ActorStep {}
@ -47,17 +47,18 @@ public class ReprocessAllActor extends RecordActorPrototype {
var data = new ExecutorRemoteActorFactory.ConvertAndLoadData(activeFileStorage.get(0));
if (remoteActorFactory.createConvertAndLoadRemote(node).trigger(data)) {
yield new WaitFinished(node);
long msgId = remoteActorFactory.createConvertAndLoadRemote(node).trigger(data);
if (msgId >= 0) {
yield new WaitFinished(node, msgId);
}
else {
yield new AdvanceNode(node);
}
}
case WaitFinished(int node) -> {
case WaitFinished(int node, long msgId) -> {
var remoteActor = remoteActorFactory.createConvertAndLoadRemote(node);
for (;;) {
var state = remoteActor.getState();
var state = remoteActor.getState(msgId);
if ("END".equals(state) || "ERROR".equals(state))
break;
TimeUnit.SECONDS.sleep(10);