mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 13:09:00 +00:00
(feeds) Improve error handling in the feed fetcher.
This commit is contained in:
parent
eb2fe18867
commit
b66fb9caf6
@ -19,6 +19,7 @@ import java.security.MessageDigest;
|
||||
import java.time.Instant;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@ -85,6 +86,26 @@ public class FeedDb {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
public Map<String, Integer> getAllErrorCounts() {
|
||||
if (!feedDbEnabled) {
|
||||
throw new IllegalStateException("Feed database is disabled on this node");
|
||||
}
|
||||
|
||||
// Capture the current reader to avoid concurrency issues
|
||||
FeedDbReader reader = this.reader;
|
||||
|
||||
try {
|
||||
if (reader != null) {
|
||||
return reader.getAllErrorCounts();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Error getting all feeds", e);
|
||||
}
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
|
||||
@NotNull
|
||||
public FeedItems getFeed(EdgeDomain domain) {
|
||||
if (!feedDbEnabled) {
|
||||
|
@ -13,9 +13,7 @@ import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public class FeedDbReader implements AutoCloseable {
|
||||
@ -33,6 +31,7 @@ public class FeedDbReader implements AutoCloseable {
|
||||
// Create table if it doesn't exist to avoid errors before any feeds have been fetched
|
||||
try (var stmt = connection.createStatement()) {
|
||||
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
|
||||
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS errors (domain TEXT PRIMARY KEY, cnt INT DEFAULT 0)");
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,6 +75,22 @@ public class FeedDbReader implements AutoCloseable {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public Map<String, Integer> getAllErrorCounts() {
|
||||
Map<String, Integer> ret = new HashMap<>(100_000);
|
||||
|
||||
try (var stmt = connection.prepareStatement("SELECT domain, cnt FROM errors")) {
|
||||
|
||||
var rs = stmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
ret.put(rs.getString(1), rs.getInt(2));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Error getting errors", e);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
public FeedItems getFeed(EdgeDomain domain) {
|
||||
try (var stmt = connection.prepareStatement("SELECT FEED FROM feed WHERE DOMAIN = ?")) {
|
||||
stmt.setString(1, domain.toString());
|
||||
@ -124,4 +139,6 @@ public class FeedDbReader implements AutoCloseable {
|
||||
logger.error("Error getting updated links", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ public class FeedDbWriter implements AutoCloseable {
|
||||
|
||||
private final Connection connection;
|
||||
private final PreparedStatement insertFeedStmt;
|
||||
private final PreparedStatement insertErrorStmt;
|
||||
private final Path dbPath;
|
||||
|
||||
private volatile boolean closed = false;
|
||||
@ -32,9 +33,11 @@ public class FeedDbWriter implements AutoCloseable {
|
||||
|
||||
try (var stmt = connection.createStatement()) {
|
||||
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS feed (domain TEXT PRIMARY KEY, feed JSON)");
|
||||
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS errors (domain TEXT PRIMARY KEY, cnt INT DEFAULT 0)");
|
||||
}
|
||||
|
||||
insertFeedStmt = connection.prepareStatement("INSERT INTO feed (domain, feed) VALUES (?, ?)");
|
||||
insertErrorStmt = connection.prepareStatement("INSERT INTO errors (domain, cnt) VALUES (?, ?)");
|
||||
}
|
||||
|
||||
public Path getDbPath() {
|
||||
@ -53,6 +56,17 @@ public class FeedDbWriter implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void setErrorCount(String domain, int count) {
|
||||
try {
|
||||
insertErrorStmt.setString(1, domain);
|
||||
insertErrorStmt.setInt(2, count);
|
||||
insertErrorStmt.executeUpdate();
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
logger.error("Error saving error count " + domain, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private String serialize(FeedItems items) {
|
||||
return gson.toJson(items);
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import nu.marginalia.executor.client.ExecutorClient;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.rss.db.FeedDb;
|
||||
import nu.marginalia.rss.db.FeedDbWriter;
|
||||
import nu.marginalia.rss.model.FeedDefinition;
|
||||
import nu.marginalia.rss.model.FeedItem;
|
||||
import nu.marginalia.rss.model.FeedItems;
|
||||
@ -21,10 +22,13 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
@ -43,14 +47,7 @@ public class FeedFetcherService {
|
||||
private static final int MAX_FEED_ITEMS = 10;
|
||||
private static final Logger logger = LoggerFactory.getLogger(FeedFetcherService.class);
|
||||
|
||||
private final RssReader rssReader = new RssReader(
|
||||
HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(5))
|
||||
.executor(Executors.newCachedThreadPool())
|
||||
.followRedirects(HttpClient.Redirect.NORMAL)
|
||||
.version(HttpClient.Version.HTTP_2)
|
||||
.build()
|
||||
);
|
||||
private final RssReader rssReader = new RssReader();
|
||||
|
||||
private final FeedDb feedDb;
|
||||
private final FileStorageService fileStorageService;
|
||||
@ -59,6 +56,7 @@ public class FeedFetcherService {
|
||||
private final ExecutorClient executorClient;
|
||||
|
||||
private volatile boolean updating;
|
||||
private boolean deterministic = false;
|
||||
|
||||
@Inject
|
||||
public FeedFetcherService(FeedDb feedDb,
|
||||
@ -72,8 +70,6 @@ public class FeedFetcherService {
|
||||
this.nodeConfigurationService = nodeConfigurationService;
|
||||
this.serviceHeartbeat = serviceHeartbeat;
|
||||
this.executorClient = executorClient;
|
||||
|
||||
rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier());
|
||||
}
|
||||
|
||||
public enum UpdateMode {
|
||||
@ -81,14 +77,25 @@ public class FeedFetcherService {
|
||||
REFRESH
|
||||
};
|
||||
|
||||
/** Disable random-based heuristics. This is meant for testing */
|
||||
public void setDeterministic() {
|
||||
this.deterministic = true;
|
||||
}
|
||||
|
||||
public void updateFeeds(UpdateMode updateMode) throws IOException {
|
||||
if (updating) // Prevent concurrent updates
|
||||
{
|
||||
throw new IllegalStateException("Already updating feeds, refusing to start another update");
|
||||
}
|
||||
|
||||
try (var writer = feedDb.createWriter();
|
||||
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
|
||||
try (FeedDbWriter writer = feedDb.createWriter();
|
||||
HttpClient client = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(15))
|
||||
.executor(Executors.newCachedThreadPool())
|
||||
.followRedirects(HttpClient.Redirect.NORMAL)
|
||||
.version(HttpClient.Version.HTTP_2)
|
||||
.build();
|
||||
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Update Rss Feeds")
|
||||
) {
|
||||
updating = true;
|
||||
|
||||
@ -96,6 +103,7 @@ public class FeedFetcherService {
|
||||
// RSS exports instead
|
||||
|
||||
Collection<FeedDefinition> definitions = feedDb.getAllFeeds();
|
||||
Map<String, Integer> errorCounts = feedDb.getAllErrorCounts();
|
||||
|
||||
// If we didn't get any definitions, or a clean update is requested, read the definitions from the system
|
||||
// instead
|
||||
@ -123,8 +131,8 @@ public class FeedFetcherService {
|
||||
long daysSinceUpdate = duration.toDays();
|
||||
|
||||
|
||||
if (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
||||
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1)
|
||||
if (deterministic || (daysSinceUpdate > 2 && ThreadLocalRandom.current()
|
||||
.nextInt(1, 1 + (int) Math.min(10, daysSinceUpdate) / 2) > 1))
|
||||
{
|
||||
// Skip updating this feed, just write the old data back instead
|
||||
writer.saveFeed(oldData);
|
||||
@ -133,9 +141,26 @@ public class FeedFetcherService {
|
||||
}
|
||||
|
||||
|
||||
var items = fetchFeed(feed);
|
||||
if (!items.isEmpty()) {
|
||||
writer.saveFeed(items);
|
||||
FetchResult feedData;
|
||||
try {
|
||||
feedData = fetchFeedData(feed, client);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
feedData = new FetchResult.TransientError();
|
||||
}
|
||||
|
||||
switch (feedData) {
|
||||
case FetchResult.Success(String value) -> writer.saveFeed(fetchFeed(value, feed));
|
||||
case FetchResult.TransientError() -> {
|
||||
int errorCount = errorCounts.getOrDefault(feed.domain().toLowerCase(), 0);
|
||||
writer.setErrorCount(feed.domain().toLowerCase(), ++errorCount);
|
||||
|
||||
if (errorCount < 5) {
|
||||
// Permit the server a few days worth of retries before we drop the feed entirely
|
||||
writer.saveFeed(oldData);
|
||||
}
|
||||
}
|
||||
case FetchResult.PermanentError() -> {} // let the definition be forgotten about
|
||||
}
|
||||
|
||||
if ((definitionsUpdated.incrementAndGet() % 1_000) == 0) {
|
||||
@ -168,6 +193,46 @@ public class FeedFetcherService {
|
||||
}
|
||||
}
|
||||
|
||||
private FetchResult fetchFeedData(FeedDefinition feed, HttpClient client) {
|
||||
try {
|
||||
URI uri = new URI(feed.feedUrl());
|
||||
|
||||
HttpRequest getRequest = HttpRequest.newBuilder()
|
||||
.GET()
|
||||
.uri(uri)
|
||||
.header("User-Agent", WmsaHome.getUserAgent().uaIdentifier())
|
||||
.header("Accept-Encoding", "gzip")
|
||||
.header("Accept", "text/*, */*;q=0.9")
|
||||
.timeout(Duration.ofSeconds(15))
|
||||
.build();
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
var rs = client.send(getRequest, HttpResponse.BodyHandlers.ofString());
|
||||
if (429 == rs.statusCode()) {
|
||||
int retryAfter = Integer.parseInt(rs.headers().firstValue("Retry-After").orElse("2"));
|
||||
Thread.sleep(Duration.ofSeconds(Math.clamp(retryAfter, 1, 5)));
|
||||
} else if (200 == rs.statusCode()) {
|
||||
return new FetchResult.Success(rs.body());
|
||||
} else if (404 == rs.statusCode()) {
|
||||
return new FetchResult.PermanentError(); // never try again
|
||||
} else {
|
||||
return new FetchResult.TransientError(); // we try again in a few days
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.debug("Error fetching feed", ex);
|
||||
}
|
||||
|
||||
return new FetchResult.TransientError();
|
||||
}
|
||||
|
||||
public sealed interface FetchResult {
|
||||
record Success(String value) implements FetchResult {}
|
||||
record TransientError() implements FetchResult {}
|
||||
record PermanentError() implements FetchResult {}
|
||||
}
|
||||
|
||||
public Collection<FeedDefinition> readDefinitionsFromSystem() throws IOException {
|
||||
Collection<FileStorage> storages = getLatestFeedStorages();
|
||||
List<FeedDefinition> feedDefinitionList = new ArrayList<>();
|
||||
@ -231,9 +296,9 @@ public class FeedFetcherService {
|
||||
}
|
||||
}
|
||||
|
||||
public FeedItems fetchFeed(FeedDefinition definition) {
|
||||
public FeedItems fetchFeed(String feedData, FeedDefinition definition) {
|
||||
try {
|
||||
List<Item> rawItems = rssReader.read(definition.feedUrl()).toList();
|
||||
List<Item> rawItems = rssReader.read(new ByteArrayInputStream(feedData.getBytes())).toList();
|
||||
|
||||
boolean keepUriFragment = rawItems.size() < 2 || areFragmentsDisparate(rawItems);
|
||||
|
||||
|
@ -8,9 +8,30 @@ import org.junit.jupiter.api.Test;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class FeedDbTest {
|
||||
|
||||
@Test
|
||||
public void testErrorCounts() throws Exception {
|
||||
Path dbPath = Files.createTempFile("rss-feeds", ".db");
|
||||
|
||||
try {
|
||||
FeedDb db = new FeedDb(dbPath);
|
||||
|
||||
try (var writer = db.createWriter()) {
|
||||
writer.setErrorCount("foo", 1);
|
||||
writer.setErrorCount("bar", 5);
|
||||
db.switchDb(writer);
|
||||
}
|
||||
|
||||
Map<String, Integer> allErrorCounts = db.getAllErrorCounts();
|
||||
Assertions.assertEquals(Map.of("foo", 1, "bar", 5), allErrorCounts);
|
||||
} finally {
|
||||
Files.delete(dbPath);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDbHash() throws Exception{
|
||||
Path dbPath = Files.createTempFile("rss-feeds", ".db");
|
||||
@ -31,6 +52,5 @@ class FeedDbTest {
|
||||
} finally {
|
||||
Files.delete(dbPath);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,120 @@
|
||||
package nu.marginalia.rss.svc;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.name.Names;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.rss.db.FeedDb;
|
||||
import nu.marginalia.rss.model.FeedItems;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
import nu.marginalia.test.TestMigrationLoader;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.mockito.Mockito;
|
||||
import org.testcontainers.containers.MariaDBContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Tag("slow")
|
||||
@Testcontainers
|
||||
class FeedFetcherServiceTest extends AbstractModule {
|
||||
FeedFetcherService feedFetcherService;
|
||||
FeedDb feedDb;
|
||||
|
||||
@Container
|
||||
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
|
||||
.withDatabaseName("WMSA_prod")
|
||||
.withUsername("wmsa")
|
||||
.withPassword("wmsa")
|
||||
.withNetworkAliases("mariadb");
|
||||
|
||||
static HikariDataSource dataSource;
|
||||
static Path tempDir;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUpDb() throws IOException {
|
||||
tempDir = Files.createTempDirectory(FeedFetcherServiceTest.class.getSimpleName());
|
||||
|
||||
System.setProperty("system.homePath", tempDir.toString());
|
||||
|
||||
HikariConfig config = new HikariConfig();
|
||||
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
|
||||
config.setUsername("wmsa");
|
||||
config.setPassword("wmsa");
|
||||
|
||||
dataSource = new HikariDataSource(config);
|
||||
|
||||
TestMigrationLoader.flywayMigration(dataSource);
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws IOException {
|
||||
if (!Files.exists(tempDir)) {
|
||||
Files.createDirectory(tempDir);
|
||||
}
|
||||
// Trick WmsaHome that this is a full home directory
|
||||
if (!Files.exists(tempDir.resolve("model"))) {
|
||||
Files.createDirectory(tempDir.resolve("model"));
|
||||
}
|
||||
if (!Files.exists(tempDir.resolve("data"))) {
|
||||
Files.createDirectory(tempDir.resolve("data"));
|
||||
}
|
||||
var injector = Guice.createInjector(this);
|
||||
|
||||
feedDb = injector.getInstance(FeedDb.class);
|
||||
feedFetcherService = injector.getInstance(FeedFetcherService.class);
|
||||
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() throws IOException {
|
||||
FileUtils.deleteDirectory(tempDir.toFile());
|
||||
}
|
||||
|
||||
public void configure() {
|
||||
bind(HikariDataSource.class).toInstance(dataSource);
|
||||
bind(ServiceRegistryIf.class).toInstance(Mockito.mock(ServiceRegistryIf.class));
|
||||
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(ServiceId.Executor, 1, "", "", 0, UUID.randomUUID()));
|
||||
bind(Integer.class).annotatedWith(Names.named("wmsa-system-node")).toInstance(1);
|
||||
}
|
||||
|
||||
@Tag("flaky")
|
||||
@Test
|
||||
public void testSunnyDay() throws Exception {
|
||||
try (var writer = feedDb.createWriter()) {
|
||||
writer.saveFeed(new FeedItems("www.marginalia.nu", "https://www.marginalia.nu/log/index.xml", "", List.of()));
|
||||
feedDb.switchDb(writer);
|
||||
}
|
||||
|
||||
feedFetcherService.setDeterministic();
|
||||
feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH);
|
||||
|
||||
Assertions.assertFalse(feedDb.getFeed(new EdgeDomain("www.marginalia.nu")).isEmpty());
|
||||
}
|
||||
|
||||
@Tag("flaky")
|
||||
@Test
|
||||
public void test404() throws Exception {
|
||||
try (var writer = feedDb.createWriter()) {
|
||||
writer.saveFeed(new FeedItems("www.marginalia.nu", "https://www.marginalia.nu/log/missing.xml", "", List.of()));
|
||||
feedDb.switchDb(writer);
|
||||
}
|
||||
|
||||
feedFetcherService.setDeterministic();
|
||||
feedFetcherService.updateFeeds(FeedFetcherService.UpdateMode.REFRESH);
|
||||
|
||||
// We forget the feed on a 404 error
|
||||
Assertions.assertEquals(FeedItems.none(), feedDb.getFeed(new EdgeDomain("www.marginalia.nu")));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user