(feeds) Add per-domain throttling for feed fetcher.

This commit is contained in:
Viktor Lofgren 2024-12-18 22:06:46 +01:00
parent 2dc9f2e639
commit 6a079c1c75
2 changed files with 69 additions and 2 deletions

View File

@ -0,0 +1,66 @@
package nu.marginalia.rss.svc;
import nu.marginalia.model.EdgeDomain;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/** Holds lock objects for each domain, to prevent multiple threads from
* crawling the same domain at the same time.
*/
public class DomainLocks {
// The locks are stored in a map, with the domain name as the key. This map will grow
// relatively big, but should be manageable since the number of domains is limited to
// a few hundred thousand typically.
private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();
/** Returns a lock object corresponding to the given domain. The object is returned as-is,
* and may be held by another thread. The caller is responsible for locking and releasing the lock.
*/
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
}
private Semaphore defaultPermits(String topDomain) {
if (topDomain.equals("wordpress.com"))
return new Semaphore(16);
if (topDomain.equals("blogspot.com"))
return new Semaphore(8);
if (topDomain.equals("neocities.org"))
return new Semaphore(4);
if (topDomain.equals("github.io"))
return new Semaphore(4);
if (topDomain.equals("substack.com")) {
return new Semaphore(1);
}
if (topDomain.endsWith(".edu")) {
return new Semaphore(1);
}
return new Semaphore(2);
}
public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
this.domainName = domainName;
this.semaphore = semaphore;
Thread.currentThread().setName("fetching:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("fetching:" + domainName);
}
@Override
public void close() {
semaphore.release();
Thread.currentThread().setName("fetching:" + domainName + " [wrapping up]");
}
}
}

View File

@ -57,6 +57,8 @@ public class FeedFetcherService {
private final ServiceHeartbeat serviceHeartbeat; private final ServiceHeartbeat serviceHeartbeat;
private final ExecutorClient executorClient; private final ExecutorClient executorClient;
private final DomainLocks domainLocks = new DomainLocks();
private volatile boolean updating; private volatile boolean updating;
private boolean deterministic = false; private boolean deterministic = false;
@ -142,9 +144,8 @@ public class FeedFetcherService {
} }
} }
FetchResult feedData; FetchResult feedData;
try { try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) {
feedData = fetchFeedData(feed, client); feedData = fetchFeedData(feed, client);
} }
catch (Exception ex) { catch (Exception ex) {