From c728a1e2f2d00be172f54a7a4134e0268f0ea2ff Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 18 Nov 2024 14:59:32 +0100 Subject: [PATCH] (rss) Add endpoint for extracting URLs changed within a timespan. --- .../nu/marginalia/api/feeds/FeedsClient.java | 31 +++++++++++------ .../api/src/main/protobuf/feeds.proto | 10 ++++++ .../java/nu/marginalia/rss/db/FeedDb.java | 17 ++++++++++ .../nu/marginalia/rss/db/FeedDbReader.java | 25 ++++++++++++++ .../nu/marginalia/rss/model/FeedItem.java | 5 +++ .../nu/marginalia/rss/model/FeedItems.java | 14 -------- .../marginalia/rss/svc/FeedsGrpcService.java | 23 +++++++++++++ .../marginalia/rss/db/FeedDbReaderTest.java | 34 +++++++++++++++++++ 8 files changed, 135 insertions(+), 24 deletions(-) create mode 100644 code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbReaderTest.java diff --git a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java index 309926d1..c66e34c9 100644 --- a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java +++ b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java @@ -11,10 +11,15 @@ import nu.marginalia.service.discovery.property.ServicePartition; import nu.marginalia.service.module.ServiceConfiguration; import javax.annotation.CheckReturnValue; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; @Singleton public class FeedsClient { @@ -46,17 +51,23 @@ public class FeedsClient { } } + public void getUpdatedDomains(Instant since, Consumer consumer) throws ExecutionException, InterruptedException { + channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getUpdatedLinks) + 
.run(RpcUpdatedLinksRequest.newBuilder().setSinceEpochMillis(since.toEpochMilli()).build()) + .forEachRemaining(rsp -> consumer.accept(new UpdatedDomain(rsp))); + } + + public record UpdatedDomain(String domain, List urls) { + public UpdatedDomain(RpcUpdatedLinksResponse rsp) { + this(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())); + } + } + /** Get the hash of the feed data, for identifying when the data has been updated */ - public CompletableFuture getFeedDataHash() { - try { - return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash) - .async(executorService) - .run(Empty.getDefaultInstance()) - .thenApply(RpcFeedDataHash::getHash); - } - catch (Exception e) { - return CompletableFuture.failedFuture(e); - } + public String getFeedDataHash() { + return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash) + .run(Empty.getDefaultInstance()) + .getHash(); } /** Update the feeds, return a message ID for the update */ diff --git a/code/functions/live-capture/api/src/main/protobuf/feeds.proto b/code/functions/live-capture/api/src/main/protobuf/feeds.proto index 60bdee9d..c3512832 100644 --- a/code/functions/live-capture/api/src/main/protobuf/feeds.proto +++ b/code/functions/live-capture/api/src/main/protobuf/feeds.proto @@ -9,6 +9,16 @@ service FeedApi { rpc getFeed(RpcDomainId) returns (RpcFeed) {} rpc getFeedDataHash(Empty) returns (RpcFeedDataHash) {} rpc updateFeeds(RpcUpdateRequest) returns (Empty) {} + rpc getUpdatedLinks(RpcUpdatedLinksRequest) returns (stream RpcUpdatedLinksResponse) {} +} + +message RpcUpdatedLinksRequest { + int64 sinceEpochMillis = 1; +} + +message RpcUpdatedLinksResponse { + string domain = 1; + repeated string url = 2; } message RpcFeedDataHash { diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java index 38079933..921b4e9c 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java +++ 
b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java @@ -16,9 +16,11 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.security.MessageDigest; +import java.time.Instant; import java.util.Base64; import java.util.List; import java.util.Optional; +import java.util.function.BiConsumer; @Singleton public class FeedDb { @@ -171,4 +173,19 @@ public class FeedDb { return Base64.getEncoder().encodeToString(digest.digest()); } + + public void getLinksUpdatedSince(Instant since, BiConsumer> consumer) throws Exception { + if (!feedDbEnabled) { + throw new IllegalStateException("Feed database is disabled on this node"); + } + + // Capture the current reader to avoid concurrency issues + FeedDbReader reader = this.reader; + + if (reader == null) { + throw new NullPointerException("Reader is not available"); + } + + reader.getLinksUpdatedSince(since, consumer); + } } diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java index c810742d..c86ab8f4 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java @@ -12,9 +12,11 @@ import java.nio.file.Path; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.function.BiConsumer; public class FeedDbReader implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(FeedDbReader.class); @@ -99,4 +101,27 @@ public class FeedDbReader implements AutoCloseable { } + public void getLinksUpdatedSince(Instant since, BiConsumer> consumer) { + try (var stmt = connection.prepareStatement("SELECT FEED FROM feed")) { + var rs = stmt.executeQuery(); + + while (rs.next()) { + FeedItems items 
= deserialize(rs.getString(1)); + + List urls = new ArrayList<>(); + for (var item : items.items()) { + if (item.getUpdateTimeZD().toInstant().isAfter(since)) { + urls.add(item.url()); + } + } + + if (!urls.isEmpty()) { + consumer.accept(items.domain(), new ArrayList<>(urls)); + urls.clear(); + } + } + } catch (SQLException e) { + logger.error("Error getting updated links", e); + } + } } diff --git a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItem.java b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItem.java index 119a32cb..e55ed6b6 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItem.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItem.java @@ -55,6 +55,11 @@ public record FeedItem(String title, return zonedDateTime.map(date -> date.format(DATE_FORMAT)).orElse(""); } + public ZonedDateTime getUpdateTimeZD() { + return ZonedDateTime.parse(date, DATE_FORMAT); + } + + @Override public int compareTo(@NotNull FeedItem o) { return o.date.compareTo(date); diff --git a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItems.java b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItems.java index 3a8f0676..23a5d0f9 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItems.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItems.java @@ -1,7 +1,6 @@ package nu.marginalia.rss.model; import java.util.List; -import java.util.Optional; public record FeedItems(String domain, String feedUrl, @@ -17,17 +16,4 @@ public record FeedItems(String domain, public boolean isEmpty() { return items.isEmpty(); } - - public Optional getLatest() { - if (items.isEmpty()) - return Optional.empty(); - - return Optional.of( - items.getFirst() - ); - } - - public Optional getLatestDate() { - return getLatest().map(FeedItem::date); - } } diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java 
b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java index d19d5b9d..1c5ca4a7 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java @@ -14,6 +14,8 @@ import nu.marginalia.service.server.DiscoverableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; +import java.util.List; import java.util.Optional; public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService { @@ -82,6 +84,27 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis } } + @Override + public void getUpdatedLinks(RpcUpdatedLinksRequest request, StreamObserver responseObserver) { + Instant since = Instant.ofEpochMilli(request.getSinceEpochMillis()); + + try { + feedDb.getLinksUpdatedSince(since, (String domain, List urls) -> { + RpcUpdatedLinksResponse rsp = RpcUpdatedLinksResponse.newBuilder() + .setDomain(domain) + .addAllUrl(urls) + .build(); + responseObserver.onNext(rsp); + }); + + responseObserver.onCompleted(); + } + catch (Exception e) { + logger.error("Error getting updated links", e); + responseObserver.onError(e); + } + } + @Override public void getFeed(RpcDomainId request, StreamObserver responseObserver) diff --git a/code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbReaderTest.java b/code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbReaderTest.java new file mode 100644 index 00000000..e698c03f --- /dev/null +++ b/code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbReaderTest.java @@ -0,0 +1,34 @@ +package nu.marginalia.rss.db; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.nio.file.Path; +import java.sql.SQLException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class 
FeedDbReaderTest { + + @Tag("flaky") // will only work on ~vlofgren, not on CI; remove test when this feature is stable + @Test + void getLinksUpdatedSince() throws SQLException { + var reader = new FeedDbReader(Path.of("/home/vlofgren/rss-feeds.db")); + Map> links = new HashMap<>(); + + reader.getLinksUpdatedSince(Instant.now().minus(10, ChronoUnit.DAYS), links::put); + + System.out.println(links.size()); + for (var link : links.values()) { + if (link.size() < 2) { + System.out.println(link); + } + } + + reader.close(); + + } +} \ No newline at end of file