From e24a98390cc3b866091f3a6af053da6ccf893629 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Sat, 9 Nov 2024 18:43:47 +0100 Subject: [PATCH] (feed) Update API to allow specifying clean vs refresh update Move the logic deciding which operation to perform into the actor, updating its state graph to incorporate a counter that runs a clean update once in a blue moon. --- .../marginalia/actor/proc/UpdateRssActor.java | 40 ++++++++++++++----- .../nu/marginalia/api/feeds/FeedsClient.java | 4 +- .../api/src/main/protobuf/feeds.proto | 11 ++++- .../rss/svc/FeedFetcherService.java | 16 ++++---- .../marginalia/rss/svc/FeedsGrpcService.java | 17 ++++---- 5 files changed, 61 insertions(+), 27 deletions(-) diff --git a/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java index 7774e6ab..7b1441f7 100644 --- a/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java +++ b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java @@ -7,6 +7,7 @@ import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.Resume; import nu.marginalia.api.feeds.FeedsClient; +import nu.marginalia.api.feeds.RpcFeedUpdateMode; import nu.marginalia.service.module.ServiceConfiguration; import java.time.Duration; @@ -19,6 +20,7 @@ public class UpdateRssActor extends RecordActorPrototype { private final Duration initialDelay = Duration.ofMinutes(5); private final Duration updateInterval = Duration.ofHours(24); + private final int cleanInterval = 60; @Inject public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration) { @@ -29,10 +31,11 @@ public class UpdateRssActor extends RecordActorPrototype { public record Initial() implements ActorStep {} @Resume(behavior = ActorResumeBehavior.RETRY) - public record Wait(String ts) implements ActorStep {} + public record Wait(String ts, int refreshCount) implements ActorStep {} @Resume(behavior = ActorResumeBehavior.RESTART) - public record Update() implements ActorStep {} - + public record UpdateRefresh(int refreshCount) implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.RESTART) + public record UpdateClean() implements ActorStep {} @Override public ActorStep transition(ActorStep self) throws Exception { @@ -44,10 +47,10 @@ public class UpdateRssActor extends RecordActorPrototype { } else { // Wait for 5 minutes before starting the first update, to give the system time to start up properly - yield new Wait(LocalDateTime.now().plus(initialDelay).toString()); + yield new Wait(LocalDateTime.now().plus(initialDelay).toString(), 0); } } - case Wait(String untilTs) -> { + case Wait(String untilTs, int count) -> { var until = LocalDateTime.parse(untilTs); var now = LocalDateTime.now(); @@ -55,15 +58,32 @@ public class UpdateRssActor extends RecordActorPrototype { if (remaining > 0) { Thread.sleep(remaining); - yield new Wait(untilTs); + yield new Wait(untilTs, count); } else { - yield new Update(); + + // Once every `cleanInterval` updates, do a clean update; + // otherwise do a refresh update + if (count > cleanInterval) { + yield new UpdateClean(); + } + else { + yield new UpdateRefresh(count); + } + } } - case Update() -> { - feedsClient.updateFeeds(); - yield new Wait(LocalDateTime.now().plus(updateInterval).toString()); + case UpdateRefresh(int count) -> { + feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH); + + // Increment the refresh count and schedule the next update + yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1); + } + case UpdateClean() -> { + feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN); + + // Reset the refresh count after a clean update + yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), 0); } default -> new Error("Unknown actor step: " + self); }; diff --git a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java index 4ec5a199..be3f93dd 100644 --- a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java +++ b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java @@ -36,10 +36,10 @@ public class FeedsClient { } } - public void updateFeeds() { + public void updateFeeds(RpcFeedUpdateMode mode) { try { channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds) - .run(Empty.getDefaultInstance()); + .run(RpcUpdateRequest.newBuilder().setMode(mode).build()); } catch (Exception e) { logger.error("API Exception", e); diff --git a/code/functions/live-capture/api/src/main/protobuf/feeds.proto b/code/functions/live-capture/api/src/main/protobuf/feeds.proto index d31d9d47..2417afc1 100644 --- a/code/functions/live-capture/api/src/main/protobuf/feeds.proto +++ b/code/functions/live-capture/api/src/main/protobuf/feeds.proto @@ -7,13 +7,22 @@ option java_multiple_files=true; service FeedApi { rpc getFeed(RpcDomainId) returns (RpcFeed) {} - rpc updateFeeds(Empty) returns (Empty) {} + rpc updateFeeds(RpcUpdateRequest) returns (Empty) {} } message RpcDomainId { int32 domainId = 1; } +message RpcUpdateRequest { + RpcFeedUpdateMode mode = 1; +} + +enum RpcFeedUpdateMode { + CLEAN = 0; // Start over with a new database from system rss exports + REFRESH = 1; // Refresh known feeds +} + message RpcFeed { int32 domainId = 1; string domain = 2; diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java index f9ca3079..288e0457 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.http.HttpClient; import java.sql.SQLException; import java.time.Duration; -import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; @@ -74,7 +73,12 @@ public class FeedFetcherService { rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher"); } - public void updateFeeds() throws IOException { + public enum UpdateMode { + CLEAN, + REFRESH + }; + + public void updateFeeds(UpdateMode updateMode) throws IOException { if (updating) // Prevent concurrent updates { logger.error("Already updating feeds, refusing to start another update"); @@ -91,11 +95,9 @@ public class FeedFetcherService { Collection definitions = feedDb.getAllFeeds(); - // If we didn't get any definitions, or approximately every other month, read them from the system - // to get the latest feeds. As the feeds known by the system have a lot of dead links, we don't - // want to do this too often. - final LocalDate today = LocalDate.now(); - if (definitions == null || (today.getDayOfMonth() == 1 && (today.getMonthValue() % 2) == 0)) { + // If we didn't get any definitions, or a clean update is requested, read the definitions from the system + // instead + if (definitions == null || updateMode == UpdateMode.CLEAN) { definitions = readDefinitionsFromSystem(); } diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java index d833f9bd..6944fc24 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java @@ -2,10 +2,7 @@ package nu.marginalia.rss.svc; import com.google.inject.Inject; import io.grpc.stub.StreamObserver; -import nu.marginalia.api.feeds.Empty; -import nu.marginalia.api.feeds.FeedApiGrpc; -import nu.marginalia.api.feeds.RpcDomainId; -import nu.marginalia.api.feeds.RpcFeed; +import nu.marginalia.api.feeds.*; import nu.marginalia.db.DbDomainQueries; import nu.marginalia.model.EdgeDomain; import nu.marginalia.rss.db.FeedDb; @@ -40,12 +37,18 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis } @Override - public void updateFeeds(Empty request, - StreamObserver responseObserver) + public void updateFeeds(RpcUpdateRequest request, + StreamObserver responseObserver) { + FeedFetcherService.UpdateMode updateMode = switch(request.getMode()) { + case CLEAN -> FeedFetcherService.UpdateMode.CLEAN; + case REFRESH -> FeedFetcherService.UpdateMode.REFRESH; + default -> throw new IllegalStateException("Unexpected value: " + request.getMode()); + }; + Thread.ofPlatform().start(() -> { try { - feedFetcherService.updateFeeds(); + feedFetcherService.updateFeeds(updateMode); } catch (IOException e) { logger.error("Failed to update feeds", e); }