mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-24 05:18:58 +00:00
(feed) Use the message queue to permit the feeds service to tell the calling actor when it's finished
This commit is contained in:
parent
e24a98390c
commit
a2bc9a98c0
@ -8,16 +8,21 @@ import nu.marginalia.actor.state.ActorStep;
|
|||||||
import nu.marginalia.actor.state.Resume;
|
import nu.marginalia.actor.state.Resume;
|
||||||
import nu.marginalia.api.feeds.FeedsClient;
|
import nu.marginalia.api.feeds.FeedsClient;
|
||||||
import nu.marginalia.api.feeds.RpcFeedUpdateMode;
|
import nu.marginalia.api.feeds.RpcFeedUpdateMode;
|
||||||
|
import nu.marginalia.mq.MqMessageState;
|
||||||
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
import nu.marginalia.service.module.ServiceConfiguration;
|
import nu.marginalia.service.module.ServiceConfiguration;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class UpdateRssActor extends RecordActorPrototype {
|
public class UpdateRssActor extends RecordActorPrototype {
|
||||||
|
|
||||||
private final FeedsClient feedsClient;
|
private final FeedsClient feedsClient;
|
||||||
private final int nodeId;
|
private final int nodeId;
|
||||||
|
|
||||||
|
private final MqOutbox updateTaskOutbox;
|
||||||
|
|
||||||
private final Duration initialDelay = Duration.ofMinutes(5);
|
private final Duration initialDelay = Duration.ofMinutes(5);
|
||||||
private final Duration updateInterval = Duration.ofHours(24);
|
private final Duration updateInterval = Duration.ofHours(24);
|
||||||
private final int cleanInterval = 60;
|
private final int cleanInterval = 60;
|
||||||
@ -27,15 +32,24 @@ public class UpdateRssActor extends RecordActorPrototype {
|
|||||||
super(gson);
|
super(gson);
|
||||||
this.feedsClient = feedsClient;
|
this.feedsClient = feedsClient;
|
||||||
this.nodeId = serviceConfiguration.node();
|
this.nodeId = serviceConfiguration.node();
|
||||||
|
this.updateTaskOutbox = feedsClient.createOutbox("update-rss-actor", nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public record Initial() implements ActorStep {}
|
public record Initial() implements ActorStep {}
|
||||||
@Resume(behavior = ActorResumeBehavior.RETRY)
|
@Resume(behavior = ActorResumeBehavior.RETRY)
|
||||||
public record Wait(String ts, int refreshCount) implements ActorStep {}
|
public record Wait(String ts, int refreshCount) implements ActorStep {}
|
||||||
@Resume(behavior = ActorResumeBehavior.RESTART)
|
@Resume(behavior = ActorResumeBehavior.RETRY)
|
||||||
public record UpdateRefresh(int refreshCount) implements ActorStep {}
|
public record UpdateRefresh(int refreshCount, long msgId) implements ActorStep {
|
||||||
@Resume(behavior = ActorResumeBehavior.RESTART)
|
public UpdateRefresh(int refreshCount) {
|
||||||
public record UpdateClean() implements ActorStep {}
|
this(refreshCount, -1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Resume(behavior = ActorResumeBehavior.RETRY)
|
||||||
|
public record UpdateClean(long msgId) implements ActorStep {
|
||||||
|
public UpdateClean() {
|
||||||
|
this(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActorStep transition(ActorStep self) throws Exception {
|
public ActorStep transition(ActorStep self) throws Exception {
|
||||||
@ -73,17 +87,38 @@ public class UpdateRssActor extends RecordActorPrototype {
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case UpdateRefresh(int count) -> {
|
case UpdateRefresh(int count, long msgId) when msgId < 0 -> {
|
||||||
feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH);
|
long messageId = updateTaskOutbox.sendAsync("UpdateRefresh", "");
|
||||||
|
feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH, messageId);
|
||||||
// Increment the refresh count and schedule the next update
|
yield new UpdateRefresh(count, messageId);
|
||||||
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1);
|
|
||||||
}
|
}
|
||||||
case UpdateClean() -> {
|
case UpdateRefresh(int count, long msgId) -> {
|
||||||
feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN);
|
var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS);
|
||||||
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
|
// Retry the update
|
||||||
|
yield new Error("Failed to update feeds: " + rsp.state());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Reset the refresh count after a successful update
|
||||||
|
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case UpdateClean(long msgId) when msgId < 0 -> {
|
||||||
|
long messageId = updateTaskOutbox.sendAsync("UpdateClean", "");
|
||||||
|
feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN, messageId);
|
||||||
|
|
||||||
// Reset the refresh count after a clean update
|
yield new UpdateClean(messageId);
|
||||||
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), 0);
|
}
|
||||||
|
case UpdateClean(long msgId) -> {
|
||||||
|
var rsp = updateTaskOutbox.waitResponse(msgId, 12, TimeUnit.HOURS);
|
||||||
|
if (rsp.state() != MqMessageState.OK) {
|
||||||
|
// Retry the update
|
||||||
|
yield new Error("Failed to clean feeds: " + rsp.state());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Reset the refresh count after a successful update
|
||||||
|
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
default -> new Error("Unknown actor step: " + self);
|
default -> new Error("Unknown actor step: " + self);
|
||||||
};
|
};
|
||||||
|
@ -20,6 +20,7 @@ dependencies {
|
|||||||
implementation project(':code:common:model')
|
implementation project(':code:common:model')
|
||||||
implementation project(':code:common:config')
|
implementation project(':code:common:config')
|
||||||
implementation project(':code:common:service')
|
implementation project(':code:common:service')
|
||||||
|
implementation project(':code:libraries:message-queue')
|
||||||
|
|
||||||
implementation libs.bundles.slf4j
|
implementation libs.bundles.slf4j
|
||||||
|
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package nu.marginalia.api.feeds;
|
package nu.marginalia.api.feeds;
|
||||||
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import nu.marginalia.mq.outbox.MqOutbox;
|
||||||
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
import nu.marginalia.service.client.GrpcChannelPoolFactory;
|
import nu.marginalia.service.client.GrpcChannelPoolFactory;
|
||||||
import nu.marginalia.service.client.GrpcSingleNodeChannelPool;
|
import nu.marginalia.service.client.GrpcSingleNodeChannelPool;
|
||||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||||
@ -8,6 +10,7 @@ import nu.marginalia.service.discovery.property.ServicePartition;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
@ -16,15 +19,23 @@ public class FeedsClient {
|
|||||||
private static final Logger logger = LoggerFactory.getLogger(FeedsClient.class);
|
private static final Logger logger = LoggerFactory.getLogger(FeedsClient.class);
|
||||||
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
private final GrpcSingleNodeChannelPool<FeedApiGrpc.FeedApiBlockingStub> channelPool;
|
private final GrpcSingleNodeChannelPool<FeedApiGrpc.FeedApiBlockingStub> channelPool;
|
||||||
|
private final MqPersistence mqPersistence;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public FeedsClient(GrpcChannelPoolFactory factory) {
|
public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence) {
|
||||||
|
this.mqPersistence = mqPersistence;
|
||||||
// The client is only interested in the primary node
|
// The client is only interested in the primary node
|
||||||
var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any());
|
var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any());
|
||||||
this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub);
|
this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** Create an appropriately named outbox for the update actor requests */
|
||||||
|
public MqOutbox createOutbox(String callerName, int outboxNodeId) {
|
||||||
|
return new MqOutbox(mqPersistence, "update-rss-feeds", 1, callerName, outboxNodeId, UUID.randomUUID());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public CompletableFuture<RpcFeed> getFeed(int domainId) {
|
public CompletableFuture<RpcFeed> getFeed(int domainId) {
|
||||||
try {
|
try {
|
||||||
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed)
|
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed)
|
||||||
@ -36,10 +47,14 @@ public class FeedsClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateFeeds(RpcFeedUpdateMode mode) {
|
public void updateFeeds(RpcFeedUpdateMode mode, long msgId) {
|
||||||
try {
|
try {
|
||||||
channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds)
|
channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds)
|
||||||
.run(RpcUpdateRequest.newBuilder().setMode(mode).build());
|
.run(RpcUpdateRequest.newBuilder()
|
||||||
|
.setMode(mode)
|
||||||
|
.setMsgId(msgId)
|
||||||
|
.build()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
logger.error("API Exception", e);
|
logger.error("API Exception", e);
|
||||||
|
@ -16,6 +16,8 @@ message RpcDomainId {
|
|||||||
|
|
||||||
message RpcUpdateRequest {
|
message RpcUpdateRequest {
|
||||||
RpcFeedUpdateMode mode = 1;
|
RpcFeedUpdateMode mode = 1;
|
||||||
|
int64 msgId = 2; // Id for a message on the message queue, will be replied to with a dummy response when the task is done,
|
||||||
|
// if the message id is not positive, no response will be attempted to be sent.
|
||||||
}
|
}
|
||||||
|
|
||||||
enum RpcFeedUpdateMode {
|
enum RpcFeedUpdateMode {
|
||||||
|
@ -21,6 +21,7 @@ dependencies {
|
|||||||
implementation project(':code:common:model')
|
implementation project(':code:common:model')
|
||||||
implementation project(':code:common:db')
|
implementation project(':code:common:db')
|
||||||
implementation project(':code:libraries:blocking-thread-pool')
|
implementation project(':code:libraries:blocking-thread-pool')
|
||||||
|
implementation project(':code:libraries:message-queue')
|
||||||
|
|
||||||
implementation project(':code:execution:api')
|
implementation project(':code:execution:api')
|
||||||
|
|
||||||
|
@ -81,8 +81,7 @@ public class FeedFetcherService {
|
|||||||
public void updateFeeds(UpdateMode updateMode) throws IOException {
|
public void updateFeeds(UpdateMode updateMode) throws IOException {
|
||||||
if (updating) // Prevent concurrent updates
|
if (updating) // Prevent concurrent updates
|
||||||
{
|
{
|
||||||
logger.error("Already updating feeds, refusing to start another update");
|
throw new IllegalStateException("Already updating feeds, refusing to start another update");
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try (var writer = feedDb.createWriter();
|
try (var writer = feedDb.createWriter();
|
||||||
|
@ -5,18 +5,21 @@ import io.grpc.stub.StreamObserver;
|
|||||||
import nu.marginalia.api.feeds.*;
|
import nu.marginalia.api.feeds.*;
|
||||||
import nu.marginalia.db.DbDomainQueries;
|
import nu.marginalia.db.DbDomainQueries;
|
||||||
import nu.marginalia.model.EdgeDomain;
|
import nu.marginalia.model.EdgeDomain;
|
||||||
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
|
import nu.marginalia.mq.task.MqLongRunningTask;
|
||||||
|
import nu.marginalia.mq.task.MqTaskResult;
|
||||||
import nu.marginalia.rss.db.FeedDb;
|
import nu.marginalia.rss.db.FeedDb;
|
||||||
import nu.marginalia.rss.model.FeedItems;
|
import nu.marginalia.rss.model.FeedItems;
|
||||||
import nu.marginalia.service.server.DiscoverableService;
|
import nu.marginalia.service.server.DiscoverableService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService {
|
public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService {
|
||||||
private final FeedDb feedDb;
|
private final FeedDb feedDb;
|
||||||
private final DbDomainQueries domainQueries;
|
private final DbDomainQueries domainQueries;
|
||||||
|
private final MqPersistence mqPersistence;
|
||||||
private final FeedFetcherService feedFetcherService;
|
private final FeedFetcherService feedFetcherService;
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(FeedsGrpcService.class);
|
private static final Logger logger = LoggerFactory.getLogger(FeedsGrpcService.class);
|
||||||
@ -24,9 +27,11 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
|
|||||||
@Inject
|
@Inject
|
||||||
public FeedsGrpcService(FeedDb feedDb,
|
public FeedsGrpcService(FeedDb feedDb,
|
||||||
DbDomainQueries domainQueries,
|
DbDomainQueries domainQueries,
|
||||||
|
MqPersistence mqPersistence,
|
||||||
FeedFetcherService feedFetcherService) {
|
FeedFetcherService feedFetcherService) {
|
||||||
this.feedDb = feedDb;
|
this.feedDb = feedDb;
|
||||||
this.domainQueries = domainQueries;
|
this.domainQueries = domainQueries;
|
||||||
|
this.mqPersistence = mqPersistence;
|
||||||
this.feedFetcherService = feedFetcherService;
|
this.feedFetcherService = feedFetcherService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,13 +51,14 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
|
|||||||
default -> throw new IllegalStateException("Unexpected value: " + request.getMode());
|
default -> throw new IllegalStateException("Unexpected value: " + request.getMode());
|
||||||
};
|
};
|
||||||
|
|
||||||
Thread.ofPlatform().start(() -> {
|
// Start a long-running task to update the feeds
|
||||||
try {
|
MqLongRunningTask
|
||||||
feedFetcherService.updateFeeds(updateMode);
|
.of(request.getMsgId(), "updateFeeds", mqPersistence)
|
||||||
} catch (IOException e) {
|
.asThread(() -> {
|
||||||
logger.error("Failed to update feeds", e);
|
feedFetcherService.updateFeeds(updateMode);
|
||||||
}
|
return new MqTaskResult.Success();
|
||||||
});
|
})
|
||||||
|
.start();
|
||||||
|
|
||||||
responseObserver.onNext(Empty.getDefaultInstance());
|
responseObserver.onNext(Empty.getDefaultInstance());
|
||||||
responseObserver.onCompleted();
|
responseObserver.onCompleted();
|
||||||
|
@ -5,8 +5,10 @@ import com.google.gson.Gson;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import com.zaxxer.hikari.HikariDataSource;
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
import nu.marginalia.mq.MqMessageState;
|
|
||||||
import nu.marginalia.mq.MqMessage;
|
import nu.marginalia.mq.MqMessage;
|
||||||
|
import nu.marginalia.mq.MqMessageState;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
@ -24,6 +26,8 @@ public class MqPersistence {
|
|||||||
private final HikariDataSource dataSource;
|
private final HikariDataSource dataSource;
|
||||||
private final Gson gson;
|
private final Gson gson;
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(MqPersistence.class);
|
||||||
|
|
||||||
public MqPersistence(HikariDataSource dataSource) {
|
public MqPersistence(HikariDataSource dataSource) {
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
this.gson = null;
|
this.gson = null;
|
||||||
@ -197,6 +201,20 @@ public class MqPersistence {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Sends an error response to the message with the given id, this is a convenience wrapper for
|
||||||
|
* sendResponse() that send a generic error message. The message will be marked as 'ERR'.
|
||||||
|
* <p></p>
|
||||||
|
* If an exception is thrown while sending the response, it will be logged, but not rethrown
|
||||||
|
* to avoid creating exception handling pyramids. At this point, we've already given it a college try.
|
||||||
|
* */
|
||||||
|
public void sendErrorResponse(long msgId, String message, Throwable e) {
|
||||||
|
try {
|
||||||
|
sendResponse(msgId, MqMessageState.ERR, message + ": " + e.getMessage());
|
||||||
|
} catch (SQLException ex) {
|
||||||
|
logger.error("Failed to send error response", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
|
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
|
||||||
* then returns the number of messages marked. This is an atomic operation that
|
* then returns the number of messages marked. This is an atomic operation that
|
||||||
|
@ -0,0 +1,162 @@
|
|||||||
|
package nu.marginalia.mq.task;
|
||||||
|
|
||||||
|
import nu.marginalia.mq.MqMessageState;
|
||||||
|
import nu.marginalia.mq.persistence.MqPersistence;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
/** A long-running task that can be executed asynchronously
|
||||||
|
* and report back to the message queue.
|
||||||
|
* <p></p>
|
||||||
|
* The idiomatic pattern is to create an outbox and send a message corresponding to the task,
|
||||||
|
* and then pass the message id along with the request to trigger the task over gRPC.
|
||||||
|
* <p></p>
|
||||||
|
* The gRPC service will spin off a thread and return immediately, while the task is executed
|
||||||
|
* in the background. The task can then report back to the message queue with the result
|
||||||
|
* of the task as it completes.
|
||||||
|
* */
|
||||||
|
public abstract class MqLongRunningTask implements Runnable {
|
||||||
|
|
||||||
|
/** Create a new task with the given message id, name, and persistence. If the msgId is
|
||||||
|
* not positive, a dummy implementation is provided that does not report to the message queue.
|
||||||
|
*/
|
||||||
|
public static MqLongRunningTask of(long msgId, String name, MqPersistence persistence) {
|
||||||
|
if (msgId <= 0) {
|
||||||
|
return new MqLongRunningTaskDummyImpl(name);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return new MqLongRunningTaskImpl(persistence, msgId, name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Creates a thread that will execute the task. The thread is not started automatically */
|
||||||
|
public Thread asThread(MqTaskFunction r) {
|
||||||
|
return Thread.ofPlatform().name(name()).start(() -> runNow(r));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Creates a future that will execute the task on the provided ExecutorService. */
|
||||||
|
public Future<Boolean> asFuture(ExecutorService executor, MqTaskFunction r) {
|
||||||
|
return executor.submit(() -> runNow(r));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Execute the task synchronously and return true if the task was successful */
|
||||||
|
public boolean runNow(MqTaskFunction r) {
|
||||||
|
try {
|
||||||
|
switch (r.run()) {
|
||||||
|
case MqTaskResult.Success success -> {
|
||||||
|
finish(success.message());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
case MqTaskResult.Failure failure -> fail(failure.message());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
fail(e);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract void finish();
|
||||||
|
abstract void finish(String message);
|
||||||
|
|
||||||
|
abstract void fail(String message);
|
||||||
|
abstract void fail(Throwable e);
|
||||||
|
|
||||||
|
public abstract String name();
|
||||||
|
}
|
||||||
|
|
||||||
|
class MqLongRunningTaskDummyImpl extends MqLongRunningTask {
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
MqLongRunningTaskDummyImpl(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finish() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finish(String message) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fail(String message) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fail(Throwable e) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MqLongRunningTaskImpl extends MqLongRunningTask {
|
||||||
|
private final MqPersistence persistence;
|
||||||
|
private final long msgId;
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
MqLongRunningTaskImpl(MqPersistence persistence, long msgId, String name) {
|
||||||
|
this.persistence = persistence;
|
||||||
|
this.msgId = msgId;
|
||||||
|
this.name = name;
|
||||||
|
|
||||||
|
try {
|
||||||
|
persistence.updateMessageState(msgId, MqMessageState.ACK);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finish() {
|
||||||
|
try {
|
||||||
|
persistence.sendResponse(msgId, MqMessageState.OK, "Success");
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finish(String message) {
|
||||||
|
try {
|
||||||
|
persistence.sendResponse(msgId, MqMessageState.OK, message);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fail(String message) {
|
||||||
|
try {
|
||||||
|
persistence.sendResponse(msgId, MqMessageState.ERR, message);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fail(Throwable e) {
|
||||||
|
try {
|
||||||
|
persistence.sendResponse(msgId, MqMessageState.ERR, e.getMessage());
|
||||||
|
}
|
||||||
|
catch (Exception e2) {
|
||||||
|
throw new RuntimeException(e2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
package nu.marginalia.mq.task;
|
||||||
|
|
||||||
|
public interface MqTaskFunction {
|
||||||
|
MqTaskResult run() throws Exception;
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
package nu.marginalia.mq.task;
|
||||||
|
|
||||||
|
public sealed interface MqTaskResult {
|
||||||
|
record Success(String message) implements MqTaskResult {
|
||||||
|
public Success(){
|
||||||
|
this("Ok");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
record Failure(String message) implements MqTaskResult {
|
||||||
|
public Failure(Throwable e){
|
||||||
|
this(e.getClass().getSimpleName() + " : " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user