diff --git a/code/common/service/java/nu/marginalia/service/control/FakeServiceHeartbeat.java b/code/common/service/java/nu/marginalia/service/control/FakeServiceHeartbeat.java
index c0c732b9..c0231bf5 100644
--- a/code/common/service/java/nu/marginalia/service/control/FakeServiceHeartbeat.java
+++ b/code/common/service/java/nu/marginalia/service/control/FakeServiceHeartbeat.java
@@ -11,4 +11,14 @@ public class FakeServiceHeartbeat implements ServiceHeartbeat {
             public void close() {}
         };
     }
+
+    @Override
+    public ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName) {
+        return new ServiceAdHocTaskHeartbeat() {
+            @Override
+            public void progress(String step, int stepProgress, int stepCount) {}
+            @Override
+            public void close() {}
+        };
+    }
 }
diff --git a/code/common/service/java/nu/marginalia/service/control/ServiceAdHocTaskHeartbeat.java b/code/common/service/java/nu/marginalia/service/control/ServiceAdHocTaskHeartbeat.java
new file mode 100644
index 00000000..906477e2
--- /dev/null
+++ b/code/common/service/java/nu/marginalia/service/control/ServiceAdHocTaskHeartbeat.java
@@ -0,0 +1,7 @@
+package nu.marginalia.service.control;
+
+public interface ServiceAdHocTaskHeartbeat extends AutoCloseable {
+    void progress(String step, int progress, int total);
+
+    void close();
+}
diff --git a/code/common/service/java/nu/marginalia/service/control/ServiceAdHocTaskHeartbeatImpl.java b/code/common/service/java/nu/marginalia/service/control/ServiceAdHocTaskHeartbeatImpl.java
new file mode 100644
index 00000000..f7436596
--- /dev/null
+++ b/code/common/service/java/nu/marginalia/service/control/ServiceAdHocTaskHeartbeatImpl.java
@@ -0,0 +1,190 @@
+package nu.marginalia.service.control;
+
+
+import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.service.module.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/** This object sends a heartbeat to the database every few seconds,
+ * updating with the progress of a task within a service. Unlike the enum-based task heartbeat,
+ * progress is tracked ad hoc by providing a step name together with the index of the current step
+ * and the total number of steps.
+ */
+public class ServiceAdHocTaskHeartbeatImpl implements AutoCloseable, ServiceAdHocTaskHeartbeat {
+    private final Logger logger = LoggerFactory.getLogger(ServiceAdHocTaskHeartbeatImpl.class);
+    private final String taskName;
+    private final String taskBase;
+    private final int node;
+    private final String instanceUUID;
+    private final HikariDataSource dataSource;
+
+
+    private final Thread runnerThread;
+    private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 1);
+    private final String serviceInstanceUUID;
+    private int progress;
+
+    private volatile boolean running = false;
+    private volatile String step = "-";
+
+    ServiceAdHocTaskHeartbeatImpl(ServiceConfiguration configuration,
+                                  String taskName,
+                                  HikariDataSource dataSource)
+    {
+        this.taskName = configuration.serviceName() + "." + taskName + ":" + configuration.node();
+        this.taskBase = configuration.serviceName() + "." + taskName;
+        this.node = configuration.node();
+        this.dataSource = dataSource;
+
+        this.instanceUUID = UUID.randomUUID().toString();
+        this.serviceInstanceUUID = configuration.instanceUuid().toString();
+
+        heartbeatInit();
+
+        runnerThread = new Thread(this::run);
+        runnerThread.start();
+    }
+
+    /** Update the progress of the task. This is a fast function that doesn't block;
+     * the actual update is done in a separate thread.
+     *
+     * @param step The current step in the task.
+     */
+    @Override
+    public void progress(String step, int stepProgress, int stepCount) {
+        this.step = step;
+
+        // The progress percentage is the fraction of completed steps out of the total step count
+        this.progress = (int) Math.round(100. * stepProgress / (double) stepCount);
+
+        logger.info("ServiceTask {} progress: {}%", taskBase, progress);
+    }
+
+    public void shutDown() {
+        if (!running)
+            return;
+
+        running = false;
+
+        try {
+            runnerThread.join();
+            heartbeatStop();
+        }
+        catch (InterruptedException|SQLException ex) {
+            logger.warn("ServiceHeartbeat shutdown failed", ex);
+        }
+    }
+
+    private void run() {
+        if (!running)
+            running = true;
+        else
+            return;
+
+        try {
+            while (running) {
+                try {
+                    heartbeatUpdate();
+                }
+                catch (SQLException ex) {
+                    logger.warn("ServiceHeartbeat failed to update", ex);
+                }
+
+                TimeUnit.SECONDS.sleep(heartbeatInterval);
+            }
+        }
+        catch (InterruptedException ex) {
+            logger.error("ServiceHeartbeat caught irrecoverable exception, killing service", ex);
+            System.exit(255);
+        }
+    }
+
+    private void heartbeatInit() {
+        try (var connection = dataSource.getConnection()) {
+            try (var stmt = connection.prepareStatement(
+                    """
+                        INSERT INTO TASK_HEARTBEAT (TASK_NAME, TASK_BASE, NODE, INSTANCE, SERVICE_INSTANCE, HEARTBEAT_TIME, STATUS)
+                        VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP(6), 'STARTING')
+                        ON DUPLICATE KEY UPDATE
+                            INSTANCE = ?,
+                            SERVICE_INSTANCE = ?,
+                            HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
+                            STATUS = 'STARTING'
+                        """
+            ))
+            {
+                stmt.setString(1, taskName);
+                stmt.setString(2, taskBase);
+                stmt.setInt(3, node);
+                stmt.setString(4, instanceUUID);
+                stmt.setString(5, serviceInstanceUUID);
+                stmt.setString(6, instanceUUID);
+                stmt.setString(7, serviceInstanceUUID);
+                stmt.executeUpdate();
+            }
+        }
+        catch (SQLException ex) {
+            logger.error("ServiceHeartbeat failed to initialize", ex);
+            throw new RuntimeException(ex);
+        }
+
+    }
+
+    private void heartbeatUpdate() throws SQLException {
+        try (var connection = dataSource.getConnection()) {
+            try (var stmt = connection.prepareStatement(
+                    """
+                        UPDATE TASK_HEARTBEAT
+                        SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
+                            STATUS = 'RUNNING',
+                            PROGRESS = ?,
+                            STAGE_NAME = ?
+                        WHERE INSTANCE = ?
+                        """)
+            )
+            {
+                stmt.setInt(1, progress);
+                stmt.setString(2, step);
+                stmt.setString(3, instanceUUID);
+                stmt.executeUpdate();
+            }
+        }
+    }
+
+    private void heartbeatStop() throws SQLException {
+        try (var connection = dataSource.getConnection()) {
+            try (var stmt = connection.prepareStatement(
+                    """
+                        UPDATE TASK_HEARTBEAT
+                        SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
+                            STATUS='STOPPED',
+                            PROGRESS = ?,
+                            STAGE_NAME = ?
+                        WHERE INSTANCE = ?
+                        """)
+            )
+            {
+                stmt.setInt(1, progress);
+                stmt.setString(2, step);
+                stmt.setString(3, instanceUUID);
+                stmt.executeUpdate();
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        shutDown();
+    }
+
+}
diff --git a/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeat.java b/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeat.java
index 992e42e9..9bb60c9f 100644
--- a/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeat.java
+++ b/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeat.java
@@ -5,4 +5,5 @@ import com.google.inject.ImplementedBy;
 @ImplementedBy(ServiceHeartbeatImpl.class)
 public interface ServiceHeartbeat {
     <T extends Enum<T>> ServiceTaskHeartbeat<T> createServiceTaskHeartbeat(Class<T> steps, String processName);
+    ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName);
 }
diff --git a/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeatImpl.java b/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeatImpl.java
index 4a6f1c71..02ff74f9 100644
--- a/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeatImpl.java
+++ b/code/common/service/java/nu/marginalia/service/control/ServiceHeartbeatImpl.java
@@ -54,6 +54,11 @@ public class ServiceHeartbeatImpl implements ServiceHeartbeat {
         return new ServiceTaskHeartbeatImpl<>(steps, configuration, processName, eventLog, dataSource);
     }
 
+    @Override
+    public ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName) {
+        return new ServiceAdHocTaskHeartbeatImpl(configuration, taskName, dataSource);
+    }
+
     public void start() {
         if (!running) {
diff --git a/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java b/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java
index 55b152b1..020c29b0 100644
--- a/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java
+++ b/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java
@@ -9,23 +9,25 @@ import nu.marginalia.executor.storage.FileStorageFile;
 import nu.marginalia.executor.upload.UploadDirContents;
 import nu.marginalia.executor.upload.UploadDirItem;
 import nu.marginalia.functions.execution.api.*;
+import nu.marginalia.service.ServiceId;
 import nu.marginalia.service.client.GrpcChannelPoolFactory;
 import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
 import nu.marginalia.service.discovery.ServiceRegistryIf;
 import nu.marginalia.service.discovery.property.ServiceKey;
 import nu.marginalia.service.discovery.property.ServicePartition;
-import nu.marginalia.service.ServiceId;
 import nu.marginalia.storage.model.FileStorage;
 import nu.marginalia.storage.model.FileStorageId;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.*;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
-import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.*;
+import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
 
 @Singleton
 public class ExecutorClient {
@@ -163,8 +165,8 @@ public class ExecutorClient {
      * The endpoint is compatible with range requests.
* */ public URL remoteFileURL(FileStorage fileStorage, String path) { - String uriPath = STR."/transfer/file/\{fileStorage.id()}"; - String uriQuery = STR."path=\{URLEncoder.encode(path, StandardCharsets.UTF_8)}"; + String uriPath = "/transfer/file/" + fileStorage.id(); + String uriQuery = "path=" + URLEncoder.encode(path, StandardCharsets.UTF_8); var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node())); if (endpoints.isEmpty()) { diff --git a/code/execution/build.gradle b/code/execution/build.gradle index ae22f2ea..4a28f540 100644 --- a/code/execution/build.gradle +++ b/code/execution/build.gradle @@ -35,6 +35,7 @@ dependencies { implementation project(':code:libraries:term-frequency-dict') implementation project(':code:functions:link-graph:api') + implementation project(':code:functions:live-capture:api') implementation project(':code:functions:search-query') implementation project(':code:execution:api') diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActor.java b/code/execution/java/nu/marginalia/actor/ExecutorActor.java index 58add09e..e3b2308b 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActor.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActor.java @@ -22,7 +22,8 @@ public enum ExecutorActor { RESTORE_BACKUP, EXPORT_SAMPLE_DATA, DOWNLOAD_SAMPLE, - SCRAPE_FEEDS; + SCRAPE_FEEDS, + UPDATE_RSS; public String id() { return "fsm:" + name().toLowerCase(); diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java index 836382b2..8834a59b 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java @@ -50,7 +50,8 @@ public class ExecutorActorControlService { ExportSegmentationModelActor exportSegmentationModelActor, DownloadSampleActor downloadSampleActor, ScrapeFeedsActor scrapeFeedsActor, - ExecutorActorStateMachines stateMachines) { + ExecutorActorStateMachines stateMachines, + UpdateRssActor updateRssActor) { this.messageQueueFactory = messageQueueFactory; this.eventLog = baseServiceParams.eventLog; this.stateMachines = stateMachines; @@ -83,6 +84,7 @@ public class ExecutorActorControlService { register(ExecutorActor.DOWNLOAD_SAMPLE, downloadSampleActor); register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor); + register(ExecutorActor.UPDATE_RSS, updateRssActor); } private void register(ExecutorActor process, RecordActorPrototype graph) { diff --git a/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java new file mode 100644 index 00000000..7ab87835 --- /dev/null +++ b/code/execution/java/nu/marginalia/actor/proc/UpdateRssActor.java @@ -0,0 +1,76 @@ +package nu.marginalia.actor.proc; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.actor.state.Resume; +import nu.marginalia.api.feeds.FeedsClient; +import nu.marginalia.service.module.ServiceConfiguration; + +import java.time.Duration; +import java.time.LocalDateTime; + +public class UpdateRssActor extends RecordActorPrototype { + + private final FeedsClient feedsClient; + private final int nodeId; + + private final Duration initialDelay = Duration.ofMinutes(5); + private final 
Duration updateInterval = Duration.ofHours(35); + + @Inject + public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration) { + super(gson); + this.feedsClient = feedsClient; + this.nodeId = serviceConfiguration.node(); + } + + public record Initial() implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.RETRY) + public record Wait(String ts) implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.RESTART) + public record Update() implements ActorStep {} + + + @Override + public ActorStep transition(ActorStep self) throws Exception { + return switch (self) { + case Initial() -> { + if (nodeId > 1) { + // Only run on the first node + yield new End(); + } + else { + // Wait for 5 minutes before starting the first update, to give the system time to start up properly + yield new Wait(LocalDateTime.now().plus(initialDelay).toString()); + } + } + case Wait(String untilTs) -> { + var until = LocalDateTime.parse(untilTs); + var now = LocalDateTime.now(); + + long remaining = Duration.between(now, until).toMillis(); + + if (remaining > 0) { + Thread.sleep(remaining); + yield new Wait(untilTs); + } + else { + yield new Update(); + } + } + case Update() -> { + feedsClient.updateFeeds(); + yield new Wait(LocalDateTime.now().plus(updateInterval).toString()); + } + default -> new Error("Unknown actor step: " + self); + }; + } + + @Override + public String describe() { + return "Periodically updates RSS and Atom feeds"; + } +} diff --git a/code/features-search/feedlot-client/build.gradle b/code/features-search/feedlot-client/build.gradle deleted file mode 100644 index d7a430e9..00000000 --- a/code/features-search/feedlot-client/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -plugins { - id 'java' - id 'jvm-test-suite' -} - -java { - toolchain { - languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion)) - } -} - -apply from: "$rootProject.projectDir/srcsets.gradle" - -dependencies { - implementation libs.bundles.slf4j - - implementation libs.notnull - implementation libs.gson - - testImplementation libs.bundles.slf4j.test - testImplementation libs.bundles.junit - testImplementation libs.mockito - -} \ No newline at end of file diff --git a/code/features-search/feedlot-client/java/nu/marginalia/feedlot/FeedlotClient.java b/code/features-search/feedlot-client/java/nu/marginalia/feedlot/FeedlotClient.java deleted file mode 100644 index d247a8e2..00000000 --- a/code/features-search/feedlot-client/java/nu/marginalia/feedlot/FeedlotClient.java +++ /dev/null @@ -1,58 +0,0 @@ -package nu.marginalia.feedlot; - -import com.google.gson.Gson; -import nu.marginalia.feedlot.model.FeedItems; - -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.time.Duration; -import java.util.concurrent.Executors; -import java.util.concurrent.CompletableFuture; - -public class FeedlotClient { - private final String feedlotHost; - private final int feedlotPort; - private final Gson gson; - private final HttpClient httpClient; - private final Duration requestTimeout; - - public FeedlotClient(String feedlotHost, - int feedlotPort, - Gson gson, - Duration connectTimeout, - Duration requestTimeout - ) - { - this.feedlotHost = feedlotHost; - this.feedlotPort = feedlotPort; - this.gson = gson; - - httpClient = HttpClient.newBuilder() - .executor(Executors.newCachedThreadPool()) - .connectTimeout(connectTimeout) - .build(); - this.requestTimeout = requestTimeout; - } - - public 
CompletableFuture getFeedItems(String domainName) { - return httpClient.sendAsync( - HttpRequest.newBuilder() - .uri(URI.create("http://%s:%d/feed/%s".formatted(feedlotHost, feedlotPort, domainName))) - .GET() - .timeout(requestTimeout) - .build(), - HttpResponse.BodyHandlers.ofString() - ).thenApply(HttpResponse::body) - .thenApply(this::parseFeedItems); - } - - private FeedItems parseFeedItems(String s) { - return gson.fromJson(s, FeedItems.class); - } - - public void stop() { - httpClient.close(); - } -} diff --git a/code/features-search/feedlot-client/java/nu/marginalia/feedlot/model/FeedItem.java b/code/features-search/feedlot-client/java/nu/marginalia/feedlot/model/FeedItem.java deleted file mode 100644 index 95ea8fe3..00000000 --- a/code/features-search/feedlot-client/java/nu/marginalia/feedlot/model/FeedItem.java +++ /dev/null @@ -1,17 +0,0 @@ -package nu.marginalia.feedlot.model; - -public record FeedItem(String title, String date, String description, String url) { - - public String pubDay() { // Extract the date from an ISO style date string - if (date.length() > 10) { - return date.substring(0, 10); - } - return date; - } - - public String descriptionSafe() { - return description - .replace("<", "<") - .replace(">", ">"); - } -} diff --git a/code/features-search/feedlot-client/java/nu/marginalia/feedlot/model/FeedItems.java b/code/features-search/feedlot-client/java/nu/marginalia/feedlot/model/FeedItems.java deleted file mode 100644 index fcf06345..00000000 --- a/code/features-search/feedlot-client/java/nu/marginalia/feedlot/model/FeedItems.java +++ /dev/null @@ -1,6 +0,0 @@ -package nu.marginalia.feedlot.model; - -import java.util.List; - -public record FeedItems(String domain, String feedUrl, String updated, List items) { -} diff --git a/code/features-search/feedlot-client/readme.md b/code/features-search/feedlot-client/readme.md deleted file mode 100644 index 76fafff8..00000000 --- a/code/features-search/feedlot-client/readme.md +++ /dev/null @@ -1,20 +0,0 @@ -Client for [FeedlotTheFeedBot](https://github.com/MarginaliaSearch/FeedLotTheFeedBot), -the RSS/Atom feed fetcher and cache for Marginalia Search. - -This service is external to the Marginalia Search codebase, -as it is not a core part of the search engine and has other -utilities. 
- -## Example - -```java - -import java.time.Duration; - -var client = new FeedlotClient("localhost", 8080, - gson, - Duration.ofMillis(100), // connect timeout - Duration.ofMillis(100)); // request timeout - -CompleteableFuture items = client.getFeedItems("www.marginalia.nu"); -``` \ No newline at end of file diff --git a/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java new file mode 100644 index 00000000..4ec5a199 --- /dev/null +++ b/code/functions/live-capture/api/java/nu/marginalia/api/feeds/FeedsClient.java @@ -0,0 +1,49 @@ +package nu.marginalia.api.feeds; + +import com.google.inject.Inject; +import nu.marginalia.service.client.GrpcChannelPoolFactory; +import nu.marginalia.service.client.GrpcSingleNodeChannelPool; +import nu.marginalia.service.discovery.property.ServiceKey; +import nu.marginalia.service.discovery.property.ServicePartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class FeedsClient { + private static final Logger logger = LoggerFactory.getLogger(FeedsClient.class); + private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final GrpcSingleNodeChannelPool channelPool; + + @Inject + public FeedsClient(GrpcChannelPoolFactory factory) { + // The client is only interested in the primary node + var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any()); + this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub); + } + + + public CompletableFuture getFeed(int domainId) { + try { + return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed) + .async(executorService) + .run(RpcDomainId.newBuilder().setDomainId(domainId).build()); + } + catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + public void updateFeeds() { + try { + channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds) + .run(Empty.getDefaultInstance()); + } + catch (Exception e) { + logger.error("API Exception", e); + } + } + +} diff --git a/code/functions/live-capture/api/src/main/protobuf/feeds.proto b/code/functions/live-capture/api/src/main/protobuf/feeds.proto new file mode 100644 index 00000000..d31d9d47 --- /dev/null +++ b/code/functions/live-capture/api/src/main/protobuf/feeds.proto @@ -0,0 +1,32 @@ +syntax="proto3"; +package nu.marginalia.api.feeds; + +option java_package="nu.marginalia.api.feeds"; +option java_multiple_files=true; + + +service FeedApi { + rpc getFeed(RpcDomainId) returns (RpcFeed) {} + rpc updateFeeds(Empty) returns (Empty) {} +} + +message RpcDomainId { + int32 domainId = 1; +} + +message RpcFeed { + int32 domainId = 1; + string domain = 2; + string feedUrl = 3; + string updated = 4; + repeated RpcFeedItem items = 5; +} + +message RpcFeedItem { + string title = 1; + string date = 2; + string description = 3; + string url = 4; +} + +message Empty {} \ No newline at end of file diff --git a/code/functions/live-capture/api/src/main/protobuf/live-capture.proto b/code/functions/live-capture/api/src/main/protobuf/live-capture.proto index 752de691..b4bff05d 100644 --- a/code/functions/live-capture/api/src/main/protobuf/live-capture.proto +++ b/code/functions/live-capture/api/src/main/protobuf/live-capture.proto @@ -4,15 +4,12 @@ package nu.marginalia.api.livecapture; option java_package="nu.marginalia.api.livecapture"; 
option java_multiple_files=true; -service LiveCaptureApi { - rpc requestScreengrab(RpcDomainId) returns (Empty) {} -} - -message Void { -} - message RpcDomainId { int32 domainId = 1; } +service LiveCaptureApi { + rpc requestScreengrab(RpcDomainId) returns (Empty) {} +} + message Empty {} \ No newline at end of file diff --git a/code/functions/live-capture/build.gradle b/code/functions/live-capture/build.gradle index 169c7e23..d5fa647a 100644 --- a/code/functions/live-capture/build.gradle +++ b/code/functions/live-capture/build.gradle @@ -21,6 +21,12 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:db') + implementation project(':code:execution:api') + + implementation libs.jsoup + implementation libs.rssreader + implementation libs.opencsv + implementation libs.sqlite implementation libs.bundles.slf4j implementation libs.commons.lang3 diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java new file mode 100644 index 00000000..f8c08b45 --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java @@ -0,0 +1,144 @@ +package nu.marginalia.rss.db; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.WmsaHome; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.rss.model.FeedDefinition; +import nu.marginalia.rss.model.FeedItems; +import nu.marginalia.service.module.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.List; +import java.util.Optional; + +@Singleton +public class FeedDb { + private static final Logger logger = LoggerFactory.getLogger(FeedDb.class); + + private static final String dbFileName = "rss-feeds.db"; + + private final Path readerDbPath; + private volatile FeedDbReader reader; + + private final boolean feedDbEnabled; + + @Inject + public FeedDb(ServiceConfiguration serviceConfiguration) { + feedDbEnabled = serviceConfiguration.node() <= 1; + readerDbPath = WmsaHome.getDataPath().resolve(dbFileName); + + if (!feedDbEnabled) { + logger.info("Feed database is disabled on this node"); + } + else { + try { + reader = new FeedDbReader(readerDbPath); + } catch (Exception e) { + reader = null; + } + } + } + + public boolean isEnabled() { + return feedDbEnabled; + } + + public List getAllFeeds() { + if (!feedDbEnabled) { + throw new IllegalStateException("Feed database is disabled on this node"); + } + + // Capture the current reader to avoid concurrency issues + FeedDbReader reader = this.reader; + + try { + if (reader != null) { + return reader.getAllFeeds(); + } + } + catch (Exception e) { + logger.error("Error getting all feeds", e); + } + return List.of(); + } + + @NotNull + public FeedItems getFeed(EdgeDomain domain) { + if (!feedDbEnabled) { + throw new IllegalStateException("Feed database is disabled on this node"); + } + + // Capture the current reader to avoid concurrency issues + FeedDbReader reader = this.reader; + try { + if (reader != null) { + return reader.getFeed(domain); + } + } + catch (Exception e) { + logger.error("Error getting feed for " + domain, e); + } + return FeedItems.none(); + } + + public Optional getFeedAsJson(String domain) { + if (!feedDbEnabled) { + throw new IllegalStateException("Feed database is disabled on this node"); + } + + // Capture the 
current reader to avoid concurrency issues + FeedDbReader reader = this.reader; + + try { + if (reader != null) { + return reader.getFeedAsJson(domain); + } + } + catch (Exception e) { + logger.error("Error getting feed for " + domain, e); + } + return Optional.empty(); + } + + public FeedDbWriter createWriter() { + if (!feedDbEnabled) { + throw new IllegalStateException("Feed database is disabled on this node"); + } + + try { + Path dbFile = Files.createTempFile(WmsaHome.getDataPath(), "rss-feeds", ".tmp.db"); + return new FeedDbWriter(dbFile); + } catch (Exception e) { + logger.error("Error creating new database writer", e); + return null; + } + } + + public void switchDb(FeedDbWriter writer) { + if (!feedDbEnabled) { + throw new IllegalStateException("Feed database is disabled on this node"); + } + + try { + logger.info("Switching to new feed database from " + writer.getDbPath() + " to " + readerDbPath); + + writer.close(); + reader.close(); + + Files.move(writer.getDbPath(), readerDbPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); + + reader = new FeedDbReader(readerDbPath); + } catch (Exception e) { + logger.error("Fatal error switching to new feed database", e); + + reader = null; + } + } + +} diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java new file mode 100644 index 00000000..888eb731 --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java @@ -0,0 +1,98 @@ +package nu.marginalia.rss.db; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.rss.model.FeedDefinition; +import nu.marginalia.rss.model.FeedItems; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class FeedDbReader implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(FeedDbReader.class); + private static final Gson gson = new GsonBuilder().create(); + private final Connection connection; + + public FeedDbReader(Path dbPath) throws SQLException { + String dbUrl = "jdbc:sqlite:" + dbPath.toAbsolutePath(); + + logger.info("Opening feed db at " + dbUrl); + + connection = DriverManager.getConnection(dbUrl); + + // Create table if it doesn't exist to avoid errors before any feeds have been fetched + try (var stmt = connection.createStatement()) { + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)"); + } + } + + public List getAllFeeds() { + List ret = new ArrayList<>(); + + try (var stmt = connection.createStatement()) { + var rs = stmt.executeQuery(""" + select + json_extract(feed, '$.domain') as domain, + json_extract(feed, '$.feedUrl') as url + from feed + """); + + while (rs.next()) { + ret.add(new FeedDefinition(rs.getString("domain"), rs.getString("url"))); + } + + } catch (SQLException e) { + logger.error("Error getting all feeds", e); + } + + return ret; + } + + public Optional getFeedAsJson(String domain) { + try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) { + stmt.setString(1, domain); + + var rs = stmt.executeQuery(); + if (rs.next()) { + return Optional.of(rs.getString(1)); + } + } catch (SQLException e) { + logger.error("Error getting 
feed for " + domain, e); + } + return Optional.empty(); + } + + public FeedItems getFeed(EdgeDomain domain) { + try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) { + stmt.setString(1, domain.toString()); + var rs = stmt.executeQuery(); + + if (rs.next()) { + return deserialize(rs.getString(1)); + } + } catch (SQLException e) { + logger.error("Error getting feed for " + domain, e); + } + + return FeedItems.none(); + } + + private FeedItems deserialize(String string) { + return gson.fromJson(string, FeedItems.class); + } + + @Override + public void close() throws SQLException { + connection.close(); + } + + +} diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java new file mode 100644 index 00000000..fcf1c363 --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java @@ -0,0 +1,75 @@ +package nu.marginalia.rss.db; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import nu.marginalia.rss.model.FeedItems; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class FeedDbWriter implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(FeedDbWriter.class); + + private static final Gson gson = new GsonBuilder().create(); + + private final Connection connection; + private final PreparedStatement insertFeedStmt; + private final Path dbPath; + + private volatile boolean closed = false; + + public FeedDbWriter(Path dbPath) throws SQLException { + String dbUrl = "jdbc:sqlite:" + dbPath.toAbsolutePath(); + + this.dbPath = dbPath; + + connection = DriverManager.getConnection(dbUrl); + + try (var stmt = connection.createStatement()) { + // Disable synchronous writes for speed. We don't care about recovery. + // + // If this operation fails we just retry the entire operation from scratch + // with a new db file. 
+ + stmt.execute("PRAGMA synchronous = OFF"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)"); + } + + insertFeedStmt = connection.prepareStatement("INSERT INTO feed (domain, feed) VALUES (?, ?)"); + } + + public Path getDbPath() { + return dbPath; + } + + + public synchronized void saveFeed(FeedItems items) { + try { + insertFeedStmt.setString(1, items.domain().toLowerCase()); + insertFeedStmt.setString(2, serialize(items)); + insertFeedStmt.executeUpdate(); + } + catch (SQLException e) { + logger.error("Error saving feed for " + items.domain(), e); + } + } + + private String serialize(FeedItems items) { + return gson.toJson(items); + } + + @Override + public void close() throws SQLException { + if (!closed) { + insertFeedStmt.close(); + connection.close(); + closed = true; + } + } +} diff --git a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedDefinition.java b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedDefinition.java new file mode 100644 index 00000000..dd7cce32 --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedDefinition.java @@ -0,0 +1,3 @@ +package nu.marginalia.rss.model; + +public record FeedDefinition(String domain, String feedUrl) { } diff --git a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItem.java b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItem.java new file mode 100644 index 00000000..119a32cb --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItem.java @@ -0,0 +1,62 @@ +package nu.marginalia.rss.model; + +import com.apptasticsoftware.rssreader.Item; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; +import org.jsoup.Jsoup; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Optional; + +public record FeedItem(String title, + String date, + String description, + String url) implements Comparable +{ + public static final int MAX_DESC_LENGTH = 255; + public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + public static FeedItem fromItem(Item item) { + String title = item.getTitle().orElse(""); + String date = getItemDate(item); + String description = getItemDescription(item); + String url = item.getLink().orElse(""); + + return new FeedItem(title, date, description, url); + } + + private static String getItemDescription(Item item) { + Optional description = item.getDescription(); + if (description.isEmpty()) + return ""; + + String rawDescription = description.get(); + if (rawDescription.indexOf('<') >= 0) { + rawDescription = Jsoup.parseBodyFragment(rawDescription).text(); + } + + return StringUtils.truncate(rawDescription, MAX_DESC_LENGTH); + } + + // e.g. 
http://fabiensanglard.net/rss.xml does dates like this: 1 Apr 2021 00:00:00 +0000 + private static final DateTimeFormatter extraFormatter = DateTimeFormatter.ofPattern("d MMM yyyy HH:mm:ss Z"); + private static String getItemDate(Item item) { + Optional zonedDateTime = Optional.empty(); + try { + zonedDateTime = item.getPubDateZonedDateTime(); + } + catch (Exception e) { + zonedDateTime = item.getPubDate() + .map(extraFormatter::parse) + .map(ZonedDateTime::from); + } + + return zonedDateTime.map(date -> date.format(DATE_FORMAT)).orElse(""); + } + + @Override + public int compareTo(@NotNull FeedItem o) { + return o.date.compareTo(date); + } +} diff --git a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItems.java b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItems.java new file mode 100644 index 00000000..3a8f0676 --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedItems.java @@ -0,0 +1,33 @@ +package nu.marginalia.rss.model; + +import java.util.List; +import java.util.Optional; + +public record FeedItems(String domain, + String feedUrl, + String updated, + List items) { + + // List.of() is immutable, so we can use the same instance for all empty FeedItems + private static final FeedItems EMPTY = new FeedItems("", "","1970-01-01T00:00:00.000+00000", List.of()); + public static FeedItems none() { + return EMPTY; + } + + public boolean isEmpty() { + return items.isEmpty(); + } + + public Optional getLatest() { + if (items.isEmpty()) + return Optional.empty(); + + return Optional.of( + items.getFirst() + ); + } + + public Optional getLatestDate() { + return getLatest().map(FeedItem::date); + } +} diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java new file mode 100644 index 00000000..658310c0 --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java @@ -0,0 +1,205 @@ +package nu.marginalia.rss.svc; + +import com.apptasticsoftware.rssreader.RssReader; +import com.google.inject.Inject; +import com.opencsv.CSVReader; +import nu.marginalia.WmsaHome; +import nu.marginalia.executor.client.ExecutorClient; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.rss.db.FeedDb; +import nu.marginalia.rss.model.FeedDefinition; +import nu.marginalia.rss.model.FeedItem; +import nu.marginalia.rss.model.FeedItems; +import nu.marginalia.service.control.ServiceHeartbeat; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorage; +import nu.marginalia.storage.model.FileStorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.http.HttpClient; +import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.function.BiFunction; +import java.util.function.Predicate; +import java.util.zip.GZIPInputStream; + +public class FeedFetcherService { + private static final int MAX_FEED_ITEMS = 10; + private static final Logger logger = LoggerFactory.getLogger(FeedFetcherService.class); + + private final RssReader rssReader = new RssReader( + HttpClient.newBuilder().executor(Executors.newWorkStealingPool(16)).build() + ); + + private final FeedDb feedDb; + private final FileStorageService 
fileStorageService; + private final NodeConfigurationService nodeConfigurationService; + private final ServiceHeartbeat serviceHeartbeat; + private final ExecutorClient executorClient; + + private volatile boolean updating; + + @Inject + public FeedFetcherService(FeedDb feedDb, + FileStorageService fileStorageService, + NodeConfigurationService nodeConfigurationService, + ServiceHeartbeat serviceHeartbeat, + ExecutorClient executorClient) + { + this.feedDb = feedDb; + this.fileStorageService = fileStorageService; + this.nodeConfigurationService = nodeConfigurationService; + this.serviceHeartbeat = serviceHeartbeat; + this.executorClient = executorClient; + } + + public void updateFeeds() throws IOException { + rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher"); + + if (updating) // Prevent concurrent updates + { + logger.error("Already updating feeds, refusing to start another update"); + return; + } + + try (var writer = feedDb.createWriter()) { + updating = true; + var definitions = readDefinitions(); + + logger.info("Found {} feed definitions", definitions.size()); + + for (var feed: definitions) { + var items = fetchFeed(feed); + if (!items.isEmpty()) { + writer.saveFeed(items); + } + } + + feedDb.switchDb(writer); + + } catch (SQLException e) { + logger.error("Error updating feeds", e); + } + finally { + updating = false; + } + } + + public Collection readDefinitions() throws IOException { + Collection storages = getLatestFeedStorages(); + List feedDefinitionList = new ArrayList<>(); + + // Download and parse feeds.csv.gz from each relevant storage + int updated = 0; + try (var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")) { + for (var storage : storages) { + var url = executorClient.remoteFileURL(storage, "feeds.csv.gz"); + + heartbeat.progress("Fetch RSS/Atom feeds", 0, storages.size()); + + try (var feedStream = new GZIPInputStream(url.openStream())) { + CSVReader reader = new CSVReader(new java.io.InputStreamReader(feedStream)); + + for (String[] row : reader) { + if (row.length < 3) { + continue; + } + var domain = row[0].trim(); + var feedUrl = row[2].trim(); + + feedDefinitionList.add(new FeedDefinition(domain, feedUrl)); + } + + heartbeat.progress("Fetch RSS/Atom feeds", ++updated, storages.size()); + } + } + } + + return feedDefinitionList; + } + + private Collection getLatestFeedStorages() { + // Find the newest feed storage for each node + + Map newestStorageByNode = new HashMap<>(); + + for (var node : nodeConfigurationService.getAll()) { + int nodeId = node.node(); + + for (var storage: fileStorageService.getEachFileStorage(nodeId, FileStorageType.EXPORT)) { + if (!storage.description().startsWith("Feeds ")) + continue; + + newestStorageByNode.compute(storage.node(), new KeepNewerFeedStorage(storage)); + } + + } + + return newestStorageByNode.values(); + } + + + private static class KeepNewerFeedStorage implements BiFunction { + private final FileStorage newValue; + + private KeepNewerFeedStorage(FileStorage newValue) { + this.newValue = newValue; + } + + public FileStorage apply(Integer node, @Nullable FileStorage oldValue) { + if (oldValue == null) return newValue; + + return newValue.createDateTime().isAfter(oldValue.createDateTime()) + ? 
newValue + : oldValue; + } + } + + public FeedItems fetchFeed(FeedDefinition definition) { + try { + var items = rssReader.read(definition.feedUrl()) + .map(FeedItem::fromItem) + .filter(new IsFeedItemDateValid()) + .sorted() + .limit(MAX_FEED_ITEMS) + .toList(); + + return new FeedItems( + definition.domain(), + definition.feedUrl(), + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), + items); + + } catch (Exception e) { + logger.warn("Failed to read feed {}: {}", definition.feedUrl(), e.getMessage()); + + logger.debug("Exception", e); + return FeedItems.none(); + } + } + + private static class IsFeedItemDateValid implements Predicate { + private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME); + + public boolean test(FeedItem item) { + var date = item.date(); + + if (date.isBlank()) { + return false; + } + + if (date.compareTo(today) > 0) { + return false; + } + + return true; + } + } +} diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java new file mode 100644 index 00000000..d833f9bd --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedsGrpcService.java @@ -0,0 +1,93 @@ +package nu.marginalia.rss.svc; + +import com.google.inject.Inject; +import io.grpc.stub.StreamObserver; +import nu.marginalia.api.feeds.Empty; +import nu.marginalia.api.feeds.FeedApiGrpc; +import nu.marginalia.api.feeds.RpcDomainId; +import nu.marginalia.api.feeds.RpcFeed; +import nu.marginalia.db.DbDomainQueries; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.rss.db.FeedDb; +import nu.marginalia.rss.model.FeedItems; +import nu.marginalia.service.server.DiscoverableService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Optional; + +public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService { + private final FeedDb feedDb; + private final DbDomainQueries domainQueries; + private final FeedFetcherService feedFetcherService; + + private static final Logger logger = LoggerFactory.getLogger(FeedsGrpcService.class); + + @Inject + public FeedsGrpcService(FeedDb feedDb, + DbDomainQueries domainQueries, + FeedFetcherService feedFetcherService) { + this.feedDb = feedDb; + this.domainQueries = domainQueries; + this.feedFetcherService = feedFetcherService; + } + + // Ensure that the service is only registered if it is enabled + @Override + public boolean shouldRegisterService() { + return feedDb.isEnabled(); + } + + @Override + public void updateFeeds(Empty request, + StreamObserver responseObserver) + { + Thread.ofPlatform().start(() -> { + try { + feedFetcherService.updateFeeds(); + } catch (IOException e) { + logger.error("Failed to update feeds", e); + } + }); + + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void getFeed(RpcDomainId request, + StreamObserver responseObserver) + { + if (!feedDb.isEnabled()) { + responseObserver.onError(new IllegalStateException("Feed database is disabled on this node")); + return; + } + + Optional domainName = domainQueries.getDomain(request.getDomainId()); + if (domainName.isEmpty()) { + responseObserver.onError(new IllegalArgumentException("Domain not found")); + return; + } + + FeedItems feedItems = feedDb.getFeed(domainName.get()); + + RpcFeed.Builder retB = RpcFeed.newBuilder() + .setDomainId(request.getDomainId()) 
+ .setDomain(domainName.get().toString()) + .setFeedUrl(feedItems.feedUrl()) + .setUpdated(feedItems.updated()); + + for (var item : feedItems.items()) { + retB.addItemsBuilder() + .setTitle(item.title()) + .setUrl(item.url()) + .setDescription(item.description()) + .setDate(item.date()) + .build(); + } + + responseObserver.onNext(retB.build()); + responseObserver.onCompleted(); + } +} diff --git a/code/services-application/search-service/build.gradle b/code/services-application/search-service/build.gradle index e2d6cb42..55c2aa90 100644 --- a/code/services-application/search-service/build.gradle +++ b/code/services-application/search-service/build.gradle @@ -53,7 +53,6 @@ dependencies { implementation project(':code:features-search:screenshots') implementation project(':code:features-search:random-websites') - implementation project(':code:features-search:feedlot-client') implementation libs.bundles.slf4j diff --git a/code/services-application/search-service/java/nu/marginalia/search/SearchModule.java b/code/services-application/search-service/java/nu/marginalia/search/SearchModule.java index 52d1cbea..5e55aa2a 100644 --- a/code/services-application/search-service/java/nu/marginalia/search/SearchModule.java +++ b/code/services-application/search-service/java/nu/marginalia/search/SearchModule.java @@ -1,15 +1,10 @@ package nu.marginalia.search; import com.google.inject.AbstractModule; -import com.google.inject.Provides; import nu.marginalia.LanguageModels; import nu.marginalia.WebsiteUrl; import nu.marginalia.WmsaHome; -import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.renderer.config.HandlebarsConfigurator; -import nu.marginalia.feedlot.FeedlotClient; - -import java.time.Duration; public class SearchModule extends AbstractModule { @@ -22,14 +17,4 @@ public class SearchModule extends AbstractModule { System.getProperty("search.websiteUrl", "https://search.marginalia.nu/"))); } - @Provides - public FeedlotClient provideFeedlotClient() { - return new FeedlotClient( - System.getProperty("ext-svc-feedlot-bindAddress", "feedlot"), - Integer.getInteger("ext-svc-feedlot-port", 80), - GsonFactory.get(), - Duration.ofMillis(250), - Duration.ofMillis(100) - ); - } } diff --git a/code/services-application/search-service/java/nu/marginalia/search/svc/SearchSiteInfoService.java b/code/services-application/search-service/java/nu/marginalia/search/svc/SearchSiteInfoService.java index cf73c211..f56c3b79 100644 --- a/code/services-application/search-service/java/nu/marginalia/search/svc/SearchSiteInfoService.java +++ b/code/services-application/search-service/java/nu/marginalia/search/svc/SearchSiteInfoService.java @@ -4,10 +4,11 @@ import com.google.inject.Inject; import nu.marginalia.api.domains.DomainInfoClient; import nu.marginalia.api.domains.model.DomainInformation; import nu.marginalia.api.domains.model.SimilarDomain; +import nu.marginalia.api.feeds.FeedsClient; +import nu.marginalia.api.feeds.RpcFeed; +import nu.marginalia.api.feeds.RpcFeedItem; import nu.marginalia.api.livecapture.LiveCaptureClient; import nu.marginalia.db.DbDomainQueries; -import nu.marginalia.feedlot.FeedlotClient; -import nu.marginalia.feedlot.model.FeedItems; import nu.marginalia.model.EdgeDomain; import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.RendererFactory; @@ -37,7 +38,7 @@ public class SearchSiteInfoService { private final SearchFlagSiteService flagSiteService; private final DbDomainQueries domainQueries; private final MustacheRenderer renderer; - private final FeedlotClient 
feedlotClient; + private final FeedsClient feedsClient; private final LiveCaptureClient liveCaptureClient; private final ScreenshotService screenshotService; @@ -47,7 +48,7 @@ public class SearchSiteInfoService { RendererFactory rendererFactory, SearchFlagSiteService flagSiteService, DbDomainQueries domainQueries, - FeedlotClient feedlotClient, + FeedsClient feedsClient, LiveCaptureClient liveCaptureClient, ScreenshotService screenshotService) throws IOException { @@ -58,7 +59,7 @@ public class SearchSiteInfoService { this.renderer = rendererFactory.renderer("search/site-info/site-info"); - this.feedlotClient = feedlotClient; + this.feedsClient = feedsClient; this.liveCaptureClient = liveCaptureClient; this.screenshotService = screenshotService; } @@ -135,26 +136,29 @@ public class SearchSiteInfoService { final Future domainInfoFuture; final Future> similarSetFuture; final Future> linkingDomainsFuture; - + final CompletableFuture feedItemsFuture; String url = "https://" + domainName + "/"; boolean hasScreenshot = screenshotService.hasScreenshot(domainId); - var feedItemsFuture = feedlotClient.getFeedItems(domainName); + if (domainId < 0) { domainInfoFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID")); similarSetFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID")); linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID")); + feedItemsFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID")); } else if (!domainInfoClient.isAccepting()) { domainInfoFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable")); similarSetFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable")); linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable")); + feedItemsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable")); } else { domainInfoFuture = domainInfoClient.domainInformation(domainId); similarSetFuture = domainInfoClient.similarDomains(domainId, 25); linkingDomainsFuture = domainInfoClient.linkedDomains(domainId, 25); + feedItemsFuture = feedsClient.getFeed(domainId); } List sampleResults = searchOperator.doSiteSearch(domainName, domainId,5); @@ -162,13 +166,6 @@ public class SearchSiteInfoService { url = sampleResults.getFirst().url.withPathAndParam("/", null).toString(); } - FeedItems feedItems = null; - try { - feedItems = feedItemsFuture.get(); - } catch (Exception e) { - logger.debug("Failed to get feed items for {}: {}", domainName, e.getMessage()); - } - var result = new SiteInfoWithContext(domainName, domainId, url, @@ -176,7 +173,7 @@ public class SearchSiteInfoService { waitForFuture(domainInfoFuture, () -> createDummySiteInfo(domainName)), waitForFuture(similarSetFuture, List::of), waitForFuture(linkingDomainsFuture, List::of), - feedItems, + waitForFuture(feedItemsFuture.thenApply(FeedItems::new), () -> FeedItems.dummyValue(domainName)), sampleResults ); @@ -356,6 +353,43 @@ public class SearchSiteInfoService { } } + public record FeedItem(String title, String date, String description, String url) { + + public FeedItem(RpcFeedItem rpcFeedItem) { + this(rpcFeedItem.getTitle(), + rpcFeedItem.getDate(), + rpcFeedItem.getDescription(), + rpcFeedItem.getUrl()); + } + + public String pubDay() { // Extract the date from an ISO style date string + if (date.length() > 10) { + return date.substring(0, 10); + } + return date; + } + + public String descriptionSafe() { + return 
description
+                    .replace("<", "&lt;")
+                    .replace(">", "&gt;");
+        }
+    }
+
+    public record FeedItems(String domain, String feedUrl, String updated, List<FeedItem> items) {
+
+        public static FeedItems dummyValue(String domain) {
+            return new FeedItems(domain, "", "", List.of());
+        }
+
+        public FeedItems(RpcFeed rpcFeedItems) {
+            this(rpcFeedItems.getDomain(),
+                    rpcFeedItems.getFeedUrl(),
+                    rpcFeedItems.getUpdated(),
+                    rpcFeedItems.getItemsList().stream().map(FeedItem::new).toList());
+        }
+    }
+
     public record ReportDomain(
             Map<String, Object> view,
             String domain,
diff --git a/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantService.java b/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantService.java
index 9bff6c38..107daa9d 100644
--- a/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantService.java
+++ b/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantService.java
@@ -8,6 +8,7 @@ import nu.marginalia.functions.domains.DomainInfoGrpcService;
 import nu.marginalia.functions.math.MathGrpcService;
 import nu.marginalia.livecapture.LiveCaptureGrpcService;
 import nu.marginalia.model.gson.GsonFactory;
+import nu.marginalia.rss.svc.FeedsGrpcService;
 import nu.marginalia.screenshot.ScreenshotService;
 import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.service.server.BaseServiceParams;
@@ -31,10 +32,15 @@ public class AssistantService extends Service {
                             ScreenshotService screenshotService,
                             DomainInfoGrpcService domainInfoGrpcService,
                             LiveCaptureGrpcService liveCaptureGrpcService,
+                            FeedsGrpcService feedsGrpcService,
                             MathGrpcService mathGrpcService,
                             Suggestions suggestions) {
-        super(params, ServicePartition.any(), List.of(domainInfoGrpcService, mathGrpcService, liveCaptureGrpcService));
+        super(params, ServicePartition.any(),
+                List.of(domainInfoGrpcService,
+                        mathGrpcService,
+                        liveCaptureGrpcService,
+                        feedsGrpcService));
 
         this.suggestions = suggestions;
diff --git a/settings.gradle b/settings.gradle
index 7c87f7b9..80508146 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -56,7 +56,6 @@ include 'code:libraries:message-queue'
 include 'code:features-search:screenshots'
 include 'code:features-search:random-websites'
-include 'code:features-search:feedlot-client'
 include 'code:processes:converting-process:ft-anchor-keywords'
 include 'code:execution:data-extractors'
@@ -155,6 +154,8 @@ dependencyResolutionManagement {
             library('prometheus-server', 'io.prometheus', 'simpleclient_httpserver').version('0.16.0')
             library('prometheus-hotspot', 'io.prometheus', 'simpleclient_hotspot').version('0.16.0')
+            library('rssreader', 'com.apptasticsoftware', 'rssreader').version('3.5.0')
+
             library('slf4j.api', 'org.slf4j', 'slf4j-api').version('1.7.36')
             library('slf4j.jdk14', 'org.slf4j', 'slf4j-jdk14').version('2.0.3')