diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java
index 888eb731..c810742d 100644
--- a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java
+++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java
@@ -41,12 +41,16 @@ public class FeedDbReader implements AutoCloseable {
             var rs = stmt.executeQuery("""
                     select
                         json_extract(feed, '$.domain') as domain,
-                        json_extract(feed, '$.feedUrl') as url
+                        json_extract(feed, '$.feedUrl') as url,
+                        json_extract(feed, '$.updated') as updated
                     from feed
                     """);
 
             while (rs.next()) {
-                ret.add(new FeedDefinition(rs.getString("domain"), rs.getString("url")));
+                ret.add(new FeedDefinition(
+                        rs.getString("domain"),
+                        rs.getString("url"),
+                        rs.getString("updated")));
             }
         } catch (SQLException e) {
diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java
index fcf1c363..36b5f710 100644
--- a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java
+++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java
@@ -31,13 +31,6 @@ public class FeedDbWriter implements AutoCloseable {
         connection = DriverManager.getConnection(dbUrl);
 
         try (var stmt = connection.createStatement()) {
-            // Disable synchronous writes for speed. We don't care about recovery.
-            //
-            // If this operation fails we just retry the entire operation from scratch
-            // with a new db file.
-
-            stmt.execute("PRAGMA synchronous = OFF");
-
             stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
         }
diff --git a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedDefinition.java b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedDefinition.java
index dd7cce32..8d724c38 100644
--- a/code/functions/live-capture/java/nu/marginalia/rss/model/FeedDefinition.java
+++ b/code/functions/live-capture/java/nu/marginalia/rss/model/FeedDefinition.java
@@ -1,3 +1,28 @@
 package nu.marginalia.rss.model;
 
-public record FeedDefinition(String domain, String feedUrl) { }
+import javax.annotation.Nullable;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+
+public record FeedDefinition(
+        String domain,
+        String feedUrl,
+        @Nullable String updated)
+{
+
+    private static final Duration defaultDuration = Duration.ofDays(30);
+
+    public Duration durationSinceUpdated() {
+        if (updated == null) { // Default to 30 days if no update time is available
+            return defaultDuration;
+        }
+
+        try {
+            return Duration.between(ZonedDateTime.parse(updated), ZonedDateTime.now());
+        }
+        catch (Exception e) {
+            return defaultDuration;
+        }
+    }
+
+}
diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java
index e6595e74..f083806c 100644
--- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java
+++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java
@@ -5,6 +5,7 @@ import com.google.inject.Inject;
 import com.opencsv.CSVReader;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.executor.client.ExecutorClient;
+import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.nodecfg.NodeConfigurationService;
 import nu.marginalia.rss.db.FeedDb;
 import nu.marginalia.rss.model.FeedDefinition;
@@ -28,6 +29,7 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -67,11 +69,11 @@ public class FeedFetcherService {
         this.nodeConfigurationService = nodeConfigurationService;
         this.serviceHeartbeat = serviceHeartbeat;
         this.executorClient = executorClient;
+
+        rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");
     }
 
     public void updateFeeds() throws IOException {
-        rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");
-
         if (updating) // Prevent concurrent updates
         {
             logger.error("Already updating feeds, refusing to start another update");
@@ -82,30 +84,69 @@ public class FeedFetcherService {
              var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
         ) {
             updating = true;
-            var definitions = readDefinitions();
+
+            // Read the feed definitions from the database, if they are not available, read them from the system's
+            // RSS exports instead
+
+            Collection<FeedDefinition> definitions = feedDb.getAllFeeds();
+            if (definitions == null) {
+                definitions = readDefinitionsFromSystem();
+            }
 
             logger.info("Found {} feed definitions", definitions.size());
 
-            AtomicInteger updated = new AtomicInteger(0);
+            final AtomicInteger definitionsUpdated = new AtomicInteger(0);
+            final int totalDefinitions = definitions.size();
 
             SimpleBlockingThreadPool executor = new SimpleBlockingThreadPool("FeedFetcher", 64, 4);
             for (var feed : definitions) {
                 executor.submitQuietly(() -> {
+                    var oldData = feedDb.getFeed(new EdgeDomain(feed.domain()));
+
+                    // If we have existing data, we might skip updating it with a probability that increases with time,
+                    // this is to avoid hammering the feeds that are updated very rarely and save some time and resources
+                    // on our end
+
+                    if (!oldData.isEmpty()) {
+                        Duration duration = feed.durationSinceUpdated();
+                        long daysSinceUpdate = duration.toDays();
+
+
+                        if (daysSinceUpdate > 2 && ThreadLocalRandom.current()
+                                .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)
+                        {
+                            // Skip updating this feed, just write the old data back instead
+                            writer.saveFeed(oldData);
+                            return;
+                        }
+                    }
+
                     var items = fetchFeed(feed);
                     if (!items.isEmpty()) {
                         writer.saveFeed(items);
                     }
 
-                    if ((updated.incrementAndGet() % 10_000) == 0) {
-                        // Update the progress every 10k feeds, to avoid hammering the database and flooding the logs
-                        heartbeat.progress("Updated " + updated + "/" + definitions.size() + " feeds", updated.get(), definitions.size());
+                    if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
+                        // Update the progress every 1k feeds, to avoid hammering the database and flooding the logs
+                        heartbeat.progress("Updated " + definitionsUpdated + "/" + totalDefinitions + " feeds", definitionsUpdated.get(), totalDefinitions);
                     }
                 });
             }
 
             executor.shutDown();
-            executor.awaitTermination(1, TimeUnit.DAYS);
+
+            // Wait for the executor to finish, but give up after 60 minutes to avoid hanging indefinitely
+            for (int waitedMinutes = 0; waitedMinutes < 60; waitedMinutes++) {
+                if (executor.awaitTermination(1, TimeUnit.MINUTES)) break;
+            }
+            executor.shutDownNow();
+
+            // Wait for any in-progress writes to finish before switching the database
+            // in case we ended up murdering the writer with shutDownNow. It's a very
+            // slim chance but this increases the odds of a clean switch over.
+
+            TimeUnit.SECONDS.sleep(5);
 
             feedDb.switchDb(writer);
 
@@ -117,7 +158,7 @@ public class FeedFetcherService {
         }
     }
 
-    public Collection<FeedDefinition> readDefinitions() throws IOException {
+    public Collection<FeedDefinition> readDefinitionsFromSystem() throws IOException {
         Collection storages = getLatestFeedStorages();
 
         List<FeedDefinition> feedDefinitionList = new ArrayList<>();
@@ -134,7 +175,7 @@ public class FeedFetcherService {
                 var domain = row[0].trim();
                 var feedUrl = row[2].trim();
 
-                feedDefinitionList.add(new FeedDefinition(domain, feedUrl));
+                feedDefinitionList.add(new FeedDefinition(domain, feedUrl, null));
             }
         }
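
Note: the skip expression added in updateFeeds() is easy to misread, so below is a small standalone sketch. It is my own illustration, not part of the patch, and the class name is made up; it replays the same expression and estimates how often a feed gets skipped per days-since-update. Roughly: feeds idle for 3 days or less are always refetched, and the skip probability then steps up to about 80% for feeds idle 10 days or more.

import java.util.concurrent.ThreadLocalRandom;

// Illustrative only -- not part of the patch. Reproduces the skip decision
// from FeedFetcherService#updateFeeds and measures the resulting skip rate.
class FeedSkipProbabilitySketch {
    static boolean shouldSkip(long daysSinceUpdate) {
        // Same expression as in the diff; integer division caps the bound at 1 + 10/2 = 6
        return daysSinceUpdate > 2 && ThreadLocalRandom.current()
                .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1;
    }

    public static void main(String[] args) {
        final int trials = 1_000_000;
        for (long days = 1; days <= 12; days++) {
            int skipped = 0;
            for (int i = 0; i < trials; i++) {
                if (shouldSkip(days)) skipped++;
            }
            // Expected: 0% up to 3 days, ~50% at 4-5, ~67% at 6-7, ~75% at 8-9, ~80% at 10+
            System.out.printf("%2d days since update -> skipped ~%.0f%%%n",
                    days, 100.0 * skipped / trials);
        }
    }
}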