From a2bc9a98c06f2a15093550e454aae3820970134d Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Sun, 10 Nov 2024 13:33:57 +0100 Subject: [PATCH] (feed) Use the message queue to permit the feeds service to tell the calling actor when it's finished --- .../marginalia/actor/proc/UpdateRssActor.java | 61 +++++-- code/functions/live-capture/api/build.gradle | 1 + .../nu/marginalia/api/feeds/FeedsClient.java | 21 ++- .../api/src/main/protobuf/feeds.proto | 2 + code/functions/live-capture/build.gradle | 1 + .../rss/svc/FeedFetcherService.java | 3 +- .../marginalia/rss/svc/FeedsGrpcService.java | 22 ++- .../mq/persistence/MqPersistence.java | 20 ++- .../marginalia/mq/task/MqLongRunningTask.java | 162 ++++++++++++++++++ .../nu/marginalia/mq/task/MqTaskFunction.java | 5 + .../nu/marginalia/mq/task/MqTaskResult.java | 14 ++ 11 files changed, 285 insertions(+), 27 deletions(-) create mode 100644 code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java create mode 100644 code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskFunction.java create mode 100644 code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskResult.java diff --git a/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java index 7b1441f7..459cd75b 100644 --- a/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java +++ b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java @@ -8,16 +8,21 @@ import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.Resume; import nu.marginalia.api.feeds.FeedsClient; import nu.marginalia.api.feeds.RpcFeedUpdateMode; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.service.module.ServiceConfiguration; import java.time.Duration; import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; public class UpdateRssActor extends RecordActorPrototype { private final FeedsClient feedsClient; private final int nodeId; + private final MqOutbox updateTaskOutbox; + private final Duration initialDelay = Duration.ofMinutes(5); private final Duration updateInterval = Duration.ofHours(24); private final int cleanInterval = 60; @@ -27,15 +32,24 @@ public class UpdateRssActor extends RecordActorPrototype { super(gson); this.feedsClient = feedsClient; this.nodeId = serviceConfiguration.node(); + this.updateTaskOutbox = feedsClient.createOutbox("update-rss-actor", nodeId); } public record Initial() implements ActorStep {} @Resume(behavior = ActorResumeBehavior.RETRY) public record Wait(String ts, int refreshCount) implements ActorStep {} - @Resume(behavior = ActorResumeBehavior.RESTART) - public record UpdateRefresh(int refreshCount) implements ActorStep {} - @Resume(behavior = ActorResumeBehavior.RESTART) - public record UpdateClean() implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.RETRY) + public record UpdateRefresh(int refreshCount, long msgId) implements ActorStep { + public UpdateRefresh(int refreshCount) { + this(refreshCount, -1); + } + } + @Resume(behavior = ActorResumeBehavior.RETRY) + public record UpdateClean(long msgId) implements ActorStep { + public UpdateClean() { + this(-1); + } + } @Override public ActorStep transition(ActorStep self) throws Exception { @@ -73,17 +87,38 @@ public class UpdateRssActor extends RecordActorPrototype { } } - case UpdateRefresh(int count) -> { - feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH); - - // Increment the refresh count and schedule the next update - yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1); + case UpdateRefresh(int count, long msgId) when msgId < 0 -> { + long messageId = updateTaskOutbox.sendAsync("UpdateRefresh", ""); + feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH, messageId); + yield new UpdateRefresh(count, messageId); } - case UpdateClean() -> { - feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN); + case UpdateRefresh(int count, long msgId) -> { + var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS); + if (rsp.state() != MqMessageState.OK) { + // Retry the update + yield new Error("Failed to update feeds: " + rsp.state()); + } + else { + // Reset the refresh count after a successful update + yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1); + } + } + case UpdateClean(long msgId) when msgId < 0 -> { + long messageId = updateTaskOutbox.sendAsync("UpdateClean", ""); + feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN, messageId); - // Reset the refresh count after a clean update - yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), 0); + yield new UpdateClean(messageId); + } + case UpdateClean(long msgId) -> { + var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS); + if (rsp.state() != MqMessageState.OK) { + // Retry the update + yield new Error("Failed to clean feeds: " + rsp.state()); + } + else { + // Reset the refresh count after a successful update + yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), 0); + } } default -> new Error("Unknown actor step: " + self); }; diff --git a/code/functions/live-capture/api/build.gradle b/code/functions/live-capture/api/build.gradle index d5a4e27d..83aceaf0 100644 --- a/code/functions/live-capture/api/build.gradle +++ b/code/functions/live-capture/api/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:config') implementation project(':code:common:service') + implementation project(':code:libraries:message-queue') implementation libs.bundles.slf4j diff --git a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java index be3f93dd..064caf55 100644 --- a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java +++ b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java @@ -1,6 +1,8 @@ package nu.marginalia.api.feeds; import com.google.inject.Inject; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.client.GrpcChannelPoolFactory; import nu.marginalia.service.client.GrpcSingleNodeChannelPool; import nu.marginalia.service.discovery.property.ServiceKey; @@ -8,6 +10,7 @@ import nu.marginalia.service.discovery.property.ServicePartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -16,15 +19,23 @@ public class FeedsClient { private static final Logger logger = LoggerFactory.getLogger(FeedsClient.class); private final ExecutorService executorService = Executors.newCachedThreadPool(); private final GrpcSingleNodeChannelPool channelPool; + private final MqPersistence mqPersistence; @Inject - public FeedsClient(GrpcChannelPoolFactory factory) { + public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence) { + this.mqPersistence = mqPersistence; // The client is only interested in the primary node var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any()); this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub); } + /** Create an appropriately named outbox for the update actor requests */ + public MqOutbox createOutbox(String callerName, int outboxNodeId) { + return new MqOutbox(mqPersistence, "update-rss-feeds", 1, callerName, outboxNodeId, UUID.randomUUID()); + } + + public CompletableFuture getFeed(int domainId) { try { return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed) @@ -36,10 +47,14 @@ public class FeedsClient { } } - public void updateFeeds(RpcFeedUpdateMode mode) { + public void updateFeeds(RpcFeedUpdateMode mode, long msgId) { try { channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds) - .run(RpcUpdateRequest.newBuilder().setMode(mode).build()); + .run(RpcUpdateRequest.newBuilder() + .setMode(mode) + .setMsgId(msgId) + .build() + ); } catch (Exception e) { logger.error("API Exception", e); diff --git a/code/functions/live-capture/api/src/main/protobuf/feeds.proto b/code/functions/live-capture/api/src/main/protobuf/feeds.proto index 2417afc1..738eaf3e 100644 --- a/code/functions/live-capture/api/src/main/protobuf/feeds.proto +++ b/code/functions/live-capture/api/src/main/protobuf/feeds.proto @@ -16,6 +16,8 @@ message RpcDomainId { message RpcUpdateRequest { RpcFeedUpdateMode mode = 1; + int64 msgId = 2; // Id for a message on the message queue, will be replied to with a dummy response when the task is done, + // if the message id is not positive, no response will be attempted to be sent. } enum RpcFeedUpdateMode { diff --git a/code/functions/live-capture/build.gradle b/code/functions/live-capture/build.gradle index 9e142164..d4e1bceb 100644 --- a/code/functions/live-capture/build.gradle +++ b/code/functions/live-capture/build.gradle @@ -21,6 +21,7 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:db') implementation project(':code:libraries:blocking-thread-pool') + implementation project(':code:libraries:message-queue') implementation project(':code:execution:api') diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java index 288e0457..3cc0b84d 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java @@ -81,8 +81,7 @@ public class FeedFetcherService { public void updateFeeds(UpdateMode updateMode) throws IOException { if (updating) // Prevent concurrent updates { - logger.error("Already updating feeds, refusing to start another update"); - return; + throw new IllegalStateException("Already updating feeds, refusing to start another update"); } try (var writer = feedDb.createWriter(); diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java index 6944fc24..46524d16 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java @@ -5,18 +5,21 @@ import io.grpc.stub.StreamObserver; import nu.marginalia.api.feeds.*; import nu.marginalia.db.DbDomainQueries; import nu.marginalia.model.EdgeDomain; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mq.task.MqLongRunningTask; +import nu.marginalia.mq.task.MqTaskResult; import nu.marginalia.rss.db.FeedDb; import nu.marginalia.rss.model.FeedItems; import nu.marginalia.service.server.DiscoverableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Optional; public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService { private final FeedDb feedDb; private final DbDomainQueries domainQueries; + private final MqPersistence mqPersistence; private final FeedFetcherService feedFetcherService; private static final Logger logger = LoggerFactory.getLogger(FeedsGrpcService.class); @@ -24,9 +27,11 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis @Inject public FeedsGrpcService(FeedDb feedDb, DbDomainQueries domainQueries, + MqPersistence mqPersistence, FeedFetcherService feedFetcherService) { this.feedDb = feedDb; this.domainQueries = domainQueries; + this.mqPersistence = mqPersistence; this.feedFetcherService = feedFetcherService; } @@ -46,13 +51,14 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis default -> throw new IllegalStateException("Unexpected value: " + request.getMode()); }; - Thread.ofPlatform().start(() -> { - try { - feedFetcherService.updateFeeds(updateMode); - } catch (IOException e) { - logger.error("Failed to update feeds", e); - } - }); + // Start a long-running task to update the feeds + MqLongRunningTask + .of(request.getMsgId(), "updateFeeds", mqPersistence) + .asThread(() -> { + feedFetcherService.updateFeeds(updateMode); + return new MqTaskResult.Success(); + }) + .start(); responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); diff --git a/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java index a3ddc17b..4a6af3fc 100644 --- a/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -5,8 +5,10 @@ import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.MqMessageState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.sql.SQLException; @@ -24,6 +26,8 @@ public class MqPersistence { private final HikariDataSource dataSource; private final Gson gson; + private static final Logger logger = LoggerFactory.getLogger(MqPersistence.class); + public MqPersistence(HikariDataSource dataSource) { this.dataSource = dataSource; this.gson = null; @@ -197,6 +201,20 @@ public class MqPersistence { } } + /** Sends an error response to the message with the given id, this is a convenience wrapper for + * sendResponse() that send a generic error message. The message will be marked as 'ERR'. + *

+ * If an exception is thrown while sending the response, it will be logged, but not rethrown + * to avoid creating exception handling pyramids. At this point, we've already given it a college try. + * */ + public void sendErrorResponse(long msgId, String message, Throwable e) { + try { + sendResponse(msgId, MqMessageState.ERR, message + ": " + e.getMessage()); + } catch (SQLException ex) { + logger.error("Failed to send error response", ex); + } + } + /** Marks unclaimed messages addressed to this inbox with instanceUUID and tick, * then returns the number of messages marked. This is an atomic operation that diff --git a/code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java new file mode 100644 index 00000000..46f74b71 --- /dev/null +++ b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java @@ -0,0 +1,162 @@ +package nu.marginalia.mq.task; + +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqPersistence; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** A long-running task that can be executed asynchronously + * and report back to the message queue. + *

+ * The idiomatic pattern is to create an outbox and send a message corresponding to the task, + * and then pass the message id along with the request to trigger the task over gRPC. + *

+ * The gRPC service will spin off a thread and return immediately, while the task is executed + * in the background. The task can then report back to the message queue with the result + * of the task as it completes. + * */ +public abstract class MqLongRunningTask implements Runnable { + + /** Create a new task with the given message id, name, and persistence. If the msgId is + * not positive, a dummy implementation is provided that does not report to the message queue. + */ + public static MqLongRunningTask of(long msgId, String name, MqPersistence persistence) { + if (msgId <= 0) { + return new MqLongRunningTaskDummyImpl(name); + } + else { + return new MqLongRunningTaskImpl(persistence, msgId, name); + } + } + + /** Creates a thread that will execute the task. The thread is not started automatically */ + public Thread asThread(MqTaskFunction r) { + return Thread.ofPlatform().name(name()).start(() -> runNow(r)); + } + + /** Creates a future that will execute the task on the provided ExecutorService. */ + public Future asFuture(ExecutorService executor, MqTaskFunction r) { + return executor.submit(() -> runNow(r)); + } + + /** Execute the task synchronously and return true if the task was successful */ + public boolean runNow(MqTaskFunction r) { + try { + switch (r.run()) { + case MqTaskResult.Success success -> { + finish(success.message()); + return true; + } + case MqTaskResult.Failure failure -> fail(failure.message()); + } + } + catch (Exception e) { + fail(e); + } + return false; + } + + abstract void finish(); + abstract void finish(String message); + + abstract void fail(String message); + abstract void fail(Throwable e); + + public abstract String name(); +} + +class MqLongRunningTaskDummyImpl extends MqLongRunningTask { + private final String name; + + MqLongRunningTaskDummyImpl(String name) { + this.name = name; + } + + @Override + public void finish() {} + + @Override + public void finish(String message) {} + + @Override + public void fail(String message) {} + + @Override + public void fail(Throwable e) {} + + @Override + public void run() {} + + @Override + public String name() { + return name; + } +} + +class MqLongRunningTaskImpl extends MqLongRunningTask { + private final MqPersistence persistence; + private final long msgId; + private final String name; + + MqLongRunningTaskImpl(MqPersistence persistence, long msgId, String name) { + this.persistence = persistence; + this.msgId = msgId; + this.name = name; + + try { + persistence.updateMessageState(msgId, MqMessageState.ACK); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void finish() { + try { + persistence.sendResponse(msgId, MqMessageState.OK, "Success"); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void finish(String message) { + try { + persistence.sendResponse(msgId, MqMessageState.OK, message); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void fail(String message) { + try { + persistence.sendResponse(msgId, MqMessageState.ERR, message); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void fail(Throwable e) { + try { + persistence.sendResponse(msgId, MqMessageState.ERR, e.getMessage()); + } + catch (Exception e2) { + throw new RuntimeException(e2); + } + } + + @Override + public void run() {} + + @Override + public String name() { + return name; + } +} \ No newline at end of file diff --git a/code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskFunction.java b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskFunction.java new file mode 100644 index 00000000..c82fb23f --- /dev/null +++ b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskFunction.java @@ -0,0 +1,5 @@ +package nu.marginalia.mq.task; + +public interface MqTaskFunction { + MqTaskResult run() throws Exception; +} diff --git a/code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskResult.java b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskResult.java new file mode 100644 index 00000000..9ab73607 --- /dev/null +++ b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqTaskResult.java @@ -0,0 +1,14 @@ +package nu.marginalia.mq.task; + +public sealed interface MqTaskResult { + record Success(String message) implements MqTaskResult { + public Success(){ + this("Ok"); + } + } + record Failure(String message) implements MqTaskResult { + public Failure(Throwable e){ + this(e.getClass().getSimpleName() + " : " + e.getMessage()); + } + } +}