mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 21:18:58 +00:00
(feed) Use the message queue to permit the feeds service to tell the calling actor when it's finished
This commit is contained in:
parent
a2bc9a98c0
commit
a456ec9599
@ -8,31 +8,31 @@ import nu.marginalia.actor.state.ActorStep;
|
||||
import nu.marginalia.actor.state.Resume;
|
||||
import nu.marginalia.api.feeds.FeedsClient;
|
||||
import nu.marginalia.api.feeds.RpcFeedUpdateMode;
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class UpdateRssActor extends RecordActorPrototype {
|
||||
|
||||
private final FeedsClient feedsClient;
|
||||
private final int nodeId;
|
||||
|
||||
private final MqOutbox updateTaskOutbox;
|
||||
|
||||
private final Duration initialDelay = Duration.ofMinutes(5);
|
||||
private final Duration updateInterval = Duration.ofHours(24);
|
||||
private final int cleanInterval = 60;
|
||||
|
||||
private final MqPersistence persistence;
|
||||
|
||||
@Inject
|
||||
public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration) {
|
||||
public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration, MqPersistence persistence) {
|
||||
super(gson);
|
||||
this.feedsClient = feedsClient;
|
||||
this.nodeId = serviceConfiguration.node();
|
||||
this.updateTaskOutbox = feedsClient.createOutbox("update-rss-actor", nodeId);
|
||||
this.persistence = persistence;
|
||||
}
|
||||
|
||||
public record Initial() implements ActorStep {}
|
||||
@ -88,32 +88,35 @@ public class UpdateRssActor extends RecordActorPrototype {
|
||||
}
|
||||
}
|
||||
case UpdateRefresh(int count, long msgId) when msgId < 0 -> {
|
||||
long messageId = updateTaskOutbox.sendAsync("UpdateRefresh", "");
|
||||
feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH, messageId);
|
||||
long messageId = feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH);
|
||||
yield new UpdateRefresh(count, messageId);
|
||||
}
|
||||
case UpdateRefresh(int count, long msgId) -> {
|
||||
var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS);
|
||||
if (rsp.state() != MqMessageState.OK) {
|
||||
MqMessage msg = persistence.waitForMessageTerminalState(msgId, Duration.ofSeconds(10), Duration.ofHours(12));
|
||||
if (msg == null) {
|
||||
// Retry the update
|
||||
yield new Error("Failed to update feeds: " + rsp.state());
|
||||
yield new Error("Failed to update feeds: message not found");
|
||||
} else if (msg.state() != MqMessageState.OK) {
|
||||
// Retry the update
|
||||
yield new Error("Failed to update feeds: " + msg.state());
|
||||
}
|
||||
else {
|
||||
// Reset the refresh count after a successful update
|
||||
// Increment the refresh count
|
||||
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1);
|
||||
}
|
||||
}
|
||||
case UpdateClean(long msgId) when msgId < 0 -> {
|
||||
long messageId = updateTaskOutbox.sendAsync("UpdateClean", "");
|
||||
feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN, messageId);
|
||||
|
||||
long messageId = feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN);
|
||||
yield new UpdateClean(messageId);
|
||||
}
|
||||
case UpdateClean(long msgId) -> {
|
||||
var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS);
|
||||
if (rsp.state() != MqMessageState.OK) {
|
||||
MqMessage msg = persistence.waitForMessageTerminalState(msgId, Duration.ofSeconds(10), Duration.ofHours(12));
|
||||
if (msg == null) {
|
||||
// Retry the update
|
||||
yield new Error("Failed to clean feeds: " + rsp.state());
|
||||
yield new Error("Failed to update feeds: message not found");
|
||||
} else if (msg.state() != MqMessageState.OK) {
|
||||
// Retry the update
|
||||
yield new Error("Failed to update feeds: " + msg.state());
|
||||
}
|
||||
else {
|
||||
// Reset the refresh count after a successful update
|
||||
|
@ -1,41 +1,40 @@
|
||||
package nu.marginalia.api.feeds;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.service.client.GrpcChannelPoolFactory;
|
||||
import nu.marginalia.service.client.GrpcSingleNodeChannelPool;
|
||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||
import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
|
||||
import javax.annotation.CheckReturnValue;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@Singleton
|
||||
public class FeedsClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(FeedsClient.class);
|
||||
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
private final GrpcSingleNodeChannelPool<FeedApiGrpc.FeedApiBlockingStub> channelPool;
|
||||
private final MqPersistence mqPersistence;
|
||||
private final MqOutbox updateFeedsOutbox;
|
||||
|
||||
@Inject
|
||||
public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence) {
|
||||
this.mqPersistence = mqPersistence;
|
||||
public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence, ServiceConfiguration serviceConfiguration) {
|
||||
|
||||
// The client is only interested in the primary node
|
||||
var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any());
|
||||
|
||||
this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub);
|
||||
this.updateFeedsOutbox = new MqOutbox(mqPersistence,
|
||||
"update-rss-feeds", 0,
|
||||
serviceConfiguration.serviceName(), serviceConfiguration.node(),
|
||||
UUID.randomUUID());
|
||||
}
|
||||
|
||||
|
||||
/** Create an appropriately named outbox for the update actor requests */
|
||||
public MqOutbox createOutbox(String callerName, int outboxNodeId) {
|
||||
return new MqOutbox(mqPersistence, "update-rss-feeds", 1, callerName, outboxNodeId, UUID.randomUUID());
|
||||
}
|
||||
|
||||
|
||||
public CompletableFuture<RpcFeed> getFeed(int domainId) {
|
||||
try {
|
||||
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed)
|
||||
@ -47,18 +46,20 @@ public class FeedsClient {
|
||||
}
|
||||
}
|
||||
|
||||
public void updateFeeds(RpcFeedUpdateMode mode, long msgId) {
|
||||
try {
|
||||
channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds)
|
||||
.run(RpcUpdateRequest.newBuilder()
|
||||
.setMode(mode)
|
||||
.setMsgId(msgId)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("API Exception", e);
|
||||
}
|
||||
/** Update the feeds, return a message ID for the update */
|
||||
@CheckReturnValue
|
||||
public long updateFeeds(RpcFeedUpdateMode mode) throws Exception {
|
||||
// Create a message for the {@link MqLongRunningTask} paradigm to use for tracking the task
|
||||
long msgId = updateFeedsOutbox.sendAsync("updateFeeds", "");
|
||||
|
||||
channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds)
|
||||
.run(RpcUpdateRequest.newBuilder()
|
||||
.setMode(mode)
|
||||
.setMsgId(msgId)
|
||||
.build()
|
||||
);
|
||||
|
||||
return msgId;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -93,14 +93,14 @@ public class MqOutbox {
|
||||
}
|
||||
|
||||
/** Send a message and wait for a response. */
|
||||
public MqMessage send(String function, String payload) throws Exception {
|
||||
public MqMessage sendBlocking(String function, String payload) throws Exception {
|
||||
final long id = sendAsync(function, payload);
|
||||
|
||||
return waitResponse(id);
|
||||
}
|
||||
|
||||
/** Send a message and wait for a response */
|
||||
public MqMessage send(Object object) throws Exception {
|
||||
public MqMessage sendBlocking(Object object) throws Exception {
|
||||
final long id = sendAsync(object);
|
||||
|
||||
return waitResponse(id);
|
||||
|
@ -149,6 +149,35 @@ public class MqPersistence {
|
||||
}
|
||||
}
|
||||
|
||||
/** Blocks until a message reaches a terminal state or the timeout passes.
|
||||
* <p>
|
||||
* @param msgId The id of the message to wait for
|
||||
* @param pollInterval The interval to poll the database for updates
|
||||
* @param timeout The maximum time to wait for the message to reach a terminal state
|
||||
* @return The message if it reaches a terminal state, or null if the timeout passes
|
||||
*/
|
||||
@Nullable
|
||||
public MqMessage waitForMessageTerminalState(long msgId, Duration pollInterval, Duration timeout) throws InterruptedException, SQLException {
|
||||
long deadline = System.currentTimeMillis() + timeout.toMillis();
|
||||
|
||||
do {
|
||||
var message = getMessage(msgId);
|
||||
if (message.state().isTerminal()) {
|
||||
return message;
|
||||
}
|
||||
|
||||
long timeLeft = deadline - System.currentTimeMillis();
|
||||
if (timeLeft <= 0) {
|
||||
continue;
|
||||
}
|
||||
long sleepTime = Math.min(pollInterval.toMillis(), timeLeft);
|
||||
|
||||
Thread.sleep(sleepTime);
|
||||
} while (System.currentTimeMillis() < deadline);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Creates a new message in the queue referencing as a reply to an existing message
|
||||
* This message will have it's RELATED_ID set to the original message's ID.
|
||||
*/
|
||||
@ -201,21 +230,6 @@ public class MqPersistence {
|
||||
}
|
||||
}
|
||||
|
||||
/** Sends an error response to the message with the given id, this is a convenience wrapper for
|
||||
* sendResponse() that send a generic error message. The message will be marked as 'ERR'.
|
||||
* <p></p>
|
||||
* If an exception is thrown while sending the response, it will be logged, but not rethrown
|
||||
* to avoid creating exception handling pyramids. At this point, we've already given it a college try.
|
||||
* */
|
||||
public void sendErrorResponse(long msgId, String message, Throwable e) {
|
||||
try {
|
||||
sendResponse(msgId, MqMessageState.ERR, message + ": " + e.getMessage());
|
||||
} catch (SQLException ex) {
|
||||
logger.error("Failed to send error response", ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
|
||||
* then returns the number of messages marked. This is an atomic operation that
|
||||
* ensures that messages aren't double processed.
|
||||
|
@ -2,6 +2,8 @@ package nu.marginalia.mq.task;
|
||||
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
@ -14,9 +16,10 @@ import java.util.concurrent.Future;
|
||||
* <p></p>
|
||||
* The gRPC service will spin off a thread and return immediately, while the task is executed
|
||||
* in the background. The task can then report back to the message queue with the result
|
||||
* of the task as it completes.
|
||||
* of the task as it completes, by updating the message's state.
|
||||
* */
|
||||
public abstract class MqLongRunningTask implements Runnable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(MqLongRunningTask.class);
|
||||
|
||||
/** Create a new task with the given message id, name, and persistence. If the msgId is
|
||||
* not positive, a dummy implementation is provided that does not report to the message queue.
|
||||
@ -32,7 +35,7 @@ public abstract class MqLongRunningTask implements Runnable {
|
||||
|
||||
/** Creates a thread that will execute the task. The thread is not started automatically */
|
||||
public Thread asThread(MqTaskFunction r) {
|
||||
return Thread.ofPlatform().name(name()).start(() -> runNow(r));
|
||||
return new Thread(() -> runNow(r), name());
|
||||
}
|
||||
|
||||
/** Creates a future that will execute the task on the provided ExecutorService. */
|
||||
@ -45,23 +48,22 @@ public abstract class MqLongRunningTask implements Runnable {
|
||||
try {
|
||||
switch (r.run()) {
|
||||
case MqTaskResult.Success success -> {
|
||||
finish(success.message());
|
||||
finish();
|
||||
return true;
|
||||
}
|
||||
case MqTaskResult.Failure failure -> fail(failure.message());
|
||||
case MqTaskResult.Failure failure -> fail();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
fail(e);
|
||||
logger.error("Task failed", e);
|
||||
fail();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
abstract void finish();
|
||||
abstract void finish(String message);
|
||||
|
||||
abstract void fail(String message);
|
||||
abstract void fail(Throwable e);
|
||||
abstract void fail();
|
||||
|
||||
public abstract String name();
|
||||
}
|
||||
@ -77,13 +79,7 @@ class MqLongRunningTaskDummyImpl extends MqLongRunningTask {
|
||||
public void finish() {}
|
||||
|
||||
@Override
|
||||
public void finish(String message) {}
|
||||
|
||||
@Override
|
||||
public void fail(String message) {}
|
||||
|
||||
@Override
|
||||
public void fail(Throwable e) {}
|
||||
public void fail() {}
|
||||
|
||||
@Override
|
||||
public void run() {}
|
||||
@ -115,7 +111,7 @@ class MqLongRunningTaskImpl extends MqLongRunningTask {
|
||||
@Override
|
||||
public void finish() {
|
||||
try {
|
||||
persistence.sendResponse(msgId, MqMessageState.OK, "Success");
|
||||
persistence.updateMessageState(msgId, MqMessageState.OK);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -123,35 +119,15 @@ class MqLongRunningTaskImpl extends MqLongRunningTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finish(String message) {
|
||||
public void fail() {
|
||||
try {
|
||||
persistence.sendResponse(msgId, MqMessageState.OK, message);
|
||||
persistence.updateMessageState(msgId, MqMessageState.ERR);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(String message) {
|
||||
try {
|
||||
persistence.sendResponse(msgId, MqMessageState.ERR, message);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(Throwable e) {
|
||||
try {
|
||||
persistence.sendResponse(msgId, MqMessageState.ERR, e.getMessage());
|
||||
}
|
||||
catch (Exception e2) {
|
||||
throw new RuntimeException(e2);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {}
|
||||
|
||||
|
@ -110,9 +110,9 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSend() throws Exception {
|
||||
public void testSendBlocking() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
|
||||
Executors.newSingleThreadExecutor().submit(() -> outbox.send("test", "Hello World"));
|
||||
Executors.newSingleThreadExecutor().submit(() -> outbox.sendBlocking("test", "Hello World"));
|
||||
|
||||
TimeUnit.MILLISECONDS.sleep(100);
|
||||
|
||||
@ -125,14 +125,14 @@ public class MqOutboxTest {
|
||||
|
||||
|
||||
@Test
|
||||
public void testSendAndRespondAsyncInbox() throws Exception {
|
||||
public void testSendBlockingAndRespondAsyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
|
||||
|
||||
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
|
||||
inbox.subscribe(justRespond("Alright then"));
|
||||
inbox.start();
|
||||
|
||||
var rsp = outbox.send("test", "Hello World");
|
||||
var rsp = outbox.sendBlocking("test", "Hello World");
|
||||
|
||||
assertEquals(MqMessageState.OK, rsp.state());
|
||||
assertEquals("Alright then", rsp.payload());
|
||||
@ -146,14 +146,14 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAndRespondSyncInbox() throws Exception {
|
||||
public void testSendBlockingAndRespondSyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
|
||||
|
||||
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
|
||||
inbox.subscribe(justRespond("Alright then"));
|
||||
inbox.start();
|
||||
|
||||
var rsp = outbox.send("test", "Hello World");
|
||||
var rsp = outbox.sendBlocking("test", "Hello World");
|
||||
|
||||
assertEquals(MqMessageState.OK, rsp.state());
|
||||
assertEquals("Alright then", rsp.payload());
|
||||
@ -167,17 +167,17 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMultipleAsyncInbox() throws Exception {
|
||||
public void testSendBlockingMultipleAsyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
|
||||
|
||||
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
|
||||
inbox.subscribe(echo());
|
||||
inbox.start();
|
||||
|
||||
var rsp1 = outbox.send("test", "one");
|
||||
var rsp2 = outbox.send("test", "two");
|
||||
var rsp3 = outbox.send("test", "three");
|
||||
var rsp4 = outbox.send("test", "four");
|
||||
var rsp1 = outbox.sendBlocking("test", "one");
|
||||
var rsp2 = outbox.sendBlocking("test", "two");
|
||||
var rsp3 = outbox.sendBlocking("test", "three");
|
||||
var rsp4 = outbox.sendBlocking("test", "four");
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
@ -201,17 +201,17 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMultipleSyncInbox() throws Exception {
|
||||
public void testSendBlockingMultipleSyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
|
||||
|
||||
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
|
||||
inbox.subscribe(echo());
|
||||
inbox.start();
|
||||
|
||||
var rsp1 = outbox.send("test", "one");
|
||||
var rsp2 = outbox.send("test", "two");
|
||||
var rsp3 = outbox.send("test", "three");
|
||||
var rsp4 = outbox.send("test", "four");
|
||||
var rsp1 = outbox.sendBlocking("test", "one");
|
||||
var rsp2 = outbox.sendBlocking("test", "two");
|
||||
var rsp3 = outbox.sendBlocking("test", "three");
|
||||
var rsp4 = outbox.sendBlocking("test", "four");
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
@ -235,13 +235,13 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAndRespondWithErrorHandlerAsyncInbox() throws Exception {
|
||||
public void testSendBlockingAndRespondWithErrorHandlerAsyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
|
||||
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
|
||||
|
||||
inbox.start();
|
||||
|
||||
var rsp = outbox.send("test", "Hello World");
|
||||
var rsp = outbox.sendBlocking("test", "Hello World");
|
||||
|
||||
assertEquals(MqMessageState.ERR, rsp.state());
|
||||
|
||||
@ -254,13 +254,13 @@ public class MqOutboxTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAndRespondWithErrorHandlerSyncInbox() throws Exception {
|
||||
public void testSendBlockingAndRespondWithErrorHandlerSyncInbox() throws Exception {
|
||||
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
|
||||
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
|
||||
|
||||
inbox.start();
|
||||
|
||||
var rsp = outbox.send("test", "Hello World");
|
||||
var rsp = outbox.sendBlocking("test", "Hello World");
|
||||
|
||||
assertEquals(MqMessageState.ERR, rsp.state());
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user