(feeds) Refresh the feed db using the previous db when it is available.

This commit is contained in:
Viktor Lofgren 2024-11-09 17:36:38 +01:00
parent b0ac3c586f
commit ab17af99da
4 changed files with 83 additions and 20 deletions

View File

@ -41,12 +41,16 @@ public class FeedDbReader implements AutoCloseable {
var rs = stmt.executeQuery(""" var rs = stmt.executeQuery("""
select select
json_extract(feed, '$.domain') as domain, json_extract(feed, '$.domain') as domain,
json_extract(feed, '$.feedUrl') as url json_extract(feed, '$.feedUrl') as url,
json_extract(feed, '$.updated') as updated
from feed from feed
"""); """);
while (rs.next()) { while (rs.next()) {
ret.add(new FeedDefinition(rs.getString("domain"), rs.getString("url"))); ret.add(new FeedDefinition(
rs.getString("domain"),
rs.getString("url"),
rs.getString("updated")));
} }
} catch (SQLException e) { } catch (SQLException e) {

View File

@ -31,13 +31,6 @@ public class FeedDbWriter implements AutoCloseable {
connection = DriverManager.getConnection(dbUrl); connection = DriverManager.getConnection(dbUrl);
try (var stmt = connection.createStatement()) { try (var stmt = connection.createStatement()) {
// Disable synchronous writes for speed. We don't care about recovery.
//
// If this operation fails we just retry the entire operation from scratch
// with a new db file.
stmt.execute("PRAGMA synchronous = OFF");
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)"); stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
} }

View File

@ -1,3 +1,28 @@
package nu.marginalia.rss.model; package nu.marginalia.rss.model;
public record FeedDefinition(String domain, String feedUrl) { } import javax.annotation.Nullable;
import java.time.Duration;
import java.time.ZonedDateTime;
public record FeedDefinition(
String domain,
String feedUrl,
@Nullable String updated)
{
private static final Duration defaultDuration = Duration.ofDays(30);
public Duration durationSinceUpdated() {
if (updated == null) { // Default to 30 days if no update time is available
return defaultDuration;
}
try {
return Duration.between(ZonedDateTime.parse(updated), ZonedDateTime.now());
}
catch (Exception e) {
return defaultDuration;
}
}
}

View File

@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.opencsv.CSVReader; import com.opencsv.CSVReader;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.rss.db.FeedDb; import nu.marginalia.rss.db.FeedDb;
import nu.marginalia.rss.model.FeedDefinition; import nu.marginalia.rss.model.FeedDefinition;
@ -28,6 +29,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -67,11 +69,11 @@ public class FeedFetcherService {
this.nodeConfigurationService = nodeConfigurationService; this.nodeConfigurationService = nodeConfigurationService;
this.serviceHeartbeat = serviceHeartbeat; this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient; this.executorClient = executorClient;
rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");
} }
public void updateFeeds() throws IOException { public void updateFeeds() throws IOException {
rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");
if (updating) // Prevent concurrent updates if (updating) // Prevent concurrent updates
{ {
logger.error("Already updating feeds, refusing to start another update"); logger.error("Already updating feeds, refusing to start another update");
@ -82,30 +84,69 @@ public class FeedFetcherService {
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds") var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
) { ) {
updating = true; updating = true;
var definitions = readDefinitions();
// Read the feed definitions from the database, if they are not available, read them from the system's
// RSS exports instead
Collection<FeedDefinition> definitions = feedDb.getAllFeeds();
if (definitions == null) {
definitions = readDefinitionsFromSystem();
}
logger.info("Found {} feed definitions", definitions.size()); logger.info("Found {} feed definitions", definitions.size());
AtomicInteger updated = new AtomicInteger(0); final AtomicInteger definitionsUpdated = new AtomicInteger(0);
final int totalDefinitions = definitions.size();
SimpleBlockingThreadPool executor = new SimpleBlockingThreadPool("FeedFetcher", 64, 4); SimpleBlockingThreadPool executor = new SimpleBlockingThreadPool("FeedFetcher", 64, 4);
for (var feed : definitions) { for (var feed : definitions) {
executor.submitQuietly(() -> { executor.submitQuietly(() -> {
var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
// If we have existing data, we might skip updating it with a probability that increases with time.
// This is to avoid hammering feeds that are updated very rarely, and saves some time and resources
// on our end.
if (!oldData.isEmpty()) {
Duration duration = feed.durationSinceUpdated();
long daysSinceUpdate = duration.toDays();
if (daysSinceUpdate > 2 && ThreadLocalRandom.current()
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)
{
// Skip updating this feed, just write the old data back instead
writer.saveFeed(oldData);
return;
}
}
var items = fetchFeed(feed); var items = fetchFeed(feed);
if (!items.isEmpty()) { if (!items.isEmpty()) {
writer.saveFeed(items); writer.saveFeed(items);
} }
if ((updated.incrementAndGet() % 10_000) == 0) { if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
// Update the progress every 10k feeds, to avoid hammering the database and flooding the logs // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
heartbeat.progress("Updated " + updated + "/" + definitions.size() + " feeds", updated.get(), definitions.size()); heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
} }
}); });
} }
executor.shutDown(); executor.shutDown();
executor.awaitTermination(1, TimeUnit.DAYS); // Wait for the executor to finish, but give up after 60 minutes to avoid hanging indefinitely
for (int waitedMinutes = 0; waitedMinutes < 60; waitedMinutes++) {
if (executor.awaitTermination(1, TimeUnit.MINUTES)) break;
}
executor.shutDownNow();
// Wait for any in-progress writes to finish before switching the database,
// in case we ended up murdering the writer with shutDownNow. It's a very
// slim chance, but this increases the odds of a clean switchover.
TimeUnit.SECONDS.sleep(5);
feedDb.switchDb(writer); feedDb.switchDb(writer);
@ -117,7 +158,7 @@ public class FeedFetcherService {
} }
} }
public Collection<FeedDefinition> readDefinitions() throws IOException { public Collection<FeedDefinition> readDefinitionsFromSystem() throws IOException {
Collection<FileStorage> storages = getLatestFeedStorages(); Collection<FileStorage> storages = getLatestFeedStorages();
List<FeedDefinition> feedDefinitionList = new ArrayList<>(); List<FeedDefinition> feedDefinitionList = new ArrayList<>();
@ -134,7 +175,7 @@ public class FeedFetcherService {
var domain = row[0].trim(); var domain = row[0].trim();
var feedUrl = row[2].trim(); var feedUrl = row[2].trim();
feedDefinitionList.add(new FeedDefinition(domain, feedUrl)); feedDefinitionList.add(new FeedDefinition(domain, feedUrl, null));
} }
} }