diff --git a/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java index 459cd75b..3ccad794 100644 --- a/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java +++ b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java @@ -8,31 +8,31 @@ import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.Resume; import nu.marginalia.api.feeds.FeedsClient; import nu.marginalia.api.feeds.RpcFeedUpdateMode; +import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; -import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.module.ServiceConfiguration; import java.time.Duration; import java.time.LocalDateTime; -import java.util.concurrent.TimeUnit; public class UpdateRssActor extends RecordActorPrototype { private final FeedsClient feedsClient; private final int nodeId; - private final MqOutbox updateTaskOutbox; - private final Duration initialDelay = Duration.ofMinutes(5); private final Duration updateInterval = Duration.ofHours(24); private final int cleanInterval = 60; + private final MqPersistence persistence; + @Inject - public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration) { + public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration, MqPersistence persistence) { super(gson); this.feedsClient = feedsClient; this.nodeId = serviceConfiguration.node(); - this.updateTaskOutbox = feedsClient.createOutbox("update-rss-actor", nodeId); + this.persistence = persistence; } public record Initial() implements ActorStep {} @@ -88,32 +88,35 @@ public class UpdateRssActor extends RecordActorPrototype { } } case UpdateRefresh(int count, long msgId) when msgId < 0 -> { - long messageId = updateTaskOutbox.sendAsync("UpdateRefresh", ""); - feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH, messageId); + long messageId = feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH); yield new UpdateRefresh(count, messageId); } case UpdateRefresh(int count, long msgId) -> { - var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS); - if (rsp.state() != MqMessageState.OK) { + MqMessage msg = persistence.waitForMessageTerminalState(msgId, Duration.ofSeconds(10), Duration.ofHours(12)); + if (msg == null) { // Retry the update - yield new Error("Failed to update feeds: " + rsp.state()); + yield new Error("Failed to update feeds: message not found"); + } else if (msg.state() != MqMessageState.OK) { + // Retry the update + yield new Error("Failed to update feeds: " + msg.state()); } else { - // Reset the refresh count after a successful update + // Increment the refresh count yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1); } } case UpdateClean(long msgId) when msgId < 0 -> { - long messageId = updateTaskOutbox.sendAsync("UpdateClean", ""); - feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN, messageId); - + long messageId = feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN); yield new UpdateClean(messageId); } case UpdateClean(long msgId) -> { - var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS); - if (rsp.state() != MqMessageState.OK) { + MqMessage msg = persistence.waitForMessageTerminalState(msgId, Duration.ofSeconds(10), Duration.ofHours(12)); + if (msg == null) { // Retry the update - yield new Error("Failed to clean feeds: " + rsp.state()); + yield new Error("Failed to update feeds: message not found"); + } else if (msg.state() != MqMessageState.OK) { + // Retry the update + yield new Error("Failed to update feeds: " + msg.state()); } else { // Reset the refresh count after a successful update diff --git a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java index 064caf55..d69145b2 100644 --- a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java +++ b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java @@ -1,41 +1,40 @@ package nu.marginalia.api.feeds; import com.google.inject.Inject; +import com.google.inject.Singleton; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.client.GrpcChannelPoolFactory; import nu.marginalia.service.client.GrpcSingleNodeChannelPool; import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServicePartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import nu.marginalia.service.module.ServiceConfiguration; +import javax.annotation.CheckReturnValue; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +@Singleton public class FeedsClient { - private static final Logger logger = LoggerFactory.getLogger(FeedsClient.class); private final ExecutorService executorService = Executors.newCachedThreadPool(); private final GrpcSingleNodeChannelPool channelPool; - private final MqPersistence mqPersistence; + private final MqOutbox updateFeedsOutbox; @Inject - public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence) { - this.mqPersistence = mqPersistence; + public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence, ServiceConfiguration serviceConfiguration) { + // The client is only interested in the primary node var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any()); + this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub); + this.updateFeedsOutbox = new MqOutbox(mqPersistence, + "update-rss-feeds", 0, + serviceConfiguration.serviceName(), serviceConfiguration.node(), + UUID.randomUUID()); } - - /** Create an appropriately named outbox for the update actor requests */ - public MqOutbox createOutbox(String callerName, int outboxNodeId) { - return new MqOutbox(mqPersistence, "update-rss-feeds", 1, callerName, outboxNodeId, UUID.randomUUID()); - } - - public CompletableFuture getFeed(int domainId) { try { return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed) @@ -47,18 +46,20 @@ public class FeedsClient { } } - public void updateFeeds(RpcFeedUpdateMode mode, long msgId) { - try { - channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds) - .run(RpcUpdateRequest.newBuilder() - .setMode(mode) - .setMsgId(msgId) - .build() - ); - } - catch (Exception e) { - logger.error("API Exception", e); - } + /** Update the feeds, return a message ID for the update */ + @CheckReturnValue + public long updateFeeds(RpcFeedUpdateMode mode) throws Exception { + // Create a message for the {@link MqLongRunningTask} paradigm to use for tracking the task + long msgId = updateFeedsOutbox.sendAsync("updateFeeds", ""); + + channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds) + .run(RpcUpdateRequest.newBuilder() + .setMode(mode) + .setMsgId(msgId) + .build() + ); + + return msgId; } } diff --git a/code/libraries/message-queue/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/libraries/message-queue/java/nu/marginalia/mq/outbox/MqOutbox.java index b1f0a44b..e59bc8ed 100644 --- a/code/libraries/message-queue/java/nu/marginalia/mq/outbox/MqOutbox.java +++ b/code/libraries/message-queue/java/nu/marginalia/mq/outbox/MqOutbox.java @@ -93,14 +93,14 @@ public class MqOutbox { } /** Send a message and wait for a response. */ - public MqMessage send(String function, String payload) throws Exception { + public MqMessage sendBlocking(String function, String payload) throws Exception { final long id = sendAsync(function, payload); return waitResponse(id); } /** Send a message and wait for a response */ - public MqMessage send(Object object) throws Exception { + public MqMessage sendBlocking(Object object) throws Exception { final long id = sendAsync(object); return waitResponse(id); diff --git a/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java index 4a6af3fc..0d9a04ed 100644 --- a/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/libraries/message-queue/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -149,6 +149,35 @@ public class MqPersistence { } } + /** Blocks until a message reaches a terminal state or the timeout passes. + *

+ * @param msgId The id of the message to wait for + * @param pollInterval The interval to poll the database for updates + * @param timeout The maximum time to wait for the message to reach a terminal state + * @return The message if it reaches a terminal state, or null if the timeout passes + */ + @Nullable + public MqMessage waitForMessageTerminalState(long msgId, Duration pollInterval, Duration timeout) throws InterruptedException, SQLException { + long deadline = System.currentTimeMillis() + timeout.toMillis(); + + do { + var message = getMessage(msgId); + if (message.state().isTerminal()) { + return message; + } + + long timeLeft = deadline - System.currentTimeMillis(); + if (timeLeft <= 0) { + continue; + } + long sleepTime = Math.min(pollInterval.toMillis(), timeLeft); + + Thread.sleep(sleepTime); + } while (System.currentTimeMillis() < deadline); + + return null; + } + /** Creates a new message in the queue referencing as a reply to an existing message * This message will have it's RELATED_ID set to the original message's ID. */ @@ -201,21 +230,6 @@ public class MqPersistence { } } - /** Sends an error response to the message with the given id, this is a convenience wrapper for - * sendResponse() that send a generic error message. The message will be marked as 'ERR'. - *

- * If an exception is thrown while sending the response, it will be logged, but not rethrown - * to avoid creating exception handling pyramids. At this point, we've already given it a college try. - * */ - public void sendErrorResponse(long msgId, String message, Throwable e) { - try { - sendResponse(msgId, MqMessageState.ERR, message + ": " + e.getMessage()); - } catch (SQLException ex) { - logger.error("Failed to send error response", ex); - } - } - - /** Marks unclaimed messages addressed to this inbox with instanceUUID and tick, * then returns the number of messages marked. This is an atomic operation that * ensures that messages aren't double processed. diff --git a/code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java index 46f74b71..9357aab9 100644 --- a/code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java +++ b/code/libraries/message-queue/java/nu/marginalia/mq/task/MqLongRunningTask.java @@ -2,6 +2,8 @@ package nu.marginalia.mq.task; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.persistence.MqPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -14,9 +16,10 @@ import java.util.concurrent.Future; *

* The gRPC service will spin off a thread and return immediately, while the task is executed * in the background. The task can then report back to the message queue with the result - * of the task as it completes. + * of the task as it completes, by updating the message's state. * */ public abstract class MqLongRunningTask implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(MqLongRunningTask.class); /** Create a new task with the given message id, name, and persistence. If the msgId is * not positive, a dummy implementation is provided that does not report to the message queue. @@ -32,7 +35,7 @@ public abstract class MqLongRunningTask implements Runnable { /** Creates a thread that will execute the task. The thread is not started automatically */ public Thread asThread(MqTaskFunction r) { - return Thread.ofPlatform().name(name()).start(() -> runNow(r)); + return new Thread(() -> runNow(r), name()); } /** Creates a future that will execute the task on the provided ExecutorService. */ @@ -45,23 +48,22 @@ public abstract class MqLongRunningTask implements Runnable { try { switch (r.run()) { case MqTaskResult.Success success -> { - finish(success.message()); + finish(); return true; } - case MqTaskResult.Failure failure -> fail(failure.message()); + case MqTaskResult.Failure failure -> fail(); } } catch (Exception e) { - fail(e); + logger.error("Task failed", e); + fail(); } return false; } abstract void finish(); - abstract void finish(String message); - abstract void fail(String message); - abstract void fail(Throwable e); + abstract void fail(); public abstract String name(); } @@ -77,13 +79,7 @@ class MqLongRunningTaskDummyImpl extends MqLongRunningTask { public void finish() {} @Override - public void finish(String message) {} - - @Override - public void fail(String message) {} - - @Override - public void fail(Throwable e) {} + public void fail() {} @Override public void run() {} @@ -115,7 +111,7 @@ class MqLongRunningTaskImpl extends MqLongRunningTask { @Override public void finish() { try { - persistence.sendResponse(msgId, MqMessageState.OK, "Success"); + persistence.updateMessageState(msgId, MqMessageState.OK); } catch (Exception e) { throw new RuntimeException(e); @@ -123,35 +119,15 @@ class MqLongRunningTaskImpl extends MqLongRunningTask { } @Override - public void finish(String message) { + public void fail() { try { - persistence.sendResponse(msgId, MqMessageState.OK, message); + persistence.updateMessageState(msgId, MqMessageState.ERR); } catch (Exception e) { throw new RuntimeException(e); } } - @Override - public void fail(String message) { - try { - persistence.sendResponse(msgId, MqMessageState.ERR, message); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void fail(Throwable e) { - try { - persistence.sendResponse(msgId, MqMessageState.ERR, e.getMessage()); - } - catch (Exception e2) { - throw new RuntimeException(e2); - } - } - @Override public void run() {} diff --git a/code/libraries/message-queue/test/nu/marginalia/mq/outbox/MqOutboxTest.java b/code/libraries/message-queue/test/nu/marginalia/mq/outbox/MqOutboxTest.java index c9d34f19..aa06f924 100644 --- a/code/libraries/message-queue/test/nu/marginalia/mq/outbox/MqOutboxTest.java +++ b/code/libraries/message-queue/test/nu/marginalia/mq/outbox/MqOutboxTest.java @@ -110,9 +110,9 @@ public class MqOutboxTest { } @Test - public void testSend() throws Exception { + public void testSendBlocking() throws Exception { var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID()); - Executors.newSingleThreadExecutor().submit(() -> outbox.send("test", "Hello World")); + Executors.newSingleThreadExecutor().submit(() -> outbox.sendBlocking("test", "Hello World")); TimeUnit.MILLISECONDS.sleep(100); @@ -125,14 +125,14 @@ public class MqOutboxTest { @Test - public void testSendAndRespondAsyncInbox() throws Exception { + public void testSendBlockingAndRespondAsyncInbox() throws Exception { var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID()); var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID()); inbox.subscribe(justRespond("Alright then")); inbox.start(); - var rsp = outbox.send("test", "Hello World"); + var rsp = outbox.sendBlocking("test", "Hello World"); assertEquals(MqMessageState.OK, rsp.state()); assertEquals("Alright then", rsp.payload()); @@ -146,14 +146,14 @@ public class MqOutboxTest { } @Test - public void testSendAndRespondSyncInbox() throws Exception { + public void testSendBlockingAndRespondSyncInbox() throws Exception { var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID()); var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID()); inbox.subscribe(justRespond("Alright then")); inbox.start(); - var rsp = outbox.send("test", "Hello World"); + var rsp = outbox.sendBlocking("test", "Hello World"); assertEquals(MqMessageState.OK, rsp.state()); assertEquals("Alright then", rsp.payload()); @@ -167,17 +167,17 @@ public class MqOutboxTest { } @Test - public void testSendMultipleAsyncInbox() throws Exception { + public void testSendBlockingMultipleAsyncInbox() throws Exception { var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID()); var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID()); inbox.subscribe(echo()); inbox.start(); - var rsp1 = outbox.send("test", "one"); - var rsp2 = outbox.send("test", "two"); - var rsp3 = outbox.send("test", "three"); - var rsp4 = outbox.send("test", "four"); + var rsp1 = outbox.sendBlocking("test", "one"); + var rsp2 = outbox.sendBlocking("test", "two"); + var rsp3 = outbox.sendBlocking("test", "three"); + var rsp4 = outbox.sendBlocking("test", "four"); Thread.sleep(500); @@ -201,17 +201,17 @@ public class MqOutboxTest { } @Test - public void testSendMultipleSyncInbox() throws Exception { + public void testSendBlockingMultipleSyncInbox() throws Exception { var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID()); var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID()); inbox.subscribe(echo()); inbox.start(); - var rsp1 = outbox.send("test", "one"); - var rsp2 = outbox.send("test", "two"); - var rsp3 = outbox.send("test", "three"); - var rsp4 = outbox.send("test", "four"); + var rsp1 = outbox.sendBlocking("test", "one"); + var rsp2 = outbox.sendBlocking("test", "two"); + var rsp3 = outbox.sendBlocking("test", "three"); + var rsp4 = outbox.sendBlocking("test", "four"); Thread.sleep(500); @@ -235,13 +235,13 @@ public class MqOutboxTest { } @Test - public void testSendAndRespondWithErrorHandlerAsyncInbox() throws Exception { + public void testSendBlockingAndRespondWithErrorHandlerAsyncInbox() throws Exception { var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID()); var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID()); inbox.start(); - var rsp = outbox.send("test", "Hello World"); + var rsp = outbox.sendBlocking("test", "Hello World"); assertEquals(MqMessageState.ERR, rsp.state()); @@ -254,13 +254,13 @@ public class MqOutboxTest { } @Test - public void testSendAndRespondWithErrorHandlerSyncInbox() throws Exception { + public void testSendBlockingAndRespondWithErrorHandlerSyncInbox() throws Exception { var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID()); var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID()); inbox.start(); - var rsp = outbox.send("test", "Hello World"); + var rsp = outbox.sendBlocking("test", "Hello World"); assertEquals(MqMessageState.ERR, rsp.state());