Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git, synced 2025-02-23 04:58:59 +00:00
(feeds) Retire feedlot the feed bot, move RSS capture into the live-capture service
parent 3d6c79ae5f
commit bfeb9a4538

@@ -11,4 +11,14 @@ public class FakeServiceHeartbeat implements ServiceHeartbeat {
            public void close() {}
        };
    }

    @Override
    public ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName) {
        return new ServiceAdHocTaskHeartbeat() {
            @Override
            public void progress(String step, int stepProgress, int stepCount) {}
            @Override
            public void close() {}
        };
    }
}

@@ -0,0 +1,7 @@
package nu.marginalia.service.control;

public interface ServiceAdHocTaskHeartbeat extends AutoCloseable {
    void progress(String step, int progress, int total);

    void close();
}

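For orientation, a minimal usage sketch of the new ad-hoc heartbeat interface; the task name, the work loop, and the injected ServiceHeartbeat are illustrative assumptions, not part of this commit:

```java
// Sketch only: serviceHeartbeat is assumed to be an injected ServiceHeartbeat,
// and the task/step names are made up for illustration.
void exportWithProgress(ServiceHeartbeat serviceHeartbeat, List<String> items) {
    try (ServiceAdHocTaskHeartbeat heartbeat =
                 serviceHeartbeat.createServiceAdHocTaskHeartbeat("example-export")) {
        int done = 0;
        for (String item : items) {
            // ... do the actual work for this item ...
            heartbeat.progress("exporting", ++done, items.size());
        }
    } // close() stops the background reporting thread and marks the task STOPPED
}
```
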
@@ -0,0 +1,190 @@
package nu.marginalia.service.control;

import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/** This object sends a heartbeat to the database every few seconds,
 * updating it with the progress of a task within a service. Unlike the enum-based
 * ServiceTaskHeartbeat, progress is reported explicitly as a step name together
 * with the number of completed steps and the total step count.
 */
public class ServiceAdHocTaskHeartbeatImpl implements AutoCloseable, ServiceAdHocTaskHeartbeat {
    private final Logger logger = LoggerFactory.getLogger(ServiceAdHocTaskHeartbeatImpl.class);
    private final String taskName;
    private final String taskBase;
    private final int node;
    private final String instanceUUID;
    private final HikariDataSource dataSource;

    private final Thread runnerThread;
    private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 1);
    private final String serviceInstanceUUID;
    private int progress;

    private volatile boolean running = false;
    private volatile String step = "-";

    ServiceAdHocTaskHeartbeatImpl(ServiceConfiguration configuration,
                                  String taskName,
                                  HikariDataSource dataSource)
    {
        this.taskName = configuration.serviceName() + "." + taskName + ":" + configuration.node();
        this.taskBase = configuration.serviceName() + "." + taskName;
        this.node = configuration.node();
        this.dataSource = dataSource;

        this.instanceUUID = UUID.randomUUID().toString();
        this.serviceInstanceUUID = configuration.instanceUuid().toString();

        heartbeatInit();

        runnerThread = new Thread(this::run);
        runnerThread.start();
    }

    /** Update the progress of the task. This is a fast function that doesn't block;
     * the actual database update is done in a separate thread.
     *
     * @param step         The name of the current step in the task.
     * @param stepProgress The number of completed steps.
     * @param stepCount    The total number of steps.
     */
    @Override
    public void progress(String step, int stepProgress, int stepCount) {
        this.step = step;

        // Unlike the enum-based heartbeat there is no zero-based Enum.ordinal() here,
        // so no off-by-one adjustment is needed: the caller passes the number of
        // completed steps directly, and the final step of a five-step task reports
        // 100% rather than 80%.
        this.progress = (int) Math.round(100. * stepProgress / (double) stepCount);

        logger.info("ServiceTask {} progress: {}%", taskBase, progress);
    }

    public void shutDown() {
        if (!running)
            return;

        running = false;

        try {
            runnerThread.join();
            heartbeatStop();
        }
        catch (InterruptedException|SQLException ex) {
            logger.warn("ServiceHeartbeat shutdown failed", ex);
        }
    }

    private void run() {
        if (!running)
            running = true;
        else
            return;

        try {
            while (running) {
                try {
                    heartbeatUpdate();
                }
                catch (SQLException ex) {
                    logger.warn("ServiceHeartbeat failed to update", ex);
                }

                TimeUnit.SECONDS.sleep(heartbeatInterval);
            }
        }
        catch (InterruptedException ex) {
            logger.error("ServiceHeartbeat caught irrecoverable exception, killing service", ex);
            System.exit(255);
        }
    }

    private void heartbeatInit() {
        try (var connection = dataSource.getConnection()) {
            try (var stmt = connection.prepareStatement(
                    """
                    INSERT INTO TASK_HEARTBEAT (TASK_NAME, TASK_BASE, NODE, INSTANCE, SERVICE_INSTANCE, HEARTBEAT_TIME, STATUS)
                    VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP(6), 'STARTING')
                    ON DUPLICATE KEY UPDATE
                        INSTANCE = ?,
                        SERVICE_INSTANCE = ?,
                        HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
                        STATUS = 'STARTING'
                    """
            ))
            {
                stmt.setString(1, taskName);
                stmt.setString(2, taskBase);
                stmt.setInt(3, node);
                stmt.setString(4, instanceUUID);
                stmt.setString(5, serviceInstanceUUID);
                stmt.setString(6, instanceUUID);
                stmt.setString(7, serviceInstanceUUID);
                stmt.executeUpdate();
            }
        }
        catch (SQLException ex) {
            logger.error("ServiceHeartbeat failed to initialize", ex);
            throw new RuntimeException(ex);
        }
    }

    private void heartbeatUpdate() throws SQLException {
        try (var connection = dataSource.getConnection()) {
            try (var stmt = connection.prepareStatement(
                    """
                    UPDATE TASK_HEARTBEAT
                    SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
                        STATUS = 'RUNNING',
                        PROGRESS = ?,
                        STAGE_NAME = ?
                    WHERE INSTANCE = ?
                    """)
            )
            {
                stmt.setInt(1, progress);
                stmt.setString(2, step);
                stmt.setString(3, instanceUUID);
                stmt.executeUpdate();
            }
        }
    }

    private void heartbeatStop() throws SQLException {
        try (var connection = dataSource.getConnection()) {
            try (var stmt = connection.prepareStatement(
                    """
                    UPDATE TASK_HEARTBEAT
                    SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
                        STATUS = 'STOPPED',
                        PROGRESS = ?,
                        STAGE_NAME = ?
                    WHERE INSTANCE = ?
                    """)
            )
            {
                stmt.setInt(1, progress);
                stmt.setString(2, step);
                stmt.setString(3, instanceUUID);
                stmt.executeUpdate();
            }
        }
    }

    @Override
    public void close() {
        shutDown();
    }
}

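A quick worked example of the percentage arithmetic in progress() above, with illustrative values; the point is that callers should pass the number of completed steps, not a zero-based index:

```java
// pct mirrors the rounding in ServiceAdHocTaskHeartbeatImpl.progress():
static int pct(int stepProgress, int stepCount) {
    return (int) Math.round(100. * stepProgress / (double) stepCount);
}
// pct(4, 5) == 80, pct(5, 5) == 100 -- reporting a zero-based step index
// would leave a five-step task stuck at 80% on its final step.
```
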
@@ -5,4 +5,5 @@ import com.google.inject.ImplementedBy;
@ImplementedBy(ServiceHeartbeatImpl.class)
public interface ServiceHeartbeat {
    <T extends Enum<T>> ServiceTaskHeartbeat<T> createServiceTaskHeartbeat(Class<T> steps, String processName);
    ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName);
}

@@ -54,6 +54,11 @@ public class ServiceHeartbeatImpl implements ServiceHeartbeat {
        return new ServiceTaskHeartbeatImpl<>(steps, configuration, processName, eventLog, dataSource);
    }

    @Override
    public ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName) {
        return new ServiceAdHocTaskHeartbeatImpl(configuration, taskName, dataSource);
    }


    public void start() {
        if (!running) {

@@ -9,23 +9,25 @@ import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.executor.upload.UploadDirContents;
import nu.marginalia.executor.upload.UploadDirItem;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.ServiceId;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageId;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.*;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.*;
import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;

@Singleton
public class ExecutorClient {

@@ -163,8 +165,8 @@ public class ExecutorClient {
     * The endpoint is compatible with range requests.
     * */
    public URL remoteFileURL(FileStorage fileStorage, String path) {
        String uriPath = STR."/transfer/file/\{fileStorage.id()}";
        String uriQuery = STR."path=\{URLEncoder.encode(path, StandardCharsets.UTF_8)}";
        String uriPath = "/transfer/file/" + fileStorage.id();
        String uriQuery = "path=" + URLEncoder.encode(path, StandardCharsets.UTF_8);

        var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node()));
        if (endpoints.isEmpty()) {

@@ -35,6 +35,7 @@ dependencies {
    implementation project(':code:libraries:term-frequency-dict')

    implementation project(':code:functions:link-graph:api')
    implementation project(':code:functions:live-capture:api')
    implementation project(':code:functions:search-query')
    implementation project(':code:execution:api')

@@ -22,7 +22,8 @@ public enum ExecutorActor {
    RESTORE_BACKUP,
    EXPORT_SAMPLE_DATA,
    DOWNLOAD_SAMPLE,
    SCRAPE_FEEDS;
    SCRAPE_FEEDS,
    UPDATE_RSS;

    public String id() {
        return "fsm:" + name().toLowerCase();

@@ -50,7 +50,8 @@ public class ExecutorActorControlService {
                                     ExportSegmentationModelActor exportSegmentationModelActor,
                                     DownloadSampleActor downloadSampleActor,
                                     ScrapeFeedsActor scrapeFeedsActor,
                                     ExecutorActorStateMachines stateMachines) {
                                     ExecutorActorStateMachines stateMachines,
                                     UpdateRssActor updateRssActor) {
        this.messageQueueFactory = messageQueueFactory;
        this.eventLog = baseServiceParams.eventLog;
        this.stateMachines = stateMachines;

@@ -83,6 +84,7 @@ public class ExecutorActorControlService {
        register(ExecutorActor.DOWNLOAD_SAMPLE, downloadSampleActor);

        register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor);
        register(ExecutorActor.UPDATE_RSS, updateRssActor);
    }

    private void register(ExecutorActor process, RecordActorPrototype graph) {

@@ -0,0 +1,76 @@
package nu.marginalia.actor.proc;

import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.service.module.ServiceConfiguration;

import java.time.Duration;
import java.time.LocalDateTime;

public class UpdateRssActor extends RecordActorPrototype {

    private final FeedsClient feedsClient;
    private final int nodeId;

    private final Duration initialDelay = Duration.ofMinutes(5);
    private final Duration updateInterval = Duration.ofHours(35);

    @Inject
    public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration) {
        super(gson);
        this.feedsClient = feedsClient;
        this.nodeId = serviceConfiguration.node();
    }

    public record Initial() implements ActorStep {}
    @Resume(behavior = ActorResumeBehavior.RETRY)
    public record Wait(String ts) implements ActorStep {}
    @Resume(behavior = ActorResumeBehavior.RESTART)
    public record Update() implements ActorStep {}

    @Override
    public ActorStep transition(ActorStep self) throws Exception {
        return switch (self) {
            case Initial() -> {
                if (nodeId > 1) {
                    // Only run on the first node
                    yield new End();
                }
                else {
                    // Wait for 5 minutes before starting the first update, to give the system time to start up properly
                    yield new Wait(LocalDateTime.now().plus(initialDelay).toString());
                }
            }
            case Wait(String untilTs) -> {
                var until = LocalDateTime.parse(untilTs);
                var now = LocalDateTime.now();

                long remaining = Duration.between(now, until).toMillis();

                if (remaining > 0) {
                    Thread.sleep(remaining);
                    yield new Wait(untilTs);
                }
                else {
                    yield new Update();
                }
            }
            case Update() -> {
                feedsClient.updateFeeds();
                yield new Wait(LocalDateTime.now().plus(updateInterval).toString());
            }
            default -> new Error("Unknown actor step: " + self);
        };
    }

    @Override
    public String describe() {
        return "Periodically updates RSS and Atom feeds";
    }
}

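The Wait step above persists its deadline as an ISO-8601 LocalDateTime string, so the schedule survives actor restarts (ActorResumeBehavior.RETRY re-enters Wait with the same timestamp). A self-contained sketch of the same deadline arithmetic, for illustration only:

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Mirrors the Wait-step arithmetic: how long is left until the stored deadline?
static Duration remainingWait(String untilTs) {
    LocalDateTime until = LocalDateTime.parse(untilTs); // same format the actor stores
    Duration remaining = Duration.between(LocalDateTime.now(), until);
    return remaining.isNegative() ? Duration.ZERO : remaining;
}
```
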
@@ -1,24 +0,0 @@
plugins {
    id 'java'
    id 'jvm-test-suite'
}

java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
    }
}

apply from: "$rootProject.projectDir/srcsets.gradle"

dependencies {
    implementation libs.bundles.slf4j

    implementation libs.notnull
    implementation libs.gson

    testImplementation libs.bundles.slf4j.test
    testImplementation libs.bundles.junit
    testImplementation libs.mockito

}

@@ -1,58 +0,0 @@
package nu.marginalia.feedlot;

import com.google.gson.Gson;
import nu.marginalia.feedlot.model.FeedItems;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;

public class FeedlotClient {
    private final String feedlotHost;
    private final int feedlotPort;
    private final Gson gson;
    private final HttpClient httpClient;
    private final Duration requestTimeout;

    public FeedlotClient(String feedlotHost,
                         int feedlotPort,
                         Gson gson,
                         Duration connectTimeout,
                         Duration requestTimeout
                         )
    {
        this.feedlotHost = feedlotHost;
        this.feedlotPort = feedlotPort;
        this.gson = gson;

        httpClient = HttpClient.newBuilder()
                .executor(Executors.newCachedThreadPool())
                .connectTimeout(connectTimeout)
                .build();
        this.requestTimeout = requestTimeout;
    }

    public CompletableFuture<FeedItems> getFeedItems(String domainName) {
        return httpClient.sendAsync(
                HttpRequest.newBuilder()
                        .uri(URI.create("http://%s:%d/feed/%s".formatted(feedlotHost, feedlotPort, domainName)))
                        .GET()
                        .timeout(requestTimeout)
                        .build(),
                HttpResponse.BodyHandlers.ofString()
        ).thenApply(HttpResponse::body)
         .thenApply(this::parseFeedItems);
    }

    private FeedItems parseFeedItems(String s) {
        return gson.fromJson(s, FeedItems.class);
    }

    public void stop() {
        httpClient.close();
    }
}

@@ -1,17 +0,0 @@
package nu.marginalia.feedlot.model;

public record FeedItem(String title, String date, String description, String url) {

    public String pubDay() { // Extract the date from an ISO style date string
        if (date.length() > 10) {
            return date.substring(0, 10);
        }
        return date;
    }

    public String descriptionSafe() {
        return description
                .replace("<", "&lt;")
                .replace(">", "&gt;");
    }
}

@@ -1,6 +0,0 @@
package nu.marginalia.feedlot.model;

import java.util.List;

public record FeedItems(String domain, String feedUrl, String updated, List<FeedItem> items) {
}

@@ -1,20 +0,0 @@
Client for [FeedlotTheFeedBot](https://github.com/MarginaliaSearch/FeedLotTheFeedBot),
the RSS/Atom feed fetcher and cache for Marginalia Search.

This service is external to the Marginalia Search codebase,
as it is not a core part of the search engine and has other
utilities.

## Example

```java
import java.time.Duration;

var client = new FeedlotClient("localhost", 8080,
        gson,
        Duration.ofMillis(100), // connect timeout
        Duration.ofMillis(100)); // request timeout

CompletableFuture<FeedItems> items = client.getFeedItems("www.marginalia.nu");
```

@@ -0,0 +1,49 @@
package nu.marginalia.api.feeds;

import com.google.inject.Inject;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcSingleNodeChannelPool;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FeedsClient {
    private static final Logger logger = LoggerFactory.getLogger(FeedsClient.class);
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final GrpcSingleNodeChannelPool<FeedApiGrpc.FeedApiBlockingStub> channelPool;

    @Inject
    public FeedsClient(GrpcChannelPoolFactory factory) {
        // The client is only interested in the primary node
        var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any());
        this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub);
    }

    public CompletableFuture<RpcFeed> getFeed(int domainId) {
        try {
            return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed)
                    .async(executorService)
                    .run(RpcDomainId.newBuilder().setDomainId(domainId).build());
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    public void updateFeeds() {
        try {
            channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds)
                    .run(Empty.getDefaultInstance());
        }
        catch (Exception e) {
            logger.error("API Exception", e);
        }
    }

}

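By analogy with the retired FeedlotClient example above, a hedged sketch of how the new gRPC client might be used; the domain id and the injector wiring are assumptions for illustration:

```java
// Illustrative usage; in the real code the client is @Inject-ed and the
// domain id comes from DbDomainQueries.
FeedsClient feedsClient = injector.getInstance(FeedsClient.class);

feedsClient.getFeed(14 /* hypothetical domain id */)
        .thenAccept(feed -> System.out.println(feed.getDomain() + " -> " + feed.getFeedUrl()))
        .join();

feedsClient.updateFeeds(); // triggers a full refresh; errors are logged, not thrown
```
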
@@ -0,0 +1,32 @@
syntax="proto3";
package nu.marginalia.api.feeds;

option java_package="nu.marginalia.api.feeds";
option java_multiple_files=true;


service FeedApi {
    rpc getFeed(RpcDomainId) returns (RpcFeed) {}
    rpc updateFeeds(Empty) returns (Empty) {}
}

message RpcDomainId {
    int32 domainId = 1;
}

message RpcFeed {
    int32 domainId = 1;
    string domain = 2;
    string feedUrl = 3;
    string updated = 4;
    repeated RpcFeedItem items = 5;
}

message RpcFeedItem {
    string title = 1;
    string date = 2;
    string description = 3;
    string url = 4;
}

message Empty {}

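These messages map one-to-one onto the view records added to SearchSiteInfoService later in this diff. For reference, a hedged sketch of constructing them from Java with the generated protobuf builders; all values are illustrative:

```java
// Illustrative values only; these are the standard protobuf-java builders
// generated from the messages above.
RpcFeedItem item = RpcFeedItem.newBuilder()
        .setTitle("Hello")
        .setDate("2023-12-13T12:00:00.000+0000")
        .setDescription("An example item")
        .setUrl("https://www.example.com/hello")
        .build();

RpcFeed feed = RpcFeed.newBuilder()
        .setDomainId(14)
        .setDomain("www.example.com")
        .setFeedUrl("https://www.example.com/feed.xml")
        .setUpdated("2023-12-13T12:00:00")
        .addItems(item)
        .build();
```
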
@@ -4,15 +4,12 @@ package nu.marginalia.api.livecapture;
option java_package="nu.marginalia.api.livecapture";
option java_multiple_files=true;

service LiveCaptureApi {
    rpc requestScreengrab(RpcDomainId) returns (Empty) {}
}

message Void {
}

message RpcDomainId {
    int32 domainId = 1;
}

service LiveCaptureApi {
    rpc requestScreengrab(RpcDomainId) returns (Empty) {}
}

message Empty {}

@@ -21,6 +21,12 @@ dependencies {
    implementation project(':code:common:model')
    implementation project(':code:common:db')

    implementation project(':code:execution:api')

    implementation libs.jsoup
    implementation libs.rssreader
    implementation libs.opencsv
    implementation libs.sqlite
    implementation libs.bundles.slf4j
    implementation libs.commons.lang3

@@ -0,0 +1,144 @@
package nu.marginalia.rss.db;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.WmsaHome;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.rss.model.FeedDefinition;
import nu.marginalia.rss.model.FeedItems;
import nu.marginalia.service.module.ServiceConfiguration;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Optional;

@Singleton
public class FeedDb {
    private static final Logger logger = LoggerFactory.getLogger(FeedDb.class);

    private static final String dbFileName = "rss-feeds.db";

    private final Path readerDbPath;
    private volatile FeedDbReader reader;

    private final boolean feedDbEnabled;

    @Inject
    public FeedDb(ServiceConfiguration serviceConfiguration) {
        feedDbEnabled = serviceConfiguration.node() <= 1;
        readerDbPath = WmsaHome.getDataPath().resolve(dbFileName);

        if (!feedDbEnabled) {
            logger.info("Feed database is disabled on this node");
        }
        else {
            try {
                reader = new FeedDbReader(readerDbPath);
            } catch (Exception e) {
                reader = null;
            }
        }
    }

    public boolean isEnabled() {
        return feedDbEnabled;
    }

    public List<FeedDefinition> getAllFeeds() {
        if (!feedDbEnabled) {
            throw new IllegalStateException("Feed database is disabled on this node");
        }

        // Capture the current reader to avoid concurrency issues
        FeedDbReader reader = this.reader;

        try {
            if (reader != null) {
                return reader.getAllFeeds();
            }
        }
        catch (Exception e) {
            logger.error("Error getting all feeds", e);
        }
        return List.of();
    }

    @NotNull
    public FeedItems getFeed(EdgeDomain domain) {
        if (!feedDbEnabled) {
            throw new IllegalStateException("Feed database is disabled on this node");
        }

        // Capture the current reader to avoid concurrency issues
        FeedDbReader reader = this.reader;
        try {
            if (reader != null) {
                return reader.getFeed(domain);
            }
        }
        catch (Exception e) {
            logger.error("Error getting feed for " + domain, e);
        }
        return FeedItems.none();
    }

    public Optional<String> getFeedAsJson(String domain) {
        if (!feedDbEnabled) {
            throw new IllegalStateException("Feed database is disabled on this node");
        }

        // Capture the current reader to avoid concurrency issues
        FeedDbReader reader = this.reader;

        try {
            if (reader != null) {
                return reader.getFeedAsJson(domain);
            }
        }
        catch (Exception e) {
            logger.error("Error getting feed for " + domain, e);
        }
        return Optional.empty();
    }

    public FeedDbWriter createWriter() {
        if (!feedDbEnabled) {
            throw new IllegalStateException("Feed database is disabled on this node");
        }

        try {
            Path dbFile = Files.createTempFile(WmsaHome.getDataPath(), "rss-feeds", ".tmp.db");
            return new FeedDbWriter(dbFile);
        } catch (Exception e) {
            logger.error("Error creating new database writer", e);
            return null;
        }
    }

    public void switchDb(FeedDbWriter writer) {
        if (!feedDbEnabled) {
            throw new IllegalStateException("Feed database is disabled on this node");
        }

        try {
            logger.info("Switching to new feed database from " + writer.getDbPath() + " to " + readerDbPath);

            writer.close();
            reader.close();

            Files.move(writer.getDbPath(), readerDbPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);

            reader = new FeedDbReader(readerDbPath);
        } catch (Exception e) {
            logger.error("Fatal error switching to new feed database", e);

            reader = null;
        }
    }

}

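FeedDb implements a build-new-then-swap pattern: a writer fills a temporary SQLite file, and switchDb() atomically moves it over the live rss-feeds.db and reopens the reader. A sketch of the intended call sequence; fetchedFeeds is a placeholder, not from the commit:

```java
// Sketch of the rebuild-and-swap cycle; fetchedFeeds is assumed to exist.
try (FeedDbWriter writer = feedDb.createWriter()) {
    for (FeedItems items : fetchedFeeds) {
        writer.saveFeed(items);
    }
    feedDb.switchDb(writer); // closes writer + reader, then atomically moves the file
} catch (SQLException e) {
    // FeedDbWriter.close() is idempotent, so the implicit close here is harmless
}
```
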
@@ -0,0 +1,98 @@
package nu.marginalia.rss.db;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.rss.model.FeedDefinition;
import nu.marginalia.rss.model.FeedItems;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class FeedDbReader implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(FeedDbReader.class);
    private static final Gson gson = new GsonBuilder().create();
    private final Connection connection;

    public FeedDbReader(Path dbPath) throws SQLException {
        String dbUrl = "jdbc:sqlite:" + dbPath.toAbsolutePath();

        logger.info("Opening feed db at " + dbUrl);

        connection = DriverManager.getConnection(dbUrl);

        // Create table if it doesn't exist to avoid errors before any feeds have been fetched
        try (var stmt = connection.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
        }
    }

    public List<FeedDefinition> getAllFeeds() {
        List<FeedDefinition> ret = new ArrayList<>();

        try (var stmt = connection.createStatement()) {
            var rs = stmt.executeQuery("""
                select
                    json_extract(feed, '$.domain') as domain,
                    json_extract(feed, '$.feedUrl') as url
                from feed
                """);

            while (rs.next()) {
                ret.add(new FeedDefinition(rs.getString("domain"), rs.getString("url")));
            }

        } catch (SQLException e) {
            logger.error("Error getting all feeds", e);
        }

        return ret;
    }

    public Optional<String> getFeedAsJson(String domain) {
        try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) {
            stmt.setString(1, domain);

            var rs = stmt.executeQuery();
            if (rs.next()) {
                return Optional.of(rs.getString(1));
            }
        } catch (SQLException e) {
            logger.error("Error getting feed for " + domain, e);
        }
        return Optional.empty();
    }

    public FeedItems getFeed(EdgeDomain domain) {
        try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) {
            stmt.setString(1, domain.toString());
            var rs = stmt.executeQuery();

            if (rs.next()) {
                return deserialize(rs.getString(1));
            }
        } catch (SQLException e) {
            logger.error("Error getting feed for " + domain, e);
        }

        return FeedItems.none();
    }

    private FeedItems deserialize(String string) {
        return gson.fromJson(string, FeedItems.class);
    }

    @Override
    public void close() throws SQLException {
        connection.close();
    }

}

@@ -0,0 +1,75 @@
package nu.marginalia.rss.db;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import nu.marginalia.rss.model.FeedItems;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class FeedDbWriter implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(FeedDbWriter.class);

    private static final Gson gson = new GsonBuilder().create();

    private final Connection connection;
    private final PreparedStatement insertFeedStmt;
    private final Path dbPath;

    private volatile boolean closed = false;

    public FeedDbWriter(Path dbPath) throws SQLException {
        String dbUrl = "jdbc:sqlite:" + dbPath.toAbsolutePath();

        this.dbPath = dbPath;

        connection = DriverManager.getConnection(dbUrl);

        try (var stmt = connection.createStatement()) {
            // Disable synchronous writes for speed.  We don't care about recovery.
            //
            // If this operation fails we just retry the entire operation from scratch
            // with a new db file.

            stmt.execute("PRAGMA synchronous = OFF");

            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
        }

        insertFeedStmt = connection.prepareStatement("INSERT INTO feed (domain, feed) VALUES (?, ?)");
    }

    public Path getDbPath() {
        return dbPath;
    }

    public synchronized void saveFeed(FeedItems items) {
        try {
            insertFeedStmt.setString(1, items.domain().toLowerCase());
            insertFeedStmt.setString(2, serialize(items));
            insertFeedStmt.executeUpdate();
        }
        catch (SQLException e) {
            logger.error("Error saving feed for " + items.domain(), e);
        }
    }

    private String serialize(FeedItems items) {
        return gson.toJson(items);
    }

    @Override
    public void close() throws SQLException {
        if (!closed) {
            insertFeedStmt.close();
            connection.close();
            closed = true;
        }
    }
}

@@ -0,0 +1,3 @@
package nu.marginalia.rss.model;

public record FeedDefinition(String domain, String feedUrl) { }

@@ -0,0 +1,62 @@
package nu.marginalia.rss.model;

import com.apptasticsoftware.rssreader.Item;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jsoup.Jsoup;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;

public record FeedItem(String title,
                       String date,
                       String description,
                       String url) implements Comparable<FeedItem>
{
    public static final int MAX_DESC_LENGTH = 255;
    public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    public static FeedItem fromItem(Item item) {
        String title = item.getTitle().orElse("");
        String date = getItemDate(item);
        String description = getItemDescription(item);
        String url = item.getLink().orElse("");

        return new FeedItem(title, date, description, url);
    }

    private static String getItemDescription(Item item) {
        Optional<String> description = item.getDescription();
        if (description.isEmpty())
            return "";

        String rawDescription = description.get();
        if (rawDescription.indexOf('<') >= 0) {
            rawDescription = Jsoup.parseBodyFragment(rawDescription).text();
        }

        return StringUtils.truncate(rawDescription, MAX_DESC_LENGTH);
    }

    // e.g. http://fabiensanglard.net/rss.xml does dates like this:  1 Apr 2021 00:00:00 +0000
    private static final DateTimeFormatter extraFormatter = DateTimeFormatter.ofPattern("d MMM yyyy HH:mm:ss Z");
    private static String getItemDate(Item item) {
        Optional<ZonedDateTime> zonedDateTime = Optional.empty();
        try {
            zonedDateTime = item.getPubDateZonedDateTime();
        }
        catch (Exception e) {
            zonedDateTime = item.getPubDate()
                    .map(extraFormatter::parse)
                    .map(ZonedDateTime::from);
        }

        return zonedDateTime.map(date -> date.format(DATE_FORMAT)).orElse("");
    }

    @Override
    public int compareTo(@NotNull FeedItem o) {
        return o.date.compareTo(date);
    }
}

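The extraFormatter fallback above exists for feeds whose pubDate lacks a day-of-week, like the fabiensanglard.net example cited in the code comment. A standalone illustration of that two-stage parse; note the "Apr" month abbreviation means the default locale matters:

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Same pattern as extraFormatter; the input string is the one from the code comment.
// Assumes an English-style default locale for the month abbreviation.
DateTimeFormatter fallback = DateTimeFormatter.ofPattern("d MMM yyyy HH:mm:ss Z");
ZonedDateTime zdt = ZonedDateTime.from(fallback.parse("1 Apr 2021 00:00:00 +0000"));
// zdt is 2021-04-01T00:00Z, which is then rendered through DATE_FORMAT
```
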
@@ -0,0 +1,33 @@
package nu.marginalia.rss.model;

import java.util.List;
import java.util.Optional;

public record FeedItems(String domain,
                        String feedUrl,
                        String updated,
                        List<FeedItem> items) {

    // List.of() is immutable, so we can use the same instance for all empty FeedItems
    private static final FeedItems EMPTY = new FeedItems("", "","1970-01-01T00:00:00.000+00000", List.of());
    public static FeedItems none() {
        return EMPTY;
    }

    public boolean isEmpty() {
        return items.isEmpty();
    }

    public Optional<FeedItem> getLatest() {
        if (items.isEmpty())
            return Optional.empty();

        return Optional.of(
                items.getFirst()
        );
    }

    public Optional<String> getLatestDate() {
        return getLatest().map(FeedItem::date);
    }
}

@@ -0,0 +1,205 @@
package nu.marginalia.rss.svc;

import com.apptasticsoftware.rssreader.RssReader;
import com.google.inject.Inject;
import com.opencsv.CSVReader;
import nu.marginalia.WmsaHome;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.rss.db.FeedDb;
import nu.marginalia.rss.model.FeedDefinition;
import nu.marginalia.rss.model.FeedItem;
import nu.marginalia.rss.model.FeedItems;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.http.HttpClient;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;

public class FeedFetcherService {
    private static final int MAX_FEED_ITEMS = 10;
    private static final Logger logger = LoggerFactory.getLogger(FeedFetcherService.class);

    private final RssReader rssReader = new RssReader(
            HttpClient.newBuilder().executor(Executors.newWorkStealingPool(16)).build()
    );

    private final FeedDb feedDb;
    private final FileStorageService fileStorageService;
    private final NodeConfigurationService nodeConfigurationService;
    private final ServiceHeartbeat serviceHeartbeat;
    private final ExecutorClient executorClient;

    private volatile boolean updating;

    @Inject
    public FeedFetcherService(FeedDb feedDb,
                              FileStorageService fileStorageService,
                              NodeConfigurationService nodeConfigurationService,
                              ServiceHeartbeat serviceHeartbeat,
                              ExecutorClient executorClient)
    {
        this.feedDb = feedDb;
        this.fileStorageService = fileStorageService;
        this.nodeConfigurationService = nodeConfigurationService;
        this.serviceHeartbeat = serviceHeartbeat;
        this.executorClient = executorClient;
    }

    public void updateFeeds() throws IOException {
        rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");

        if (updating) // Prevent concurrent updates
        {
            logger.error("Already updating feeds, refusing to start another update");
            return;
        }

        try (var writer = feedDb.createWriter()) {
            updating = true;
            var definitions = readDefinitions();

            logger.info("Found {} feed definitions", definitions.size());

            for (var feed: definitions) {
                var items = fetchFeed(feed);
                if (!items.isEmpty()) {
                    writer.saveFeed(items);
                }
            }

            feedDb.switchDb(writer);

        } catch (SQLException e) {
            logger.error("Error updating feeds", e);
        }
        finally {
            updating = false;
        }
    }

    public Collection<FeedDefinition> readDefinitions() throws IOException {
        Collection<FileStorage> storages = getLatestFeedStorages();
        List<FeedDefinition> feedDefinitionList = new ArrayList<>();

        // Download and parse feeds.csv.gz from each relevant storage
        int updated = 0;
        try (var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")) {
            for (var storage : storages) {
                var url = executorClient.remoteFileURL(storage, "feeds.csv.gz");

                heartbeat.progress("Fetch RSS/Atom feeds", 0, storages.size());

                try (var feedStream = new GZIPInputStream(url.openStream())) {
                    CSVReader reader = new CSVReader(new java.io.InputStreamReader(feedStream));

                    for (String[] row : reader) {
                        if (row.length < 3) {
                            continue;
                        }
                        var domain = row[0].trim();
                        var feedUrl = row[2].trim();

                        feedDefinitionList.add(new FeedDefinition(domain, feedUrl));
                    }

                    heartbeat.progress("Fetch RSS/Atom feeds", ++updated, storages.size());
                }
            }
        }

        return feedDefinitionList;
    }

    private Collection<FileStorage> getLatestFeedStorages() {
        // Find the newest feed storage for each node

        Map<Integer, FileStorage> newestStorageByNode = new HashMap<>();

        for (var node : nodeConfigurationService.getAll()) {
            int nodeId = node.node();

            for (var storage: fileStorageService.getEachFileStorage(nodeId, FileStorageType.EXPORT)) {
                if (!storage.description().startsWith("Feeds "))
                    continue;

                newestStorageByNode.compute(storage.node(), new KeepNewerFeedStorage(storage));
            }

        }

        return newestStorageByNode.values();
    }


    private static class KeepNewerFeedStorage implements BiFunction<Integer, FileStorage, FileStorage> {
        private final FileStorage newValue;

        private KeepNewerFeedStorage(FileStorage newValue) {
            this.newValue = newValue;
        }

        public FileStorage apply(Integer node, @Nullable FileStorage oldValue) {
            if (oldValue == null) return newValue;

            return newValue.createDateTime().isAfter(oldValue.createDateTime())
                    ? newValue
                    : oldValue;
        }
    }

    public FeedItems fetchFeed(FeedDefinition definition) {
        try {
            var items = rssReader.read(definition.feedUrl())
                    .map(FeedItem::fromItem)
                    .filter(new IsFeedItemDateValid())
                    .sorted()
                    .limit(MAX_FEED_ITEMS)
                    .toList();

            return new FeedItems(
                    definition.domain(),
                    definition.feedUrl(),
                    LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
                    items);

        } catch (Exception e) {
            logger.warn("Failed to read feed {}: {}", definition.feedUrl(), e.getMessage());

            logger.debug("Exception", e);
            return FeedItems.none();
        }
    }

    private static class IsFeedItemDateValid implements Predicate<FeedItem> {
        private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);

        public boolean test(FeedItem item) {
            var date = item.date();

            if (date.isBlank()) {
                return false;
            }

            if (date.compareTo(today) > 0) {
                return false;
            }

            return true;
        }
    }
}

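KeepNewerFeedStorage above is a keep-the-newest merge function for Map.compute; for illustration, the same selection could be written inline with Map.merge (an equivalent sketch, not the committed code):

```java
// Equivalent inline form of getLatestFeedStorages()'s merge step:
newestStorageByNode.merge(storage.node(), storage,
        (oldV, newV) -> newV.createDateTime().isAfter(oldV.createDateTime())
                ? newV
                : oldV);
```
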
@@ -0,0 +1,93 @@
package nu.marginalia.rss.svc;

import com.google.inject.Inject;
import io.grpc.stub.StreamObserver;
import nu.marginalia.api.feeds.Empty;
import nu.marginalia.api.feeds.FeedApiGrpc;
import nu.marginalia.api.feeds.RpcDomainId;
import nu.marginalia.api.feeds.RpcFeed;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.rss.db.FeedDb;
import nu.marginalia.rss.model.FeedItems;
import nu.marginalia.service.server.DiscoverableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;

public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService {
    private final FeedDb feedDb;
    private final DbDomainQueries domainQueries;
    private final FeedFetcherService feedFetcherService;

    private static final Logger logger = LoggerFactory.getLogger(FeedsGrpcService.class);

    @Inject
    public FeedsGrpcService(FeedDb feedDb,
                            DbDomainQueries domainQueries,
                            FeedFetcherService feedFetcherService) {
        this.feedDb = feedDb;
        this.domainQueries = domainQueries;
        this.feedFetcherService = feedFetcherService;
    }

    // Ensure that the service is only registered if it is enabled
    @Override
    public boolean shouldRegisterService() {
        return feedDb.isEnabled();
    }

    @Override
    public void updateFeeds(Empty request,
                            StreamObserver<Empty> responseObserver)
    {
        Thread.ofPlatform().start(() -> {
            try {
                feedFetcherService.updateFeeds();
            } catch (IOException e) {
                logger.error("Failed to update feeds", e);
            }
        });

        responseObserver.onNext(Empty.getDefaultInstance());
        responseObserver.onCompleted();
    }

    @Override
    public void getFeed(RpcDomainId request,
                        StreamObserver<RpcFeed> responseObserver)
    {
        if (!feedDb.isEnabled()) {
            responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
            return;
        }

        Optional<EdgeDomain> domainName = domainQueries.getDomain(request.getDomainId());
        if (domainName.isEmpty()) {
            responseObserver.onError(new IllegalArgumentException("Domain not found"));
            return;
        }

        FeedItems feedItems = feedDb.getFeed(domainName.get());

        RpcFeed.Builder retB = RpcFeed.newBuilder()
                .setDomainId(request.getDomainId())
                .setDomain(domainName.get().toString())
                .setFeedUrl(feedItems.feedUrl())
                .setUpdated(feedItems.updated());

        for (var item : feedItems.items()) {
            retB.addItemsBuilder()
                    .setTitle(item.title())
                    .setUrl(item.url())
                    .setDescription(item.description())
                    .setDate(item.date())
                    .build();
        }

        responseObserver.onNext(retB.build());
        responseObserver.onCompleted();
    }
}

@@ -53,7 +53,6 @@ dependencies {

    implementation project(':code:features-search:screenshots')
    implementation project(':code:features-search:random-websites')
    implementation project(':code:features-search:feedlot-client')

    implementation libs.bundles.slf4j

@@ -1,15 +1,10 @@
package nu.marginalia.search;

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import nu.marginalia.LanguageModels;
import nu.marginalia.WebsiteUrl;
import nu.marginalia.WmsaHome;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.renderer.config.HandlebarsConfigurator;
import nu.marginalia.feedlot.FeedlotClient;

import java.time.Duration;

public class SearchModule extends AbstractModule {

@@ -22,14 +17,4 @@ public class SearchModule extends AbstractModule {
                System.getProperty("search.websiteUrl", "https://search.marginalia.nu/")));
    }

    @Provides
    public FeedlotClient provideFeedlotClient() {
        return new FeedlotClient(
                System.getProperty("ext-svc-feedlot-bindAddress", "feedlot"),
                Integer.getInteger("ext-svc-feedlot-port", 80),
                GsonFactory.get(),
                Duration.ofMillis(250),
                Duration.ofMillis(100)
        );
    }
}

@@ -4,10 +4,11 @@ import com.google.inject.Inject;
import nu.marginalia.api.domains.DomainInfoClient;
import nu.marginalia.api.domains.model.DomainInformation;
import nu.marginalia.api.domains.model.SimilarDomain;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.api.feeds.RpcFeed;
import nu.marginalia.api.feeds.RpcFeedItem;
import nu.marginalia.api.livecapture.LiveCaptureClient;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.feedlot.FeedlotClient;
import nu.marginalia.feedlot.model.FeedItems;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.renderer.MustacheRenderer;
import nu.marginalia.renderer.RendererFactory;

@@ -37,7 +38,7 @@ public class SearchSiteInfoService {
    private final SearchFlagSiteService flagSiteService;
    private final DbDomainQueries domainQueries;
    private final MustacheRenderer<Object> renderer;
    private final FeedlotClient feedlotClient;
    private final FeedsClient feedsClient;
    private final LiveCaptureClient liveCaptureClient;
    private final ScreenshotService screenshotService;

@@ -47,7 +48,7 @@ public class SearchSiteInfoService {
                                 RendererFactory rendererFactory,
                                 SearchFlagSiteService flagSiteService,
                                 DbDomainQueries domainQueries,
                                 FeedlotClient feedlotClient,
                                 FeedsClient feedsClient,
                                 LiveCaptureClient liveCaptureClient,
                                 ScreenshotService screenshotService) throws IOException
    {

@@ -58,7 +59,7 @@ public class SearchSiteInfoService {

        this.renderer = rendererFactory.renderer("search/site-info/site-info");

        this.feedlotClient = feedlotClient;
        this.feedsClient = feedsClient;
        this.liveCaptureClient = liveCaptureClient;
        this.screenshotService = screenshotService;
    }

@@ -135,26 +136,29 @@ public class SearchSiteInfoService {
        final Future<DomainInformation> domainInfoFuture;
        final Future<List<SimilarDomain>> similarSetFuture;
        final Future<List<SimilarDomain>> linkingDomainsFuture;

        final CompletableFuture<RpcFeed> feedItemsFuture;
        String url = "https://" + domainName + "/";

        boolean hasScreenshot = screenshotService.hasScreenshot(domainId);

        var feedItemsFuture = feedlotClient.getFeedItems(domainName);

        if (domainId < 0) {
            domainInfoFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
            similarSetFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
            linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
            feedItemsFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
        }
        else if (!domainInfoClient.isAccepting()) {
            domainInfoFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
            similarSetFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
            linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
            feedItemsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
        }
        else {
            domainInfoFuture = domainInfoClient.domainInformation(domainId);
            similarSetFuture = domainInfoClient.similarDomains(domainId, 25);
            linkingDomainsFuture = domainInfoClient.linkedDomains(domainId, 25);
            feedItemsFuture = feedsClient.getFeed(domainId);
        }

        List<UrlDetails> sampleResults = searchOperator.doSiteSearch(domainName, domainId,5);

@@ -162,13 +166,6 @@ public class SearchSiteInfoService {
            url = sampleResults.getFirst().url.withPathAndParam("/", null).toString();
        }

        FeedItems feedItems = null;
        try {
            feedItems = feedItemsFuture.get();
        } catch (Exception e) {
            logger.debug("Failed to get feed items for {}: {}", domainName, e.getMessage());
        }

        var result = new SiteInfoWithContext(domainName,
                domainId,
                url,

@@ -176,7 +173,7 @@ public class SearchSiteInfoService {
                waitForFuture(domainInfoFuture, () -> createDummySiteInfo(domainName)),
                waitForFuture(similarSetFuture, List::of),
                waitForFuture(linkingDomainsFuture, List::of),
                feedItems,
                waitForFuture(feedItemsFuture.thenApply(FeedItems::new), () -> FeedItems.dummyValue(domainName)),
                sampleResults
        );

@@ -356,6 +353,43 @@ public class SearchSiteInfoService {
        }
    }

    public record FeedItem(String title, String date, String description, String url) {

        public FeedItem(RpcFeedItem rpcFeedItem) {
            this(rpcFeedItem.getTitle(),
                    rpcFeedItem.getDate(),
                    rpcFeedItem.getDescription(),
                    rpcFeedItem.getUrl());
        }

        public String pubDay() { // Extract the date from an ISO style date string
            if (date.length() > 10) {
                return date.substring(0, 10);
            }
            return date;
        }

        public String descriptionSafe() {
            return description
                    .replace("<", "&lt;")
                    .replace(">", "&gt;");
        }
    }

    public record FeedItems(String domain, String feedUrl, String updated, List<FeedItem> items) {

        public static FeedItems dummyValue(String domain) {
            return new FeedItems(domain, "", "", List.of());
        }

        public FeedItems(RpcFeed rpcFeedItems) {
            this(rpcFeedItems.getDomain(),
                    rpcFeedItems.getFeedUrl(),
                    rpcFeedItems.getUpdated(),
                    rpcFeedItems.getItemsList().stream().map(FeedItem::new).toList());
        }
    }

    public record ReportDomain(
            Map<String, Boolean> view,
            String domain,

@@ -8,6 +8,7 @@ import nu.marginalia.functions.domains.DomainInfoGrpcService;
import nu.marginalia.functions.math.MathGrpcService;
import nu.marginalia.livecapture.LiveCaptureGrpcService;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.rss.svc.FeedsGrpcService;
import nu.marginalia.screenshot.ScreenshotService;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.server.BaseServiceParams;

@@ -31,10 +32,15 @@ public class AssistantService extends Service {
                            ScreenshotService screenshotService,
                            DomainInfoGrpcService domainInfoGrpcService,
                            LiveCaptureGrpcService liveCaptureGrpcService,
                            FeedsGrpcService feedsGrpcService,
                            MathGrpcService mathGrpcService,
                            Suggestions suggestions)
    {
        super(params, ServicePartition.any(), List.of(domainInfoGrpcService, mathGrpcService, liveCaptureGrpcService));
        super(params, ServicePartition.any(),
                List.of(domainInfoGrpcService,
                        mathGrpcService,
                        liveCaptureGrpcService,
                        feedsGrpcService));

        this.suggestions = suggestions;

@@ -56,7 +56,6 @@ include 'code:libraries:message-queue'

include 'code:features-search:screenshots'
include 'code:features-search:random-websites'
include 'code:features-search:feedlot-client'

include 'code:processes:converting-process:ft-anchor-keywords'
include 'code:execution:data-extractors'

@@ -155,6 +154,8 @@ dependencyResolutionManagement {
        library('prometheus-server', 'io.prometheus', 'simpleclient_httpserver').version('0.16.0')
        library('prometheus-hotspot', 'io.prometheus', 'simpleclient_hotspot').version('0.16.0')

        library('rssreader', 'com.apptasticsoftware', 'rssreader').version('3.5.0')

        library('slf4j.api', 'org.slf4j', 'slf4j-api').version('1.7.36')
        library('slf4j.jdk14', 'org.slf4j', 'slf4j-jdk14').version('2.0.3')