Merge pull request #122 from MarginaliaSearch/fetch-rss-feeds

Automatic RSS feed polling
This commit is contained in:
Viktor 2024-11-10 18:35:28 +01:00 committed by GitHub
commit a31a3b53c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 1565 additions and 199 deletions

View File

@ -11,4 +11,14 @@ public class FakeServiceHeartbeat implements ServiceHeartbeat {
public void close() {}
};
}
@Override
public ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName) {
return new ServiceAdHocTaskHeartbeat() {
@Override
public void progress(String step, int stepProgress, int stepCount) {}
@Override
public void close() {}
};
}
}

View File

@ -0,0 +1,7 @@
package nu.marginalia.service.control;
public interface ServiceAdHocTaskHeartbeat extends AutoCloseable {
void progress(String step, int progress, int total);
void close();
}

View File

@ -0,0 +1,190 @@
package nu.marginalia.service.control;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/** This object sends a heartbeat to the database every few seconds,
* updating with the progress of a task within a service. Progress is tracked by providing
* enumerations corresponding to the steps in the task. It's important they're arranged in the same
* order as the steps in the task in order to get an accurate progress tracking.
*/
public class ServiceAdHocTaskHeartbeatImpl implements AutoCloseable, ServiceAdHocTaskHeartbeat {
private final Logger logger = LoggerFactory.getLogger(ServiceAdHocTaskHeartbeatImpl.class);
private final String taskName;
private final String taskBase;
private final int node;
private final String instanceUUID;
private final HikariDataSource dataSource;
private final Thread runnerThread;
private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 1);
private final String serviceInstanceUUID;
private int progress;
private volatile boolean running = false;
private volatile String step = "-";
ServiceAdHocTaskHeartbeatImpl(ServiceConfiguration configuration,
String taskName,
HikariDataSource dataSource)
{
this.taskName = configuration.serviceName() + "." + taskName + ":" + configuration.node();
this.taskBase = configuration.serviceName() + "." + taskName;
this.node = configuration.node();
this.dataSource = dataSource;
this.instanceUUID = UUID.randomUUID().toString();
this.serviceInstanceUUID = configuration.instanceUuid().toString();
heartbeatInit();
runnerThread = new Thread(this::run);
runnerThread.start();
}
/** Update the progress of the task. This is a fast function that doesn't block;
* the actual update is done in a separate thread.
*
* @param step The current step in the task.
*/
@Override
public void progress(String step, int stepProgress, int stepCount) {
this.step = step;
// off by one since we calculate the progress based on the number of steps,
// and Enum.ordinal() is zero-based (so the 5th step in a 5 step task is 4, not 5; resulting in the
// final progress being 80% and not 100%)
this.progress = (int) Math.round(100. * stepProgress / (double) stepCount);
logger.info("ServiceTask {} progress: {}%", taskBase, progress);
}
public void shutDown() {
if (!running)
return;
running = false;
try {
runnerThread.join();
heartbeatStop();
}
catch (InterruptedException|SQLException ex) {
logger.warn("ServiceHeartbeat shutdown failed", ex);
}
}
private void run() {
if (!running)
running = true;
else
return;
try {
while (running) {
try {
heartbeatUpdate();
}
catch (SQLException ex) {
logger.warn("ServiceHeartbeat failed to update", ex);
}
TimeUnit.SECONDS.sleep(heartbeatInterval);
}
}
catch (InterruptedException ex) {
logger.error("ServiceHeartbeat caught irrecoverable exception, killing service", ex);
System.exit(255);
}
}
private void heartbeatInit() {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
INSERT INTO TASK_HEARTBEAT (TASK_NAME, TASK_BASE, NODE, INSTANCE, SERVICE_INSTANCE, HEARTBEAT_TIME, STATUS)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP(6), 'STARTING')
ON DUPLICATE KEY UPDATE
INSTANCE = ?,
SERVICE_INSTANCE = ?,
HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS = 'STARTING'
"""
))
{
stmt.setString(1, taskName);
stmt.setString(2, taskBase);
stmt.setInt(3, node);
stmt.setString(4, instanceUUID);
stmt.setString(5, serviceInstanceUUID);
stmt.setString(6, instanceUUID);
stmt.setString(7, serviceInstanceUUID);
stmt.executeUpdate();
}
}
catch (SQLException ex) {
logger.error("ServiceHeartbeat failed to initialize", ex);
throw new RuntimeException(ex);
}
}
private void heartbeatUpdate() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE TASK_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS = 'RUNNING',
PROGRESS = ?,
STAGE_NAME = ?
WHERE INSTANCE = ?
""")
)
{
stmt.setInt(1, progress);
stmt.setString(2, step);
stmt.setString(3, instanceUUID);
stmt.executeUpdate();
}
}
}
private void heartbeatStop() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE TASK_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS='STOPPED',
PROGRESS = ?,
STAGE_NAME = ?
WHERE INSTANCE = ?
""")
)
{
stmt.setInt(1, progress);
stmt.setString( 2, step);
stmt.setString( 3, instanceUUID);
stmt.executeUpdate();
}
}
}
@Override
public void close() {
shutDown();
}
}

View File

@ -5,4 +5,5 @@ import com.google.inject.ImplementedBy;
@ImplementedBy(ServiceHeartbeatImpl.class)
public interface ServiceHeartbeat {
<T extends Enum<T>> ServiceTaskHeartbeat<T> createServiceTaskHeartbeat(Class<T> steps, String processName);
ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName);
}

View File

@ -54,6 +54,11 @@ public class ServiceHeartbeatImpl implements ServiceHeartbeat {
return new ServiceTaskHeartbeatImpl<>(steps, configuration, processName, eventLog, dataSource);
}
@Override
public ServiceAdHocTaskHeartbeat createServiceAdHocTaskHeartbeat(String taskName) {
return new ServiceAdHocTaskHeartbeatImpl(configuration, taskName, dataSource);
}
public void start() {
if (!running) {

View File

@ -9,23 +9,25 @@ import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.executor.upload.UploadDirContents;
import nu.marginalia.executor.upload.UploadDirItem;
import nu.marginalia.functions.execution.api.*;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.ServiceId;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.*;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.*;
import static nu.marginalia.functions.execution.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
@Singleton
public class ExecutorClient {
@ -163,8 +165,8 @@ public class ExecutorClient {
* The endpoint is compatible with range requests.
* */
public URL remoteFileURL(FileStorage fileStorage, String path) {
String uriPath = STR."/transfer/file/\{fileStorage.id()}";
String uriQuery = STR."path=\{URLEncoder.encode(path, StandardCharsets.UTF_8)}";
String uriPath = "/transfer/file/" + fileStorage.id();
String uriQuery = "path=" + URLEncoder.encode(path, StandardCharsets.UTF_8);
var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node()));
if (endpoints.isEmpty()) {

View File

@ -35,6 +35,7 @@ dependencies {
implementation project(':code:libraries:term-frequency-dict')
implementation project(':code:functions:link-graph:api')
implementation project(':code:functions:live-capture:api')
implementation project(':code:functions:search-query')
implementation project(':code:execution:api')

View File

@ -22,7 +22,8 @@ public enum ExecutorActor {
RESTORE_BACKUP,
EXPORT_SAMPLE_DATA,
DOWNLOAD_SAMPLE,
SCRAPE_FEEDS;
SCRAPE_FEEDS,
UPDATE_RSS;
public String id() {
return "fsm:" + name().toLowerCase();

View File

@ -50,7 +50,8 @@ public class ExecutorActorControlService {
ExportSegmentationModelActor exportSegmentationModelActor,
DownloadSampleActor downloadSampleActor,
ScrapeFeedsActor scrapeFeedsActor,
ExecutorActorStateMachines stateMachines) {
ExecutorActorStateMachines stateMachines,
UpdateRssActor updateRssActor) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.stateMachines = stateMachines;
@ -83,6 +84,7 @@ public class ExecutorActorControlService {
register(ExecutorActor.DOWNLOAD_SAMPLE, downloadSampleActor);
register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor);
register(ExecutorActor.UPDATE_RSS, updateRssActor);
}
private void register(ExecutorActor process, RecordActorPrototype graph) {

View File

@ -0,0 +1,134 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.api.feeds.RpcFeedUpdateMode;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.module.ServiceConfiguration;
import java.time.Duration;
import java.time.LocalDateTime;
public class UpdateRssActor extends RecordActorPrototype {
private final FeedsClient feedsClient;
private final int nodeId;
private final Duration initialDelay = Duration.ofMinutes(5);
private final Duration updateInterval = Duration.ofHours(24);
private final int cleanInterval = 60;
private final MqPersistence persistence;
@Inject
public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration, MqPersistence persistence) {
super(gson);
this.feedsClient = feedsClient;
this.nodeId = serviceConfiguration.node();
this.persistence = persistence;
}
public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Wait(String ts, int refreshCount) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record UpdateRefresh(int refreshCount, long msgId) implements ActorStep {
public UpdateRefresh(int refreshCount) {
this(refreshCount, -1);
}
}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record UpdateClean(long msgId) implements ActorStep {
public UpdateClean() {
this(-1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial() -> {
if (nodeId > 1) {
// Only run on the first node
yield new End();
}
else {
// Wait for 5 minutes before starting the first update, to give the system time to start up properly
yield new Wait(LocalDateTime.now().plus(initialDelay).toString(), 0);
}
}
case Wait(String untilTs, int count) -> {
var until = LocalDateTime.parse(untilTs);
var now = LocalDateTime.now();
long remaining = Duration.between(now, until).toMillis();
if (remaining > 0) {
Thread.sleep(remaining);
yield new Wait(untilTs, count);
}
else {
// Once every `cleanInterval` updates, do a clean update;
// otherwise do a refresh update
if (count > cleanInterval) {
yield new UpdateClean();
}
else {
yield new UpdateRefresh(count);
}
}
}
case UpdateRefresh(int count, long msgId) when msgId < 0 -> {
long messageId = feedsClient.updateFeeds(RpcFeedUpdateMode.REFRESH);
yield new UpdateRefresh(count, messageId);
}
case UpdateRefresh(int count, long msgId) -> {
MqMessage msg = persistence.waitForMessageTerminalState(msgId, Duration.ofSeconds(10), Duration.ofHours(12));
if (msg == null) {
// Retry the update
yield new Error("Failed to update feeds: message not found");
} else if (msg.state() != MqMessageState.OK) {
// Retry the update
yield new Error("Failed to update feeds: " + msg.state());
}
else {
// Increment the refresh count
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), count + 1);
}
}
case UpdateClean(long msgId) when msgId < 0 -> {
long messageId = feedsClient.updateFeeds(RpcFeedUpdateMode.CLEAN);
yield new UpdateClean(messageId);
}
case UpdateClean(long msgId) -> {
MqMessage msg = persistence.waitForMessageTerminalState(msgId, Duration.ofSeconds(10), Duration.ofHours(12));
if (msg == null) {
// Retry the update
yield new Error("Failed to update feeds: message not found");
} else if (msg.state() != MqMessageState.OK) {
// Retry the update
yield new Error("Failed to update feeds: " + msg.state());
}
else {
// Reset the refresh count after a successful update
yield new Wait(LocalDateTime.now().plus(updateInterval).toString(), 0);
}
}
default -> new Error("Unknown actor step: " + self);
};
}
@Override
public String describe() {
return "Periodically updates RSS and Atom feeds";
}
}

View File

@ -1,24 +0,0 @@
plugins {
id 'java'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation libs.bundles.slf4j
implementation libs.notnull
implementation libs.gson
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

View File

@ -1,58 +0,0 @@
package nu.marginalia.feedlot;
import com.google.gson.Gson;
import nu.marginalia.feedlot.model.FeedItems;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
public class FeedlotClient {
private final String feedlotHost;
private final int feedlotPort;
private final Gson gson;
private final HttpClient httpClient;
private final Duration requestTimeout;
public FeedlotClient(String feedlotHost,
int feedlotPort,
Gson gson,
Duration connectTimeout,
Duration requestTimeout
)
{
this.feedlotHost = feedlotHost;
this.feedlotPort = feedlotPort;
this.gson = gson;
httpClient = HttpClient.newBuilder()
.executor(Executors.newCachedThreadPool())
.connectTimeout(connectTimeout)
.build();
this.requestTimeout = requestTimeout;
}
public CompletableFuture<FeedItems> getFeedItems(String domainName) {
return httpClient.sendAsync(
HttpRequest.newBuilder()
.uri(URI.create("http://%s:%d/feed/%s".formatted(feedlotHost, feedlotPort, domainName)))
.GET()
.timeout(requestTimeout)
.build(),
HttpResponse.BodyHandlers.ofString()
).thenApply(HttpResponse::body)
.thenApply(this::parseFeedItems);
}
private FeedItems parseFeedItems(String s) {
return gson.fromJson(s, FeedItems.class);
}
public void stop() {
httpClient.close();
}
}

View File

@ -1,17 +0,0 @@
package nu.marginalia.feedlot.model;
public record FeedItem(String title, String date, String description, String url) {
public String pubDay() { // Extract the date from an ISO style date string
if (date.length() > 10) {
return date.substring(0, 10);
}
return date;
}
public String descriptionSafe() {
return description
.replace("<", "&lt;")
.replace(">", "&gt;");
}
}

View File

@ -1,6 +0,0 @@
package nu.marginalia.feedlot.model;
import java.util.List;
public record FeedItems(String domain, String feedUrl, String updated, List<FeedItem> items) {
}

View File

@ -1,20 +0,0 @@
Client for [FeedlotTheFeedBot](https://github.com/MarginaliaSearch/FeedLotTheFeedBot),
the RSS/Atom feed fetcher and cache for Marginalia Search.
This service is external to the Marginalia Search codebase,
as it is not a core part of the search engine and has other
utilities.
## Example
```java
import java.time.Duration;
var client = new FeedlotClient("localhost", 8080,
gson,
Duration.ofMillis(100), // connect timeout
Duration.ofMillis(100)); // request timeout
CompleteableFuture<FeedItems> items = client.getFeedItems("www.marginalia.nu");
```

View File

@ -20,6 +20,7 @@ dependencies {
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation project(':code:libraries:message-queue')
implementation libs.bundles.slf4j

View File

@ -0,0 +1,65 @@
package nu.marginalia.api.feeds;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcSingleNodeChannelPool;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.module.ServiceConfiguration;
import javax.annotation.CheckReturnValue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Singleton
public class FeedsClient {
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final GrpcSingleNodeChannelPool<FeedApiGrpc.FeedApiBlockingStub> channelPool;
private final MqOutbox updateFeedsOutbox;
@Inject
public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence, ServiceConfiguration serviceConfiguration) {
// The client is only interested in the primary node
var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any());
this.channelPool = factory.createSingle(key, FeedApiGrpc::newBlockingStub);
this.updateFeedsOutbox = new MqOutbox(mqPersistence,
"update-rss-feeds", 0,
serviceConfiguration.serviceName(), serviceConfiguration.node(),
UUID.randomUUID());
}
public CompletableFuture<RpcFeed> getFeed(int domainId) {
try {
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeed)
.async(executorService)
.run(RpcDomainId.newBuilder().setDomainId(domainId).build());
}
catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
/** Update the feeds, return a message ID for the update */
@CheckReturnValue
public long updateFeeds(RpcFeedUpdateMode mode) throws Exception {
// Create a message for the {@link MqLongRunningTask} paradigm to use for tracking the task
long msgId = updateFeedsOutbox.sendAsync("updateFeeds", "");
channelPool.call(FeedApiGrpc.FeedApiBlockingStub::updateFeeds)
.run(RpcUpdateRequest.newBuilder()
.setMode(mode)
.setMsgId(msgId)
.build()
);
return msgId;
}
}

View File

@ -0,0 +1,43 @@
syntax="proto3";
package nu.marginalia.api.feeds;
option java_package="nu.marginalia.api.feeds";
option java_multiple_files=true;
service FeedApi {
rpc getFeed(RpcDomainId) returns (RpcFeed) {}
rpc updateFeeds(RpcUpdateRequest) returns (Empty) {}
}
message RpcDomainId {
int32 domainId = 1;
}
message RpcUpdateRequest {
RpcFeedUpdateMode mode = 1;
int64 msgId = 2; // Id for a message on the message queue, will be replied to with a dummy response when the task is done,
// if the message id is not positive, no response will be attempted to be sent.
}
enum RpcFeedUpdateMode {
CLEAN = 0; // Start over with a new database from system rss exports
REFRESH = 1; // Refresh known feeds
}
message RpcFeed {
int32 domainId = 1;
string domain = 2;
string feedUrl = 3;
string updated = 4;
repeated RpcFeedItem items = 5;
}
message RpcFeedItem {
string title = 1;
string date = 2;
string description = 3;
string url = 4;
}
message Empty {}

View File

@ -4,15 +4,12 @@ package nu.marginalia.api.livecapture;
option java_package="nu.marginalia.api.livecapture";
option java_multiple_files=true;
service LiveCaptureApi {
rpc requestScreengrab(RpcDomainId) returns (Empty) {}
}
message Void {
}
message RpcDomainId {
int32 domainId = 1;
}
service LiveCaptureApi {
rpc requestScreengrab(RpcDomainId) returns (Empty) {}
}
message Empty {}

View File

@ -20,7 +20,15 @@ dependencies {
implementation project(':code:common:service')
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:libraries:message-queue')
implementation project(':code:execution:api')
implementation libs.jsoup
implementation libs.rssreader
implementation libs.opencsv
implementation libs.sqlite
implementation libs.bundles.slf4j
implementation libs.commons.lang3

View File

@ -0,0 +1,144 @@
package nu.marginalia.rss.db;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.WmsaHome;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.rss.model.FeedDefinition;
import nu.marginalia.rss.model.FeedItems;
import nu.marginalia.service.module.ServiceConfiguration;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Optional;
@Singleton
public class FeedDb {
private static final Logger logger = LoggerFactory.getLogger(FeedDb.class);
private static final String dbFileName = "rss-feeds.db";
private final Path readerDbPath;
private volatile FeedDbReader reader;
private final boolean feedDbEnabled;
@Inject
public FeedDb(ServiceConfiguration serviceConfiguration) {
feedDbEnabled = serviceConfiguration.node() <= 1;
readerDbPath = WmsaHome.getDataPath().resolve(dbFileName);
if (!feedDbEnabled) {
logger.info("Feed database is disabled on this node");
}
else {
try {
reader = new FeedDbReader(readerDbPath);
} catch (Exception e) {
reader = null;
}
}
}
public boolean isEnabled() {
return feedDbEnabled;
}
public List<FeedDefinition> getAllFeeds() {
if (!feedDbEnabled) {
throw new IllegalStateException("Feed database is disabled on this node");
}
// Capture the current reader to avoid concurrency issues
FeedDbReader reader = this.reader;
try {
if (reader != null) {
return reader.getAllFeeds();
}
}
catch (Exception e) {
logger.error("Error getting all feeds", e);
}
return List.of();
}
@NotNull
public FeedItems getFeed(EdgeDomain domain) {
if (!feedDbEnabled) {
throw new IllegalStateException("Feed database is disabled on this node");
}
// Capture the current reader to avoid concurrency issues
FeedDbReader reader = this.reader;
try {
if (reader != null) {
return reader.getFeed(domain);
}
}
catch (Exception e) {
logger.error("Error getting feed for " + domain, e);
}
return FeedItems.none();
}
public Optional<String> getFeedAsJson(String domain) {
if (!feedDbEnabled) {
throw new IllegalStateException("Feed database is disabled on this node");
}
// Capture the current reader to avoid concurrency issues
FeedDbReader reader = this.reader;
try {
if (reader != null) {
return reader.getFeedAsJson(domain);
}
}
catch (Exception e) {
logger.error("Error getting feed for " + domain, e);
}
return Optional.empty();
}
public FeedDbWriter createWriter() {
if (!feedDbEnabled) {
throw new IllegalStateException("Feed database is disabled on this node");
}
try {
Path dbFile = Files.createTempFile(WmsaHome.getDataPath(), "rss-feeds", ".tmp.db");
return new FeedDbWriter(dbFile);
} catch (Exception e) {
logger.error("Error creating new database writer", e);
return null;
}
}
public void switchDb(FeedDbWriter writer) {
if (!feedDbEnabled) {
throw new IllegalStateException("Feed database is disabled on this node");
}
try {
logger.info("Switching to new feed database from " + writer.getDbPath() + " to " + readerDbPath);
writer.close();
reader.close();
Files.move(writer.getDbPath(), readerDbPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
reader = new FeedDbReader(readerDbPath);
} catch (Exception e) {
logger.error("Fatal error switching to new feed database", e);
reader = null;
}
}
}

View File

@ -0,0 +1,102 @@
package nu.marginalia.rss.db;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.rss.model.FeedDefinition;
import nu.marginalia.rss.model.FeedItems;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class FeedDbReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(FeedDbReader.class);
private static final Gson gson = new GsonBuilder().create();
private final Connection connection;
public FeedDbReader(Path dbPath) throws SQLException {
String dbUrl = "jdbc:sqlite:" + dbPath.toAbsolutePath();
logger.info("Opening feed db at " + dbUrl);
connection = DriverManager.getConnection(dbUrl);
// Create table if it doesn't exist to avoid errors before any feeds have been fetched
try (var stmt = connection.createStatement()) {
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
}
}
public List<FeedDefinition> getAllFeeds() {
List<FeedDefinition> ret = new ArrayList<>();
try (var stmt = connection.createStatement()) {
var rs = stmt.executeQuery("""
select
json_extract(feed, '$.domain') as domain,
json_extract(feed, '$.feedUrl') as url,
json_extract(feed, '$.updated') as updated
from feed
""");
while (rs.next()) {
ret.add(new FeedDefinition(
rs.getString("domain"),
rs.getString("url"),
rs.getString("updated")));
}
} catch (SQLException e) {
logger.error("Error getting all feeds", e);
}
return ret;
}
public Optional<String> getFeedAsJson(String domain) {
try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) {
stmt.setString(1, domain);
var rs = stmt.executeQuery();
if (rs.next()) {
return Optional.of(rs.getString(1));
}
} catch (SQLException e) {
logger.error("Error getting feed for " + domain, e);
}
return Optional.empty();
}
public FeedItems getFeed(EdgeDomain domain) {
try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) {
stmt.setString(1, domain.toString());
var rs = stmt.executeQuery();
if (rs.next()) {
return deserialize(rs.getString(1));
}
} catch (SQLException e) {
logger.error("Error getting feed for " + domain, e);
}
return FeedItems.none();
}
private FeedItems deserialize(String string) {
return gson.fromJson(string, FeedItems.class);
}
@Override
public void close() throws SQLException {
connection.close();
}
}

View File

@ -0,0 +1,68 @@
package nu.marginalia.rss.db;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import nu.marginalia.rss.model.FeedItems;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class FeedDbWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(FeedDbWriter.class);
private static final Gson gson = new GsonBuilder().create();
private final Connection connection;
private final PreparedStatement insertFeedStmt;
private final Path dbPath;
private volatile boolean closed = false;
public FeedDbWriter(Path dbPath) throws SQLException {
String dbUrl = "jdbc:sqlite:" + dbPath.toAbsolutePath();
this.dbPath = dbPath;
connection = DriverManager.getConnection(dbUrl);
try (var stmt = connection.createStatement()) {
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
}
insertFeedStmt = connection.prepareStatement("INSERT INTO feed (domain, feed) VALUES (?, ?)");
}
public Path getDbPath() {
return dbPath;
}
public synchronized void saveFeed(FeedItems items) {
try {
insertFeedStmt.setString(1, items.domain().toLowerCase());
insertFeedStmt.setString(2, serialize(items));
insertFeedStmt.executeUpdate();
}
catch (SQLException e) {
logger.error("Error saving feed for " + items.domain(), e);
}
}
private String serialize(FeedItems items) {
return gson.toJson(items);
}
@Override
public void close() throws SQLException {
if (!closed) {
insertFeedStmt.close();
connection.close();
closed = true;
}
}
}

View File

@ -0,0 +1,28 @@
package nu.marginalia.rss.model;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.ZonedDateTime;
public record FeedDefinition(
String domain,
String feedUrl,
@Nullable String updated)
{
private static final Duration defaultDuration = Duration.ofDays(30);
public Duration durationSinceUpdated() {
if (updated == null) { // Default to 30 days if no update time is available
return defaultDuration;
}
try {
return Duration.between(ZonedDateTime.parse(updated), ZonedDateTime.now());
}
catch (Exception e) {
return defaultDuration;
}
}
}

View File

@ -0,0 +1,62 @@
package nu.marginalia.rss.model;
import com.apptasticsoftware.rssreader.Item;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jsoup.Jsoup;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
public record FeedItem(String title,
String date,
String description,
String url) implements Comparable<FeedItem>
{
public static final int MAX_DESC_LENGTH = 255;
public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
public static FeedItem fromItem(Item item) {
String title = item.getTitle().orElse("");
String date = getItemDate(item);
String description = getItemDescription(item);
String url = item.getLink().orElse("");
return new FeedItem(title, date, description, url);
}
private static String getItemDescription(Item item) {
Optional<String> description = item.getDescription();
if (description.isEmpty())
return "";
String rawDescription = description.get();
if (rawDescription.indexOf('<') >= 0) {
rawDescription = Jsoup.parseBodyFragment(rawDescription).text();
}
return StringUtils.truncate(rawDescription, MAX_DESC_LENGTH);
}
// e.g. http://fabiensanglard.net/rss.xml does dates like this: 1 Apr 2021 00:00:00 +0000
private static final DateTimeFormatter extraFormatter = DateTimeFormatter.ofPattern("d MMM yyyy HH:mm:ss Z");
private static String getItemDate(Item item) {
Optional<ZonedDateTime> zonedDateTime = Optional.empty();
try {
zonedDateTime = item.getPubDateZonedDateTime();
}
catch (Exception e) {
zonedDateTime = item.getPubDate()
.map(extraFormatter::parse)
.map(ZonedDateTime::from);
}
return zonedDateTime.map(date -> date.format(DATE_FORMAT)).orElse("");
}
@Override
public int compareTo(@NotNull FeedItem o) {
return o.date.compareTo(date);
}
}

View File

@ -0,0 +1,33 @@
package nu.marginalia.rss.model;
import java.util.List;
import java.util.Optional;
public record FeedItems(String domain,
String feedUrl,
String updated,
List<FeedItem> items) {
// List.of() is immutable, so we can use the same instance for all empty FeedItems
private static final FeedItems EMPTY = new FeedItems("", "","1970-01-01T00:00:00.000+00000", List.of());
public static FeedItems none() {
return EMPTY;
}
public boolean isEmpty() {
return items.isEmpty();
}
public Optional<FeedItem> getLatest() {
if (items.isEmpty())
return Optional.empty();
return Optional.of(
items.getFirst()
);
}
public Optional<String> getLatestDate() {
return getLatest().map(FeedItem::date);
}
}

View File

@ -0,0 +1,269 @@
package nu.marginalia.rss.svc;
import com.apptasticsoftware.rssreader.RssReader;
import com.google.inject.Inject;
import com.opencsv.CSVReader;
import nu.marginalia.WmsaHome;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.rss.db.FeedDb;
import nu.marginalia.rss.model.FeedDefinition;
import nu.marginalia.rss.model.FeedItem;
import nu.marginalia.rss.model.FeedItems;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageType;
import nu.marginalia.util.SimpleBlockingThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.http.HttpClient;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;
public class FeedFetcherService {
private static final int MAX_FEED_ITEMS = 10;
private static final Logger logger = LoggerFactory.getLogger(FeedFetcherService.class);
private final RssReader rssReader = new RssReader(
HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.executor(Executors.newCachedThreadPool())
.followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_2)
.build()
);
private final FeedDb feedDb;
private final FileStorageService fileStorageService;
private final NodeConfigurationService nodeConfigurationService;
private final ServiceHeartbeat serviceHeartbeat;
private final ExecutorClient executorClient;
private volatile boolean updating;
@Inject
public FeedFetcherService(FeedDb feedDb,
FileStorageService fileStorageService,
NodeConfigurationService nodeConfigurationService,
ServiceHeartbeat serviceHeartbeat,
ExecutorClient executorClient)
{
this.feedDb = feedDb;
this.fileStorageService = fileStorageService;
this.nodeConfigurationService = nodeConfigurationService;
this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient;
rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");
}
public enum UpdateMode {
CLEAN,
REFRESH
};
public void updateFeeds(UpdateMode updateMode) throws IOException {
if (updating) // Prevent concurrent updates
{
throw new IllegalStateException("Already updating feeds, refusing to start another update");
}
try (var writer = feedDb.createWriter();
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
) {
updating = true;
// Read the feed definitions from the database, if they are not available, read them from the system's
// RSS exports instead
Collection<FeedDefinition> definitions = feedDb.getAllFeeds();
// If we didn't get any definitions, or a clean update is requested, read the definitions from the system
// instead
if (definitions == null || updateMode == UpdateMode.CLEAN) {
definitions = readDefinitionsFromSystem();
}
logger.info("Found {} feed definitions", definitions.size());
final AtomicInteger definitionsUpdated = new AtomicInteger(0);
final int totalDefinitions = definitions.size();
SimpleBlockingThreadPool executor = new SimpleBlockingThreadPool("FeedFetcher", 64, 4);
for (var feed : definitions) {
executor.submitQuietly(() -> {
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
// If we have existing data, we might skip updating it with a probability that increases with time,
// this is to avoid hammering the feeds that are updated very rarely and save some time and resources
// on our end
if (!oldData.isEmpty()) {
Duration duration = feed.durationSinceUpdated();
long daysSinceUpdate = duration.toDays();
if (daysSinceUpdate > 2 && ThreadLocalRandom.current()
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)
{
// Skip updating this feed, just write the old data back instead
writer.saveFeed(oldData);
return;
}
}
var items = fetchFeed(feed);
if (!items.isEmpty()) {
writer.saveFeed(items);
}
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
// Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
}
});
}
executor.shutDown();
// Wait for the executor to finish, but give up after 60 minutes to avoid hanging indefinitely
for (int waitedMinutes = 0; waitedMinutes < 60; waitedMinutes++) {
if (executor.awaitTermination(1, TimeUnit.MINUTES)) break;
}
executor.shutDownNow();
// Wait for any in-progress writes to finish before switching the database
// in case we ended up murdering the writer with shutDownNow. It's a very
// slim chance but this increases the odds of a clean switch over.
TimeUnit.SECONDS.sleep(5);
feedDb.switchDb(writer);
} catch (SQLException|InterruptedException e) {
logger.error("Error updating feeds", e);
}
finally {
updating = false;
}
}
public Collection<FeedDefinition> readDefinitionsFromSystem() throws IOException {
Collection<FileStorage> storages = getLatestFeedStorages();
List<FeedDefinition> feedDefinitionList = new ArrayList<>();
for (var storage : storages) {
var url = executorClient.remoteFileURL(storage, "feeds.csv.gz");
try (var feedStream = new GZIPInputStream(url.openStream())) {
CSVReader reader = new CSVReader(new java.io.InputStreamReader(feedStream));
for (String[] row : reader) {
if (row.length < 3) {
continue;
}
var domain = row[0].trim();
var feedUrl = row[2].trim();
feedDefinitionList.add(new FeedDefinition(domain, feedUrl, null));
}
}
}
return feedDefinitionList;
}
private Collection<FileStorage> getLatestFeedStorages() {
// Find the newest feed storage for each node
Map<Integer, FileStorage> newestStorageByNode = new HashMap<>();
for (var node : nodeConfigurationService.getAll()) {
int nodeId = node.node();
for (var storage: fileStorageService.getEachFileStorage(nodeId, FileStorageType.EXPORT)) {
if (!storage.description().startsWith("Feeds "))
continue;
newestStorageByNode.compute(storage.node(), new KeepNewerFeedStorage(storage));
}
}
return newestStorageByNode.values();
}
private static class KeepNewerFeedStorage implements BiFunction<Integer, FileStorage, FileStorage> {
private final FileStorage newValue;
private KeepNewerFeedStorage(FileStorage newValue) {
this.newValue = newValue;
}
public FileStorage apply(Integer node, @Nullable FileStorage oldValue) {
if (oldValue == null) return newValue;
return newValue.createDateTime().isAfter(oldValue.createDateTime())
? newValue
: oldValue;
}
}
public FeedItems fetchFeed(FeedDefinition definition) {
try {
var items = rssReader.read(definition.feedUrl())
.map(FeedItem::fromItem)
.filter(new IsFeedItemDateValid())
.sorted()
.limit(MAX_FEED_ITEMS)
.toList();
return new FeedItems(
definition.domain(),
definition.feedUrl(),
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
items);
} catch (Exception e) {
logger.debug("Exception", e);
return FeedItems.none();
}
}
private static class IsFeedItemDateValid implements Predicate<FeedItem> {
private final String today = ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
public boolean test(FeedItem item) {
var date = item.date();
if (date.isBlank()) {
return false;
}
if (date.compareTo(today) > 0) {
return false;
}
return true;
}
}
}

View File

@ -0,0 +1,102 @@
package nu.marginalia.rss.svc;
import com.google.inject.Inject;
import io.grpc.stub.StreamObserver;
import nu.marginalia.api.feeds.*;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mq.task.MqLongRunningTask;
import nu.marginalia.mq.task.MqTaskResult;
import nu.marginalia.rss.db.FeedDb;
import nu.marginalia.rss.model.FeedItems;
import nu.marginalia.service.server.DiscoverableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService {
private final FeedDb feedDb;
private final DbDomainQueries domainQueries;
private final MqPersistence mqPersistence;
private final FeedFetcherService feedFetcherService;
private static final Logger logger = LoggerFactory.getLogger(FeedsGrpcService.class);
@Inject
public FeedsGrpcService(FeedDb feedDb,
DbDomainQueries domainQueries,
MqPersistence mqPersistence,
FeedFetcherService feedFetcherService) {
this.feedDb = feedDb;
this.domainQueries = domainQueries;
this.mqPersistence = mqPersistence;
this.feedFetcherService = feedFetcherService;
}
// Ensure that the service is only registered if it is enabled
@Override
public boolean shouldRegisterService() {
return feedDb.isEnabled();
}
@Override
public void updateFeeds(RpcUpdateRequest request,
StreamObserver<Empty> responseObserver)
{
FeedFetcherService.UpdateMode updateMode = switch(request.getMode()) {
case CLEAN -> FeedFetcherService.UpdateMode.CLEAN;
case REFRESH -> FeedFetcherService.UpdateMode.REFRESH;
default -> throw new IllegalStateException("Unexpected value: " + request.getMode());
};
// Start a long-running task to update the feeds
MqLongRunningTask
.of(request.getMsgId(), "updateFeeds", mqPersistence)
.asThread(() -> {
feedFetcherService.updateFeeds(updateMode);
return new MqTaskResult.Success();
})
.start();
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
@Override
public void getFeed(RpcDomainId request,
StreamObserver<RpcFeed> responseObserver)
{
if (!feedDb.isEnabled()) {
responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
return;
}
Optional<EdgeDomain> domainName = domainQueries.getDomain(request.getDomainId());
if (domainName.isEmpty()) {
responseObserver.onError(new IllegalArgumentException("Domain not found"));
return;
}
FeedItems feedItems = feedDb.getFeed(domainName.get());
RpcFeed.Builder retB = RpcFeed.newBuilder()
.setDomainId(request.getDomainId())
.setDomain(domainName.get().toString())
.setFeedUrl(feedItems.feedUrl())
.setUpdated(feedItems.updated());
for (var item : feedItems.items()) {
retB.addItemsBuilder()
.setTitle(item.title())
.setUrl(item.url())
.setDescription(item.description())
.setDate(item.date())
.build();
}
responseObserver.onNext(retB.build());
responseObserver.onCompleted();
}
}

View File

@ -93,14 +93,14 @@ public class MqOutbox {
}
/** Send a message and wait for a response. */
public MqMessage send(String function, String payload) throws Exception {
public MqMessage sendBlocking(String function, String payload) throws Exception {
final long id = sendAsync(function, payload);
return waitResponse(id);
}
/** Send a message and wait for a response */
public MqMessage send(Object object) throws Exception {
public MqMessage sendBlocking(Object object) throws Exception {
final long id = sendAsync(object);
return waitResponse(id);

View File

@ -5,8 +5,10 @@ import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.sql.SQLException;
@ -24,6 +26,8 @@ public class MqPersistence {
private final HikariDataSource dataSource;
private final Gson gson;
private static final Logger logger = LoggerFactory.getLogger(MqPersistence.class);
public MqPersistence(HikariDataSource dataSource) {
this.dataSource = dataSource;
this.gson = null;
@ -145,6 +149,35 @@ public class MqPersistence {
}
}
/** Blocks until a message reaches a terminal state or the timeout passes.
* <p>
* @param msgId The id of the message to wait for
* @param pollInterval The interval to poll the database for updates
* @param timeout The maximum time to wait for the message to reach a terminal state
* @return The message if it reaches a terminal state, or null if the timeout passes
*/
@Nullable
public MqMessage waitForMessageTerminalState(long msgId, Duration pollInterval, Duration timeout) throws InterruptedException, SQLException {
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
var message = getMessage(msgId);
if (message.state().isTerminal()) {
return message;
}
long timeLeft = deadline - System.currentTimeMillis();
if (timeLeft <= 0) {
continue;
}
long sleepTime = Math.min(pollInterval.toMillis(), timeLeft);
Thread.sleep(sleepTime);
} while (System.currentTimeMillis() < deadline);
return null;
}
/** Creates a new message in the queue referencing as a reply to an existing message
* This message will have it's RELATED_ID set to the original message's ID.
*/
@ -197,7 +230,6 @@ public class MqPersistence {
}
}
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
* then returns the number of messages marked. This is an atomic operation that
* ensures that messages aren't double processed.

View File

@ -0,0 +1,138 @@
package nu.marginalia.mq.task;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/** A long-running task that can be executed asynchronously
* and report back to the message queue.
* <p></p>
* The idiomatic pattern is to create an outbox and send a message corresponding to the task,
* and then pass the message id along with the request to trigger the task over gRPC.
* <p></p>
* The gRPC service will spin off a thread and return immediately, while the task is executed
* in the background. The task can then report back to the message queue with the result
* of the task as it completes, by updating the message's state.
* */
public abstract class MqLongRunningTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MqLongRunningTask.class);
/** Create a new task with the given message id, name, and persistence. If the msgId is
* not positive, a dummy implementation is provided that does not report to the message queue.
*/
public static MqLongRunningTask of(long msgId, String name, MqPersistence persistence) {
if (msgId <= 0) {
return new MqLongRunningTaskDummyImpl(name);
}
else {
return new MqLongRunningTaskImpl(persistence, msgId, name);
}
}
/** Creates a thread that will execute the task. The thread is not started automatically */
public Thread asThread(MqTaskFunction r) {
return new Thread(() -> runNow(r), name());
}
/** Creates a future that will execute the task on the provided ExecutorService. */
public Future<Boolean> asFuture(ExecutorService executor, MqTaskFunction r) {
return executor.submit(() -> runNow(r));
}
/** Execute the task synchronously and return true if the task was successful */
public boolean runNow(MqTaskFunction r) {
try {
switch (r.run()) {
case MqTaskResult.Success success -> {
finish();
return true;
}
case MqTaskResult.Failure failure -> fail();
}
}
catch (Exception e) {
logger.error("Task failed", e);
fail();
}
return false;
}
abstract void finish();
abstract void fail();
public abstract String name();
}
class MqLongRunningTaskDummyImpl extends MqLongRunningTask {
private final String name;
MqLongRunningTaskDummyImpl(String name) {
this.name = name;
}
@Override
public void finish() {}
@Override
public void fail() {}
@Override
public void run() {}
@Override
public String name() {
return name;
}
}
class MqLongRunningTaskImpl extends MqLongRunningTask {
private final MqPersistence persistence;
private final long msgId;
private final String name;
MqLongRunningTaskImpl(MqPersistence persistence, long msgId, String name) {
this.persistence = persistence;
this.msgId = msgId;
this.name = name;
try {
persistence.updateMessageState(msgId, MqMessageState.ACK);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void finish() {
try {
persistence.updateMessageState(msgId, MqMessageState.OK);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void fail() {
try {
persistence.updateMessageState(msgId, MqMessageState.ERR);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void run() {}
@Override
public String name() {
return name;
}
}

View File

@ -0,0 +1,5 @@
package nu.marginalia.mq.task;
public interface MqTaskFunction {
MqTaskResult run() throws Exception;
}

View File

@ -0,0 +1,14 @@
package nu.marginalia.mq.task;
public sealed interface MqTaskResult {
record Success(String message) implements MqTaskResult {
public Success(){
this("Ok");
}
}
record Failure(String message) implements MqTaskResult {
public Failure(Throwable e){
this(e.getClass().getSimpleName() + " : " + e.getMessage());
}
}
}

View File

@ -110,9 +110,9 @@ public class MqOutboxTest {
}
@Test
public void testSend() throws Exception {
public void testSendBlocking() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
Executors.newSingleThreadExecutor().submit(() -> outbox.send("test", "Hello World"));
Executors.newSingleThreadExecutor().submit(() -> outbox.sendBlocking("test", "Hello World"));
TimeUnit.MILLISECONDS.sleep(100);
@ -125,14 +125,14 @@ public class MqOutboxTest {
@Test
public void testSendAndRespondAsyncInbox() throws Exception {
public void testSendBlockingAndRespondAsyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
inbox.subscribe(justRespond("Alright then"));
inbox.start();
var rsp = outbox.send("test", "Hello World");
var rsp = outbox.sendBlocking("test", "Hello World");
assertEquals(MqMessageState.OK, rsp.state());
assertEquals("Alright then", rsp.payload());
@ -146,14 +146,14 @@ public class MqOutboxTest {
}
@Test
public void testSendAndRespondSyncInbox() throws Exception {
public void testSendBlockingAndRespondSyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
inbox.subscribe(justRespond("Alright then"));
inbox.start();
var rsp = outbox.send("test", "Hello World");
var rsp = outbox.sendBlocking("test", "Hello World");
assertEquals(MqMessageState.OK, rsp.state());
assertEquals("Alright then", rsp.payload());
@ -167,17 +167,17 @@ public class MqOutboxTest {
}
@Test
public void testSendMultipleAsyncInbox() throws Exception {
public void testSendBlockingMultipleAsyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
inbox.subscribe(echo());
inbox.start();
var rsp1 = outbox.send("test", "one");
var rsp2 = outbox.send("test", "two");
var rsp3 = outbox.send("test", "three");
var rsp4 = outbox.send("test", "four");
var rsp1 = outbox.sendBlocking("test", "one");
var rsp2 = outbox.sendBlocking("test", "two");
var rsp3 = outbox.sendBlocking("test", "three");
var rsp4 = outbox.sendBlocking("test", "four");
Thread.sleep(500);
@ -201,17 +201,17 @@ public class MqOutboxTest {
}
@Test
public void testSendMultipleSyncInbox() throws Exception {
public void testSendBlockingMultipleSyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
inbox.subscribe(echo());
inbox.start();
var rsp1 = outbox.send("test", "one");
var rsp2 = outbox.send("test", "two");
var rsp3 = outbox.send("test", "three");
var rsp4 = outbox.send("test", "four");
var rsp1 = outbox.sendBlocking("test", "one");
var rsp2 = outbox.sendBlocking("test", "two");
var rsp3 = outbox.sendBlocking("test", "three");
var rsp4 = outbox.sendBlocking("test", "four");
Thread.sleep(500);
@ -235,13 +235,13 @@ public class MqOutboxTest {
}
@Test
public void testSendAndRespondWithErrorHandlerAsyncInbox() throws Exception {
public void testSendBlockingAndRespondWithErrorHandlerAsyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
var inbox = new MqAsynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
inbox.start();
var rsp = outbox.send("test", "Hello World");
var rsp = outbox.sendBlocking("test", "Hello World");
assertEquals(MqMessageState.ERR, rsp.state());
@ -254,13 +254,13 @@ public class MqOutboxTest {
}
@Test
public void testSendAndRespondWithErrorHandlerSyncInbox() throws Exception {
public void testSendBlockingAndRespondWithErrorHandlerSyncInbox() throws Exception {
var outbox = new MqOutbox(new MqPersistence(dataSource), inboxId, 0, inboxId+"/reply", 0, UUID.randomUUID());
var inbox = new MqSynchronousInbox(new MqPersistence(dataSource), inboxId+":0", UUID.randomUUID());
inbox.start();
var rsp = outbox.send("test", "Hello World");
var rsp = outbox.sendBlocking("test", "Hello World");
assertEquals(MqMessageState.ERR, rsp.state());

View File

@ -53,7 +53,6 @@ dependencies {
implementation project(':code:features-search:screenshots')
implementation project(':code:features-search:random-websites')
implementation project(':code:features-search:feedlot-client')
implementation libs.bundles.slf4j

View File

@ -1,15 +1,10 @@
package nu.marginalia.search;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import nu.marginalia.LanguageModels;
import nu.marginalia.WebsiteUrl;
import nu.marginalia.WmsaHome;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.renderer.config.HandlebarsConfigurator;
import nu.marginalia.feedlot.FeedlotClient;
import java.time.Duration;
public class SearchModule extends AbstractModule {
@ -22,14 +17,4 @@ public class SearchModule extends AbstractModule {
System.getProperty("search.websiteUrl", "https://search.marginalia.nu/")));
}
@Provides
public FeedlotClient provideFeedlotClient() {
return new FeedlotClient(
System.getProperty("ext-svc-feedlot-bindAddress", "feedlot"),
Integer.getInteger("ext-svc-feedlot-port", 80),
GsonFactory.get(),
Duration.ofMillis(250),
Duration.ofMillis(100)
);
}
}

View File

@ -4,10 +4,11 @@ import com.google.inject.Inject;
import nu.marginalia.api.domains.DomainInfoClient;
import nu.marginalia.api.domains.model.DomainInformation;
import nu.marginalia.api.domains.model.SimilarDomain;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.api.feeds.RpcFeed;
import nu.marginalia.api.feeds.RpcFeedItem;
import nu.marginalia.api.livecapture.LiveCaptureClient;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.feedlot.FeedlotClient;
import nu.marginalia.feedlot.model.FeedItems;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.renderer.MustacheRenderer;
import nu.marginalia.renderer.RendererFactory;
@ -37,7 +38,7 @@ public class SearchSiteInfoService {
private final SearchFlagSiteService flagSiteService;
private final DbDomainQueries domainQueries;
private final MustacheRenderer<Object> renderer;
private final FeedlotClient feedlotClient;
private final FeedsClient feedsClient;
private final LiveCaptureClient liveCaptureClient;
private final ScreenshotService screenshotService;
@ -47,7 +48,7 @@ public class SearchSiteInfoService {
RendererFactory rendererFactory,
SearchFlagSiteService flagSiteService,
DbDomainQueries domainQueries,
FeedlotClient feedlotClient,
FeedsClient feedsClient,
LiveCaptureClient liveCaptureClient,
ScreenshotService screenshotService) throws IOException
{
@ -58,7 +59,7 @@ public class SearchSiteInfoService {
this.renderer = rendererFactory.renderer("search/site-info/site-info");
this.feedlotClient = feedlotClient;
this.feedsClient = feedsClient;
this.liveCaptureClient = liveCaptureClient;
this.screenshotService = screenshotService;
}
@ -135,26 +136,29 @@ public class SearchSiteInfoService {
final Future<DomainInformation> domainInfoFuture;
final Future<List<SimilarDomain>> similarSetFuture;
final Future<List<SimilarDomain>> linkingDomainsFuture;
final CompletableFuture<RpcFeed> feedItemsFuture;
String url = "https://" + domainName + "/";
boolean hasScreenshot = screenshotService.hasScreenshot(domainId);
var feedItemsFuture = feedlotClient.getFeedItems(domainName);
if (domainId < 0) {
domainInfoFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
similarSetFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
feedItemsFuture = CompletableFuture.failedFuture(new Exception("Unknown Domain ID"));
}
else if (!domainInfoClient.isAccepting()) {
domainInfoFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
similarSetFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
linkingDomainsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
feedItemsFuture = CompletableFuture.failedFuture(new Exception("Assistant Service Unavailable"));
}
else {
domainInfoFuture = domainInfoClient.domainInformation(domainId);
similarSetFuture = domainInfoClient.similarDomains(domainId, 25);
linkingDomainsFuture = domainInfoClient.linkedDomains(domainId, 25);
feedItemsFuture = feedsClient.getFeed(domainId);
}
List<UrlDetails> sampleResults = searchOperator.doSiteSearch(domainName, domainId,5);
@ -162,13 +166,6 @@ public class SearchSiteInfoService {
url = sampleResults.getFirst().url.withPathAndParam("/", null).toString();
}
FeedItems feedItems = null;
try {
feedItems = feedItemsFuture.get();
} catch (Exception e) {
logger.debug("Failed to get feed items for {}: {}", domainName, e.getMessage());
}
var result = new SiteInfoWithContext(domainName,
domainId,
url,
@ -176,7 +173,7 @@ public class SearchSiteInfoService {
waitForFuture(domainInfoFuture, () -> createDummySiteInfo(domainName)),
waitForFuture(similarSetFuture, List::of),
waitForFuture(linkingDomainsFuture, List::of),
feedItems,
waitForFuture(feedItemsFuture.thenApply(FeedItems::new), () -> FeedItems.dummyValue(domainName)),
sampleResults
);
@ -356,6 +353,43 @@ public class SearchSiteInfoService {
}
}
public record FeedItem(String title, String date, String description, String url) {
public FeedItem(RpcFeedItem rpcFeedItem) {
this(rpcFeedItem.getTitle(),
rpcFeedItem.getDate(),
rpcFeedItem.getDescription(),
rpcFeedItem.getUrl());
}
public String pubDay() { // Extract the date from an ISO style date string
if (date.length() > 10) {
return date.substring(0, 10);
}
return date;
}
public String descriptionSafe() {
return description
.replace("<", "&lt;")
.replace(">", "&gt;");
}
}
public record FeedItems(String domain, String feedUrl, String updated, List<FeedItem> items) {
public static FeedItems dummyValue(String domain) {
return new FeedItems(domain, "", "", List.of());
}
public FeedItems(RpcFeed rpcFeedItems) {
this(rpcFeedItems.getDomain(),
rpcFeedItems.getFeedUrl(),
rpcFeedItems.getUpdated(),
rpcFeedItems.getItemsList().stream().map(FeedItem::new).toList());
}
}
public record ReportDomain(
Map<String, Boolean> view,
String domain,

View File

@ -1,5 +1,6 @@
{{#if feed.items}}
{{#with feed}}
<h2><a title="Atom/RSS feed" target="external" href="{{feedUrl}}"><img width="16" height="16" src="/rss.svg"></a> Feed (Experimental)</h2>
<h2><a title="Atom/RSS feed" target="external" href="{{feedUrl}}"><img width="16" height="16" src="/rss.svg"></a> Feed</h2>
<dl>
{{#each items}}
@ -8,8 +9,9 @@
{{/each}}
</dl>
{{/with}}
{{/if}}
{{#unless feed}}{{#if samples}}
{{#unless feed.items}}{{#if samples}}
<h2>Sample</h2>
<dl>
{{#each samples}}

View File

@ -8,6 +8,7 @@ import nu.marginalia.functions.domains.DomainInfoGrpcService;
import nu.marginalia.functions.math.MathGrpcService;
import nu.marginalia.livecapture.LiveCaptureGrpcService;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.rss.svc.FeedsGrpcService;
import nu.marginalia.screenshot.ScreenshotService;
import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.server.BaseServiceParams;
@ -31,10 +32,15 @@ public class AssistantService extends Service {
ScreenshotService screenshotService,
DomainInfoGrpcService domainInfoGrpcService,
LiveCaptureGrpcService liveCaptureGrpcService,
FeedsGrpcService feedsGrpcService,
MathGrpcService mathGrpcService,
Suggestions suggestions)
{
super(params, ServicePartition.any(), List.of(domainInfoGrpcService, mathGrpcService, liveCaptureGrpcService));
super(params, ServicePartition.any(),
List.of(domainInfoGrpcService,
mathGrpcService,
liveCaptureGrpcService,
feedsGrpcService));
this.suggestions = suggestions;

View File

@ -56,7 +56,6 @@ include 'code:libraries:message-queue'
include 'code:features-search:screenshots'
include 'code:features-search:random-websites'
include 'code:features-search:feedlot-client'
include 'code:processes:converting-process:ft-anchor-keywords'
include 'code:execution:data-extractors'
@ -155,6 +154,8 @@ dependencyResolutionManagement {
library('prometheus-server', 'io.prometheus', 'simpleclient_httpserver').version('0.16.0')
library('prometheus-hotspot', 'io.prometheus', 'simpleclient_hotspot').version('0.16.0')
library('rssreader', 'com.apptasticsoftware', 'rssreader').version('3.5.0')
library('slf4j.api', 'org.slf4j', 'slf4j-api').version('1.7.36')
library('slf4j.jdk14', 'org.slf4j', 'slf4j-jdk14').version('2.0.3')