(feed) Update API to allow specifying clean vs refresh update

Move the logic deciding which operation to perform into the actor, updating its state graph to incorporate a counter that runs a clean update once in a blue moon.
This commit is contained in:
Viktor Lofgren 2024-11-09 18:43:47 +01:00
parent 6f858cd627
commit e24a98390c
5 changed files with 61 additions and 27 deletions

View File

@ -7,6 +7,7 @@ import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume; import nu.marginalia.actor.state.Resume;
import nu.marginalia.api.feeds.FeedsClient; import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.api.feeds.RpcFeedUpdateMode;
import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.service.module.ServiceConfiguration;
import java.time.Duration; import java.time.Duration;
@ -19,6 +20,7 @@ public class UpdateRssActor extends RecordActorPrototype {
private final Duration initialDelay = Duration.ofMinutes(5); private final Duration initialDelay = Duration.ofMinutes(5);
private final Duration updateInterval = Duration.ofHours(24); private final Duration updateInterval = Duration.ofHours(24);
private final int cleanInterval = 60;
@Inject @Inject
public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration) { public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration) {
@ -29,10 +31,11 @@ public class UpdateRssActor extends RecordActorPrototype {
public record Initial() implements ActorStep {} public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY) @Resume(behavior = ActorResumeBehavior.RETRY)
public record Wait(String ts) implements ActorStep {} public record Wait(String ts, int refreshCount) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART) @Resume(behavior = ActorResumeBehavior.RESTART)
public record Update() implements ActorStep {} public record UpdateRefresh(int refreshCount) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RESTART)
public record UpdateClean() implements ActorStep {}
@Override @Override
public ActorStep transition(ActorStep self) throws Exception { public ActorStep transition(ActorStep self) throws Exception {
@ -44,10 +47,10 @@ public class UpdateRssActor extends RecordActorPrototype {
} }
else { else {
// Wait for 5 minutes before starting the first update, to give the system time to start up properly // Wait for 5 minutes before starting the first update, to give the system time to start up properly
yield new Wait(LocalDateTime.now().plus(initialDelay).toString()); yield new Wait(LocalDateTime.now().plus(initialDelay).toString(), 0);
} }
} }
case Wait(String untilTs) -> { case Wait(String untilTs, int count) -> {
var until = LocalDateTime.parse(untilTs); var until = LocalDateTime.parse(untilTs);
var now = LocalDateTime.now(); var now = LocalDateTime.now();
@ -55,15 +58,32 @@ public class UpdateRssActor extends RecordActorPrototype {
if (remaining > 0) { if (remaining > 0) {
Thread.sleep(remaining); Thread.sleep(remaining);
yield new Wait(untilTs); yield new Wait(untilTs, count);
} }
else { else {
yield new Update();
// Once every `cleanInterval` updates, do a clean update;
// otherwise do a refresh update
if (count > cleanInterval) {
yield new UpdateClean();
}
else {
yield new UpdateRefresh(count);
}
} }
} }
case Update() -> { case UpdateRefresh(int count) -> {
feedsClient.updateFeeds(); feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH);
yield new Wait(LocalDateTime.now().plus(updateInterval).toString());
// Increment the refresh count and schedule the next update
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1);
}
case UpdateClean() -> {
feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN);
// Reset the refresh count after a clean update
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), 0);
} }
default -> new Error("Unknown actor step: " + self); default -> new Error("Unknown actor step: " + self);
}; };

View File

@ -36,10 +36,10 @@ public class FeedsClient {
} }
} }
public void updateFeeds() { public void updateFeeds(RpcFeedUpdateMode mode) {
try { try {
channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds) channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds)
.run(Empty.getDefaultInstance()); .run(RpcUpdateRequest.newBuilder().setMode(mode).build());
} }
catch (Exception e) { catch (Exception e) {
logger.error("API Exception", e); logger.error("API Exception", e);

View File

@ -7,13 +7,22 @@ option java_multiple_files=true;
service FeedApi { service FeedApi {
rpc getFeed(RpcDomainId) returns (RpcFeed) {} rpc getFeed(RpcDomainId) returns (RpcFeed) {}
rpc updateFeeds(Empty) returns (Empty) {} rpc updateFeeds(RpcUpdateRequest) returns (Empty) {}
} }
message RpcDomainId { message RpcDomainId {
int32 domainId = 1; int32 domainId = 1;
} }
message RpcUpdateRequest {
RpcFeedUpdateMode mode = 1;
}
enum RpcFeedUpdateMode {
CLEAN = 0; // Start over with a new database from system rss exports
REFRESH = 1; // Refresh known feeds
}
message RpcFeed { message RpcFeed {
int32 domainId = 1; int32 domainId = 1;
string domain = 2; string domain = 2;

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.http.HttpClient; import java.net.http.HttpClient;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -74,7 +73,12 @@ public class FeedFetcherService {
rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher"); rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");
} }
public void updateFeeds() throws IOException { public enum UpdateMode {
CLEAN,
REFRESH
};
public void updateFeeds(UpdateMode updateMode) throws IOException {
if (updating) // Prevent concurrent updates if (updating) // Prevent concurrent updates
{ {
logger.error("Already updating feeds, refusing to start another update"); logger.error("Already updating feeds, refusing to start another update");
@ -91,11 +95,9 @@ public class FeedFetcherService {
Collection<FeedDefinition> definitions = feedDb.getAllFeeds(); Collection<FeedDefinition> definitions = feedDb.getAllFeeds();
// If we didn't get any definitions, or approximately every other month, read them from the system // If we didn't get any definitions, or a clean update is requested, read the definitions from the system
// to get the latest feeds. As the feeds known by the system have a lot of dead links, we don't // instead
// want to do this too often. if (definitions == null || updateMode == UpdateMode.CLEAN) {
final LocalDate today = LocalDate.now();
if (definitions == null || (today.getDayOfMonth() == 1 && (today.getMonthValue() % 2) == 0)) {
definitions = readDefinitionsFromSystem(); definitions = readDefinitionsFromSystem();
} }

View File

@ -2,10 +2,7 @@ package nu.marginalia.rss.svc;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import nu.marginalia.api.feeds.Empty; import nu.marginalia.api.feeds.*;
import nu.marginalia.api.feeds.FeedApiGrpc;
import nu.marginalia.api.feeds.RpcDomainId;
import nu.marginalia.api.feeds.RpcFeed;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.rss.db.FeedDb; import nu.marginalia.rss.db.FeedDb;
@ -40,12 +37,18 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
} }
@Override @Override
public void updateFeeds(Empty request, public void updateFeeds(RpcUpdateRequest request,
StreamObserver<Empty> responseObserver) StreamObserver<Empty> responseObserver)
{ {
FeedFetcherService.UpdateMode updateMode = switch(request.getMode()) {
case CLEAN -> FeedFetcherService.UpdateMode.CLEAN;
case REFRESH -> FeedFetcherService.UpdateMode.REFRESH;
default -> throw new IllegalStateException("Unexpected value: " + request.getMode());
};
Thread.ofPlatform().start(() -> { Thread.ofPlatform().start(() -> {
try { try {
feedFetcherService.updateFeeds(); feedFetcherService.updateFeeds(updateMode);
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to update feeds", e); logger.error("Failed to update feeds", e);
} }