From b66fb9caf6af0d6ed249643d45aebd649a422d39 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Wed, 18 Dec 2024 17:02:13 +0100 Subject: [PATCH] (feeds) Improve error handling in the feed fetcher. --- .../java/nu/marginalia/rss/db/FeedDb.java | 21 +++ .../nu/marginalia/rss/db/FeedDbReader.java | 23 +++- .../nu/marginalia/rss/db/FeedDbWriter.java | 14 ++ .../rss/svc/FeedFetcherService.java | 103 ++++++++++++--- .../test/nu/marginalia/rss/db/FeedDbTest.java | 22 +++- .../rss/svc/FeedFetcherServiceTest.java | 120 ++++++++++++++++++ 6 files changed, 280 insertions(+), 23 deletions(-) create mode 100644 code/functions/live-capture/test/nu/marginalia/rss/svc/FeedFetcherServiceTest.java diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java index 921b4e9c..c4909a40 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDb.java @@ -19,6 +19,7 @@ import java.security.MessageDigest; import java.time.Instant; import java.util.Base64; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -85,6 +86,26 @@ public class FeedDb { return List.of(); } + public Map getAllErrorCounts() { + if (!feedDbEnabled) { + throw new IllegalStateException("Feed database is disabled on this node"); + } + + // Capture the current reader to avoid concurrency issues + FeedDbReader reader = this.reader; + + try { + if (reader != null) { + return reader.getAllErrorCounts(); + } + } + catch (Exception e) { + logger.error("Error getting all feeds", e); + } + return Map.of(); + } + + @NotNull public FeedItems getFeed(EdgeDomain domain) { if (!feedDbEnabled) { diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java index c86ab8f4..9bb02acf 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbReader.java @@ -13,9 +13,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.function.BiConsumer; public class FeedDbReader implements AutoCloseable { @@ -33,6 +31,7 @@ public class FeedDbReader implements AutoCloseable { // Create table if it doesn't exist to avoid errors before any feeds have been fetched try (var stmt = connection.createStatement()) { stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS errors (domain TEXT PRIMARY KEY, cnt INT DEFAULT 0)"); } } @@ -76,6 +75,22 @@ public class FeedDbReader implements AutoCloseable { return Optional.empty(); } + public Map getAllErrorCounts() { + Map ret = new HashMap<>(100_000); + + try (var stmt = connection.prepareStatement("SELECT domain, cnt FROM errors")) { + + var rs = stmt.executeQuery(); + while (rs.next()) { + ret.put(rs.getString(1), rs.getInt(2)); + } + } catch (SQLException e) { + logger.error("Error getting errors", e); + } + + return ret; + } + public FeedItems getFeed(EdgeDomain domain) { try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) { stmt.setString(1, domain.toString()); @@ -124,4 +139,6 @@ public class FeedDbReader implements AutoCloseable { logger.error("Error getting updated links", e); } } + + } diff --git a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java index 36b5f710..bbd6354e 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/db/FeedDbWriter.java @@ -19,6 +19,7 @@ public class FeedDbWriter implements AutoCloseable { private final Connection connection; private final PreparedStatement insertFeedStmt; + private final PreparedStatement insertErrorStmt; private final Path dbPath; private volatile boolean closed = false; @@ -32,9 +33,11 @@ public class FeedDbWriter implements AutoCloseable { try (var stmt = connection.createStatement()) { stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS errors (domain TEXT PRIMARY KEY, cnt INT DEFAULT 0)"); } insertFeedStmt = connection.prepareStatement("INSERT INTO feed (domain, feed) VALUES (?, ?)"); + insertErrorStmt = connection.prepareStatement("INSERT INTO errors (domain, cnt) VALUES (?, ?)"); } public Path getDbPath() { @@ -53,6 +56,17 @@ public class FeedDbWriter implements AutoCloseable { } } + public synchronized void setErrorCount(String domain, int count) { + try { + insertErrorStmt.setString(1, domain); + insertErrorStmt.setInt(2, count); + insertErrorStmt.executeUpdate(); + } + catch (SQLException ex) { + logger.error("Error saving error count " + domain, ex); + } + } + private String serialize(FeedItems items) { return gson.toJson(items); } diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java index 5cbf6019..cb1e6e4d 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java @@ -9,6 +9,7 @@ import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.model.EdgeDomain; import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.rss.db.FeedDb; +import nu.marginalia.rss.db.FeedDbWriter; import nu.marginalia.rss.model.FeedDefinition; import nu.marginalia.rss.model.FeedItem; import nu.marginalia.rss.model.FeedItems; @@ -21,10 +22,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.sql.SQLException; import java.time.Duration; import java.time.LocalDateTime; @@ -43,14 +47,7 @@ public class FeedFetcherService { private static final int MAX_FEED_ITEMS = 10; private static final Logger logger = LoggerFactory.getLogger(FeedFetcherService.class); - private final RssReader rssReader = new RssReader( - HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(5)) - .executor(Executors.newCachedThreadPool()) - .followRedirects(HttpClient.Redirect.NORMAL) - .version(HttpClient.Version.HTTP_2) - .build() - ); + private final RssReader rssReader = new RssReader(); private final FeedDb feedDb; private final FileStorageService fileStorageService; @@ -59,6 +56,7 @@ public class FeedFetcherService { private final ExecutorClient executorClient; private volatile boolean updating; + private boolean deterministic = false; @Inject public FeedFetcherService(FeedDb feedDb, @@ -72,8 +70,6 @@ public class FeedFetcherService { this.nodeConfigurationService = nodeConfigurationService; this.serviceHeartbeat = serviceHeartbeat; this.executorClient = executorClient; - - rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier()); } public enum UpdateMode { @@ -81,14 +77,25 @@ public class FeedFetcherService { REFRESH }; + /** Disable random-based heuristics. This is meant for testing */ + public void setDeterministic() { + this.deterministic = true; + } + public void updateFeeds(UpdateMode updateMode) throws IOException { if (updating) // Prevent concurrent updates { throw new IllegalStateException("Already updating feeds, refusing to start another update"); } - try (var writer = feedDb.createWriter(); - var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds") + try (FeedDbWriter writer = feedDb.createWriter(); + HttpClient client = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(15)) + .executor(Executors.newCachedThreadPool()) + .followRedirects(HttpClient.Redirect.NORMAL) + .version(HttpClient.Version.HTTP_2) + .build(); + var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds") ) { updating = true; @@ -96,6 +103,7 @@ public class FeedFetcherService { // RSS exports instead Collection definitions = feedDb.getAllFeeds(); + Map errorCounts = feedDb.getAllErrorCounts(); // If we didn't get any definitions, or a clean update is requested, read the definitions from the system // instead @@ -123,8 +131,8 @@ public class FeedFetcherService { long daysSinceUpdate = duration.toDays(); - if (daysSinceUpdate > 2 && ThreadLocalRandom.current() - .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1) + if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current() + .nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)) { // Skip updating this feed, just write the old data back instead writer.saveFeed(oldData); @@ -133,9 +141,26 @@ public class FeedFetcherService { } - var items = fetchFeed(feed); - if (!items.isEmpty()) { - writer.saveFeed(items); + FetchResult feedData; + try { + feedData = fetchFeedData(feed, client); + } + catch (Exception ex) { + feedData = new FetchResult.TransientError(); + } + + switch (feedData) { + case FetchResult.Success(String value) -> writer.saveFeed(fetchFeed(value, feed)); + case FetchResult.TransientError() -> { + int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0); + writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount); + + if (errorCount < 5) { + // Permit the server a few days worth of retries before we drop the feed entirely + writer.saveFeed(oldData); + } + } + case FetchResult.PermanentError() -> {} // let the definition be forgotten about } if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) { @@ -168,6 +193,46 @@ public class FeedFetcherService { } } + private FetchResult fetchFeedData(FeedDefinition feed, HttpClient client) { + try { + URI uri = new URI(feed.feedUrl()); + + HttpRequest getRequest = HttpRequest.newBuilder() + .GET() + .uri(uri) + .header("User-Agent", WmsaHome.getUserAgent().uaIdentifier()) + .header("Accept-Encoding", "gzip") + .header("Accept", "text/*, */*;q=0.9") + .timeout(Duration.ofSeconds(15)) + .build(); + + for (int i = 0; i < 3; i++) { + var rs = client.send(getRequest, HttpResponse.BodyHandlers.ofString()); + if (429 == rs.statusCode()) { + int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2")); + Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5))); + } else if (200 == rs.statusCode()) { + return new FetchResult.Success(rs.body()); + } else if (404 == rs.statusCode()) { + return new FetchResult.PermanentError(); // never try again + } else { + return new FetchResult.TransientError(); // we try again in a few days + } + } + } + catch (Exception ex) { + logger.debug("Error fetching feed", ex); + } + + return new FetchResult.TransientError(); + } + + public sealed interface FetchResult { + record Success(String value) implements FetchResult {} + record TransientError() implements FetchResult {} + record PermanentError() implements FetchResult {} + } + public Collection readDefinitionsFromSystem() throws IOException { Collection storages = getLatestFeedStorages(); List feedDefinitionList = new ArrayList<>(); @@ -231,9 +296,9 @@ public class FeedFetcherService { } } - public FeedItems fetchFeed(FeedDefinition definition) { + public FeedItems fetchFeed(String feedData, FeedDefinition definition) { try { - List rawItems = rssReader.read(definition.feedUrl()).toList(); + List rawItems = rssReader.read(new ByteArrayInputStream(feedData.getBytes())).toList(); boolean keepUriFragment = rawItems.size() < 2 || areFragmentsDisparate(rawItems); diff --git a/code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbTest.java b/code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbTest.java index e2dba9e1..620734fe 100644 --- a/code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbTest.java +++ b/code/functions/live-capture/test/nu/marginalia/rss/db/FeedDbTest.java @@ -8,9 +8,30 @@ import org.junit.jupiter.api.Test; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Map; class FeedDbTest { + @Test + public void testErrorCounts() throws Exception { + Path dbPath = Files.createTempFile("rss-feeds", ".db"); + + try { + FeedDb db = new FeedDb(dbPath); + + try (var writer = db.createWriter()) { + writer.setErrorCount("foo", 1); + writer.setErrorCount("bar", 5); + db.switchDb(writer); + } + + Map allErrorCounts = db.getAllErrorCounts(); + Assertions.assertEquals(Map.of("foo", 1, "bar", 5), allErrorCounts); + } finally { + Files.delete(dbPath); + } + } + @Test public void testDbHash() throws Exception{ Path dbPath = Files.createTempFile("rss-feeds", ".db"); @@ -31,6 +52,5 @@ class FeedDbTest { } finally { Files.delete(dbPath); } - } } \ No newline at end of file diff --git a/code/functions/live-capture/test/nu/marginalia/rss/svc/FeedFetcherServiceTest.java b/code/functions/live-capture/test/nu/marginalia/rss/svc/FeedFetcherServiceTest.java new file mode 100644 index 00000000..88fb07cf --- /dev/null +++ b/code/functions/live-capture/test/nu/marginalia/rss/svc/FeedFetcherServiceTest.java @@ -0,0 +1,120 @@ +package nu.marginalia.rss.svc; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.name.Names; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.rss.db.FeedDb; +import nu.marginalia.rss.model.FeedItems; +import nu.marginalia.service.ServiceId; +import nu.marginalia.service.discovery.ServiceRegistryIf; +import nu.marginalia.service.module.ServiceConfiguration; +import nu.marginalia.test.TestMigrationLoader; +import org.junit.jupiter.api.*; +import org.mockito.Mockito; +import org.testcontainers.containers.MariaDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.UUID; + +@Tag("slow") +@Testcontainers +class FeedFetcherServiceTest extends AbstractModule { + FeedFetcherService feedFetcherService; + FeedDb feedDb; + + @Container + static MariaDBContainer mariaDBContainer = new MariaDBContainer<>("mariadb") + .withDatabaseName("WMSA_prod") + .withUsername("wmsa") + .withPassword("wmsa") + .withNetworkAliases("mariadb"); + + static HikariDataSource dataSource; + static Path tempDir; + + @BeforeAll + public static void setUpDb() throws IOException { + tempDir = Files.createTempDirectory(FeedFetcherServiceTest.class.getSimpleName()); + + System.setProperty("system.homePath", tempDir.toString()); + + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(mariaDBContainer.getJdbcUrl()); + config.setUsername("wmsa"); + config.setPassword("wmsa"); + + dataSource = new HikariDataSource(config); + + TestMigrationLoader.flywayMigration(dataSource); + } + + @BeforeEach + public void setUp() throws IOException { + if (!Files.exists(tempDir)) { + Files.createDirectory(tempDir); + } + // Trick WmsaHome that this is a full home directory + if (!Files.exists(tempDir.resolve("model"))) { + Files.createDirectory(tempDir.resolve("model")); + } + if (!Files.exists(tempDir.resolve("data"))) { + Files.createDirectory(tempDir.resolve("data")); + } + var injector = Guice.createInjector(this); + + feedDb = injector.getInstance(FeedDb.class); + feedFetcherService = injector.getInstance(FeedFetcherService.class); + + } + + @AfterEach + public void tearDown() throws IOException { + FileUtils.deleteDirectory(tempDir.toFile()); + } + + public void configure() { + bind(HikariDataSource.class).toInstance(dataSource); + bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class)); + bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID())); + bind(Integer.class).annotatedWith(Names.named("wmsa-system-node")).toInstance(1); + } + + @Tag("flaky") + @Test + public void testSunnyDay() throws Exception { + try (var writer = feedDb.createWriter()) { + writer.saveFeed(new FeedItems("www.marginalia.nu", "https://www.marginalia.nu/log/index.xml", "", List.of())); + feedDb.switchDb(writer); + } + + feedFetcherService.setDeterministic(); + feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH); + + Assertions.assertFalse(feedDb.getFeed(new EdgeDomain("www.marginalia.nu")).isEmpty()); + } + + @Tag("flaky") + @Test + public void test404() throws Exception { + try (var writer = feedDb.createWriter()) { + writer.saveFeed(new FeedItems("www.marginalia.nu", "https://www.marginalia.nu/log/missing.xml", "", List.of())); + feedDb.switchDb(writer); + } + + feedFetcherService.setDeterministic(); + feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH); + + // We forget the feed on a 404 error + Assertions.assertEquals(FeedItems.none(), feedDb.getFeed(new EdgeDomain("www.marginalia.nu"))); + } + +} \ No newline at end of file