From ffde8c83051f2abf1945eb01f8d0c61d1afe19a9 Mon Sep 17 00:00:00 2001 From: vlofgren Date: Wed, 10 Aug 2022 18:46:13 +0200 Subject: [PATCH] Faster crawling --- .../wmsa/edge/crawling/CrawlerMain.java | 43 +++++++++++++------ .../crawling/retreival/CrawlerRetreiver.java | 6 +-- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlerMain.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlerMain.java index 6d23c4d1..f6c9a5b6 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlerMain.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlerMain.java @@ -29,11 +29,17 @@ public class CrawlerMain implements AutoCloseable { new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", true))); private final UserAgent userAgent; + private final ThreadPoolExecutor pool; + final int poolSize = 256; + final int poolQueueSize = 32; public CrawlerMain(EdgeCrawlPlan plan) throws Exception { this.plan = plan; this.userAgent = WmsaHome.getUserAgent(); + BlockingQueue queue = new LinkedBlockingQueue<>(poolQueueSize); + pool = new ThreadPoolExecutor(poolSize/128, poolSize, 5, TimeUnit.MINUTES, queue); // maybe need to set -Xss for JVM to deal with this? + workLog = plan.createCrawlWorkLog(); crawlDataDir = plan.crawl.getDir(); } @@ -84,31 +90,44 @@ public class CrawlerMain implements AutoCloseable { logger.info("Let's go"); - final int poolSize = 1024; - - BlockingQueue queue = new LinkedBlockingQueue<>(10); - ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize/128, poolSize, 5, TimeUnit.MINUTES, queue); // maybe need to set -Xss for JVM to deal with this? - AbortMonitor abortMonitor = AbortMonitor.getInstance(); + + Semaphore taskSem = new Semaphore(poolSize); + plan.forEachCrawlingSpecification(spec -> { if (abortMonitor.isAlive()) { - pool.execute(() -> fetchDomain(spec)); + try { + taskSem.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + pool.execute(() -> { + try { + fetchDomain(spec); + } + finally { + taskSem.release(); + } + }); } }); - logger.info("Awaiting termination"); - pool.shutdown(); - - while (!pool.awaitTermination(1, TimeUnit.SECONDS)); - - logger.info("All finished"); } public void close() throws Exception { + logger.info("Awaiting termination"); + pool.shutdown(); + + while (!pool.awaitTermination(1, TimeUnit.SECONDS)); + logger.info("All finished"); + workLog.close(); dispatcher.executorService().shutdownNow(); + + } } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/retreival/CrawlerRetreiver.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/retreival/CrawlerRetreiver.java index f8b8eab8..802211ce 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/retreival/CrawlerRetreiver.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/retreival/CrawlerRetreiver.java @@ -83,8 +83,6 @@ public class CrawlerRetreiver { } public int fetch() throws IOException { - logger.info("Fetching {}", domain); - Optional probeResult = probeDomainForProblems(domain); if (probeResult.isPresent()) { @@ -272,10 +270,10 @@ public class CrawlerRetreiver { @SneakyThrows private void delay(long crawlDelay, long timeParsed) { if (crawlDelay >= 1) { - if (timeParsed/1000 > crawlDelay) + if (timeParsed > crawlDelay) return; - Thread.sleep(Math.min(1000*crawlDelay-timeParsed, 5000)); + Thread.sleep(Math.min(crawlDelay-timeParsed, 5000)); } else { if (timeParsed > DEFAULT_CRAWL_DELAY_MS)