diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/DomainLocks.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/DomainLocks.java new file mode 100644 index 00000000..0d5d1e2b --- /dev/null +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/DomainLocks.java @@ -0,0 +1,66 @@ +package nu.marginalia.rss.svc; + +import nu.marginalia.model.EdgeDomain; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +/** Holds lock objects for each domain, to prevent multiple threads from + * crawling the same domain at the same time. + */ +public class DomainLocks { + // The locks are stored in a map, with the domain name as the key. This map will grow + // relatively big, but should be manageable since the number of domains is limited to + // a few hundred thousand typically. + private final Map locks = new ConcurrentHashMap<>(); + + /** Returns a lock object corresponding to the given domain. The object is returned as-is, + * and may be held by another thread. The caller is responsible for locking and releasing the lock. + */ + public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException { + return new DomainLock(domain.toString(), + locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits)); + } + + private Semaphore defaultPermits(String topDomain) { + if (topDomain.equals("wordpress.com")) + return new Semaphore(16); + if (topDomain.equals("blogspot.com")) + return new Semaphore(8); + + if (topDomain.equals("neocities.org")) + return new Semaphore(4); + if (topDomain.equals("github.io")) + return new Semaphore(4); + + if (topDomain.equals("substack.com")) { + return new Semaphore(1); + } + if (topDomain.endsWith(".edu")) { + return new Semaphore(1); + } + + return new Semaphore(2); + } + + public static class DomainLock implements AutoCloseable { + private final String domainName; + private final Semaphore semaphore; + + DomainLock(String domainName, Semaphore semaphore) throws InterruptedException { + this.domainName = domainName; + this.semaphore = semaphore; + + Thread.currentThread().setName("fetching:" + domainName + " [await domain lock]"); + semaphore.acquire(); + Thread.currentThread().setName("fetching:" + domainName); + } + + @Override + public void close() { + semaphore.release(); + Thread.currentThread().setName("fetching:" + domainName + " [wrapping up]"); + } + } +} diff --git a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java index 42768bfd..771bfb2e 100644 --- a/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java +++ b/code/functions/live-capture/java/nu/marginalia/rss/svc/FeedFetcherService.java @@ -57,6 +57,8 @@ public class FeedFetcherService { private final ServiceHeartbeat serviceHeartbeat; private final ExecutorClient executorClient; + private final DomainLocks domainLocks = new DomainLocks(); + private volatile boolean updating; private boolean deterministic = false; @@ -142,9 +144,8 @@ public class FeedFetcherService { } } - FetchResult feedData; - try { + try (DomainLocks.DomainLock domainLock = domainLocks.lockDomain(new EdgeDomain(feed.domain()))) { feedData = fetchFeedData(feed, client); } catch (Exception ex) {