(rss) Add endpoint for extracting URLs changed within a timespan.

This commit is contained in:
Viktor Lofgren 2024-11-18 14:59:32 +01:00
parent d874d76a09
commit c728a1e2f2
8 changed files with 135 additions and 24 deletions

View File

@ -11,10 +11,15 @@ import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.module.ServiceConfiguration;
import javax.annotation.CheckReturnValue;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Singleton
public class FeedsClient {
@ -46,17 +51,23 @@ public class FeedsClient {
}
}
/**
 * Stream every domain whose feed contains links updated after {@code since}.
 *
 * @param since    point in time; only links updated after this are reported
 * @param consumer invoked once per domain with its updated links
 */
public void getUpdatedDomains(Instant since, Consumer<UpdatedDomain> consumer) throws ExecutionException, InterruptedException {
    var request = RpcUpdatedLinksRequest.newBuilder()
            .setSinceEpochMillis(since.toEpochMilli())
            .build();

    var responses = channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getUpdatedLinks)
            .run(request);

    responses.forEachRemaining(response -> consumer.accept(new UpdatedDomain(response)));
}
/** A domain paired with the list of its URLs that have changed. */
public record UpdatedDomain(String domain, List<String> urls) {
    /** Unpack one streamed RPC response into a record backed by a mutable list. */
    public UpdatedDomain(RpcUpdatedLinksResponse response) {
        this(response.getDomain(), new ArrayList<>(response.getUrlList()));
    }
}
/** Get the hash of the feed data, for identifying when the data has been updated */
// NOTE(review): this span is a diff rendering.  The CompletableFuture-based
// variant directly below was REMOVED by this commit (its closing brace is
// outside the hunk); the synchronous String-returning variant further down is
// the replacement.  Do not read both as simultaneously-live code.
public CompletableFuture<String> getFeedDataHash() {
try {
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
.async(executorService)
.run(Empty.getDefaultInstance())
.thenApply(RpcFeedDataHash::getHash);
}
catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
// Replacement: blocks on the RPC and returns the hash directly.
public String getFeedDataHash() {
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
.run(Empty.getDefaultInstance())
.getHash();
}
/** Update the feeds, return a message ID for the update */

View File

@ -9,6 +9,16 @@ service FeedApi {
rpc getFeed(RpcDomainId) returns (RpcFeed) {}
rpc getFeedDataHash(Empty) returns (RpcFeedDataHash) {}
rpc updateFeeds(RpcUpdateRequest) returns (Empty) {}
rpc getUpdatedLinks(RpcUpdatedLinksRequest) returns (stream RpcUpdatedLinksResponse) {}
}
// Request for getUpdatedLinks: selects links from feed items updated
// after the given point in time.
message RpcUpdatedLinksRequest {
// Cutoff as Unix epoch milliseconds; items strictly newer than this match.
int64 sinceEpochMillis = 1;
}
// One streamed element per domain that has updated links.
message RpcUpdatedLinksResponse {
string domain = 1;
// URLs belonging to this domain that were updated within the requested span.
repeated string url = 2;
}
message RpcFeedDataHash {

View File

@ -16,9 +16,11 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
@Singleton
public class FeedDb {
@ -171,4 +173,19 @@ public class FeedDb {
return Base64.getEncoder().encodeToString(digest.digest());
}
/**
 * Report, per domain, the URLs of feed items updated strictly after {@code since}.
 * Delegates the actual scan to the currently-installed {@link FeedDbReader}.
 *
 * @param since    exclusive lower bound on item update time
 * @param consumer receives (domain, urls) pairs for each matching domain
 * @throws IllegalStateException if the feed database is disabled on this node,
 *                               or no reader is currently available
 */
public void getLinksUpdatedSince(Instant since, BiConsumer<String, List<String>> consumer) throws Exception {
    if (!feedDbEnabled) {
        throw new IllegalStateException("Feed database is disabled on this node");
    }

    // Capture the current reader to avoid concurrency issues
    FeedDbReader reader = this.reader;

    if (reader == null) {
        // A missing reader is a bad-state condition, not a programming error:
        // report it with IllegalStateException, consistent with the
        // disabled-db check above (was NullPointerException).
        throw new IllegalStateException("Reader is not available");
    }

    reader.getLinksUpdatedSince(since, consumer);
}
}

View File

@ -12,9 +12,11 @@ import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
public class FeedDbReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(FeedDbReader.class);
@ -99,4 +101,27 @@ public class FeedDbReader implements AutoCloseable {
}
/**
 * Scan every stored feed and report, per domain, the URLs of items updated
 * strictly after {@code since}.  Domains with no matching items are skipped.
 * SQL errors are logged and swallowed, so the scan is best-effort.
 *
 * @param since    exclusive lower bound on item update time
 * @param consumer receives (domain, urls) for each domain with updated items
 */
public void getLinksUpdatedSince(Instant since, BiConsumer<String, List<String>> consumer) {
    // The ResultSet now participates in try-with-resources, so it is closed
    // even if deserialization or the consumer throws.
    try (var stmt = connection.prepareStatement("SELECT FEED FROM feed");
         var rs = stmt.executeQuery())
    {
        while (rs.next()) {
            FeedItems items = deserialize(rs.getString(1));

            List<String> urls = new ArrayList<>();
            for (var item : items.items()) {
                if (item.getUpdateTimeZD().toInstant().isAfter(since)) {
                    urls.add(item.url());
                }
            }

            // urls is freshly allocated each row, so it can be handed to the
            // consumer directly; the previous defensive copy followed by
            // clear() was redundant (the list went out of scope anyway).
            if (!urls.isEmpty()) {
                consumer.accept(items.domain(), urls);
            }
        }
    } catch (SQLException e) {
        logger.error("Error getting updated links", e);
    }
}
}

View File

@ -55,6 +55,11 @@ public record FeedItem(String title,
return zonedDateTime.map(date -> date.format(DATE_FORMAT)).orElse("");
}
/** Parse this item's {@code date} string into a ZonedDateTime via DATE_FORMAT.
 *  NOTE(review): will throw DateTimeParseException when {@code date} is blank
 *  or not in DATE_FORMAT — confirm all persisted items carry a valid date. */
public ZonedDateTime getUpdateTimeZD() {
return ZonedDateTime.parse(date, DATE_FORMAT);
}
@Override
public int compareTo(@NotNull FeedItem o) {
return o.date.compareTo(date);

View File

@ -1,7 +1,6 @@
package nu.marginalia.rss.model;
import java.util.List;
import java.util.Optional;
public record FeedItems(String domain,
String feedUrl,
@ -17,17 +16,4 @@ public record FeedItems(String domain,
/** True when this feed contains no items. */
public boolean isEmpty() {
return items.isEmpty();
}
public Optional<FeedItem> getLatest() {
if (items.isEmpty())
return Optional.empty();
return Optional.of(
items.getFirst()
);
}
public Optional<String> getLatestDate() {
return getLatest().map(FeedItem::date);
}
}

View File

@ -14,6 +14,8 @@ import nu.marginalia.service.server.DiscoverableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService {
@ -82,6 +84,27 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
}
}
@Override
public void getUpdatedLinks(RpcUpdatedLinksRequest request, StreamObserver<RpcUpdatedLinksResponse> responseObserver) {
Instant since = Instant.ofEpochMilli(request.getSinceEpochMillis());
try {
feedDb.getLinksUpdatedSince(since, (String domain, List<String> urls) -> {
RpcUpdatedLinksResponse rsp = RpcUpdatedLinksResponse.newBuilder()
.setDomain(domain)
.addAllUrl(urls)
.build();
responseObserver.onNext(rsp);
});
responseObserver.onCompleted();
}
catch (Exception e) {
logger.error("Error getting updated links", e);
responseObserver.onError(e);
}
}
@Override
public void getFeed(RpcDomainId request,
StreamObserver<RpcFeed> responseObserver)

View File

@ -0,0 +1,34 @@
package nu.marginalia.rss.db;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class FeedDbReaderTest {

    @Tag("flaky") // will only work on ~vlofgren, not on CI; remove test when this feature is stable
    @Test
    void getLinksUpdatedSince() throws Exception {
        // try-with-resources: the db handle is closed even when the body
        // throws, instead of leaking as the previous explicit close() did.
        try (var reader = new FeedDbReader(Path.of("/home/vlofgren/rss-feeds.db"))) {
            Map<String, List<String>> links = new HashMap<>();

            reader.getLinksUpdatedSince(Instant.now().minus(10, ChronoUnit.DAYS), links::put);

            System.out.println(links.size());
            for (var link : links.values()) {
                if (link.size() < 2) {
                    System.out.println(link);
                }
            }
        }
    }
}