From 7305afa0f81c6b6f9c1711c2396fb11db9285e44 Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Tue, 15 Oct 2024 17:27:59 +0200
Subject: [PATCH] (crawler) Clean up the crawler code a bit, removing vestigial
 abstractions and historical debris

---
 ...CrawlingThenConvertingIntegrationTest.java |  18 +-
 .../nu/marginalia/crawl/AbortMonitor.java     |  46 ----
 .../java/nu/marginalia/crawl/CrawlerMain.java | 200 +++++++++++++-----
 .../marginalia/crawl/logic/DomainLocks.java   |  25 ++-
 .../crawl/retreival/CrawlDataReference.java   |  12 +-
 .../crawl/retreival/CrawlerRetreiver.java     |  14 +-
 .../crawl/retreival/DomainCrawlFrontier.java  |   2 +-
 .../revisit/DocumentWithReference.java        |   2 +-
 .../crawl/spec/CrawlSpecProvider.java         | 137 ------------
 .../retreival/CrawlerMockFetcherTest.java     |  10 +-
 .../retreival/CrawlerRetreiverTest.java       |  22 +-
 11 files changed, 209 insertions(+), 279 deletions(-)
 delete mode 100644 code/processes/crawling-process/java/nu/marginalia/crawl/AbortMonitor.java
 delete mode 100644 code/processes/crawling-process/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java

diff --git a/code/processes/converting-process/test/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java b/code/processes/converting-process/test/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java
index df6a4c2c..6476a45b 100644
--- a/code/processes/converting-process/test/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java
+++ b/code/processes/converting-process/test/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java
@@ -7,12 +7,12 @@ import nu.marginalia.UserAgent;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.converting.model.ProcessedDomain;
 import nu.marginalia.converting.processor.DomainProcessor;
+import nu.marginalia.crawl.CrawlerMain;
 import nu.marginalia.crawl.fetcher.HttpFetcher;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawl.retreival.CrawlerRetreiver;
 import nu.marginalia.crawl.retreival.DomainProber;
-import nu.marginalia.crawl.spec.CrawlSpecProvider;
 import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawl.DomainIndexingState;
@@ -77,7 +77,7 @@ public class CrawlingThenConvertingIntegrationTest {
     @Test
     public void testInvalidDomain() throws IOException {
         // Attempt to fetch an invalid domain
-        var specs = new CrawlSpecProvider.CrawlSpecRecord("invalid.invalid.invalid", 10);
+        var specs = new CrawlerMain.CrawlSpecRecord("invalid.invalid.invalid", 10);
 
         CrawledDomain crawlData = crawl(specs);
 
@@ -93,7 +93,7 @@ public class CrawlingThenConvertingIntegrationTest {
     @Test
     public void testRedirectingDomain() throws IOException {
         // Attempt to fetch an invalid domain
-        var specs = new CrawlSpecProvider.CrawlSpecRecord("memex.marginalia.nu", 10);
+        var specs = new CrawlerMain.CrawlSpecRecord("memex.marginalia.nu", 10);
 
         CrawledDomain crawlData = crawl(specs);
 
@@ -112,7 +112,7 @@ public class CrawlingThenConvertingIntegrationTest {
     @Test
     public void testBlockedDomain() throws IOException {
         // Attempt to fetch an invalid domain
-        var specs = new CrawlSpecProvider.CrawlSpecRecord("search.marginalia.nu", 10);
+        var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 10);
 
         CrawledDomain crawlData = crawl(specs, d->false); // simulate blocking by blacklisting everything
 
@@ -128,7 +128,7 @@ public class CrawlingThenConvertingIntegrationTest {
     @Test
     public void crawlSunnyDay() throws IOException {
-        var specs = new CrawlSpecProvider.CrawlSpecRecord("www.marginalia.nu", 10);
+        var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10);
 
         CrawledDomain domain = crawl(specs);
         assertFalse(domain.doc.isEmpty());
@@ -161,7 +161,7 @@ public class CrawlingThenConvertingIntegrationTest {
 
     @Test
     public void crawlContentTypes() throws IOException {
-        var specs = new CrawlSpecProvider.CrawlSpecRecord("www.marginalia.nu", 10,
+        var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10,
                 List.of(
                         "https://www.marginalia.nu/sanic.png",
                         "https://www.marginalia.nu/invalid"
                 )
@@ -199,7 +199,7 @@ public class CrawlingThenConvertingIntegrationTest {
 
     @Test
     public void crawlRobotsTxt() throws IOException {
-        var specs = new CrawlSpecProvider.CrawlSpecRecord("search.marginalia.nu", 5,
+        var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 5,
                 List.of("https://search.marginalia.nu/search?q=hello+world")
         );
 
@@ -238,11 +238,11 @@ public class CrawlingThenConvertingIntegrationTest {
             return null; // unreachable
         }
     }
 
-    private CrawledDomain crawl(CrawlSpecProvider.CrawlSpecRecord specs) throws IOException {
+    private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs) throws IOException {
         return crawl(specs, domain -> true);
     }
 
-    private CrawledDomain crawl(CrawlSpecProvider.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws IOException {
+    private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws IOException {
         List<SerializableCrawlData> data = new ArrayList<>();
 
         try (var recorder = new WarcRecorder(fileName)) {
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/AbortMonitor.java b/code/processes/crawling-process/java/nu/marginalia/crawl/AbortMonitor.java
deleted file mode 100644
index 891cea6c..00000000
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/AbortMonitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package nu.marginalia.crawl;
-
-
-import lombok.SneakyThrows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-public class AbortMonitor {
-    private volatile boolean abort = false;
-    private static volatile AbortMonitor instance = null;
-    private static final Logger logger = LoggerFactory.getLogger(AbortMonitor.class);
-
-    public static AbortMonitor getInstance() {
-        if (instance == null) {
-            synchronized (AbortMonitor.class) {
-                if (instance == null) {
-                    instance = new AbortMonitor();
-                    new Thread(instance::run, "AbortMon").start();
-                }
-            }
-        }
-        return instance;
-    }
-
-    private AbortMonitor() {
-    }
-
-    @SneakyThrows
-    public void run() {
-        for (;;) {
-            Thread.sleep(1000);
-            if (Files.exists(Path.of("/tmp/stop"))) {
-                logger.warn("Abort file found");
-                abort = true;
-                Files.delete(Path.of("/tmp/stop"));
-            }
-        }
-    }
-
-    public boolean isAlive() {
-        return !abort;
-    }
-}
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
index 83965a88..05e9e396 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
@@ -4,10 +4,13 @@ import com.google.gson.Gson;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.Builder;
 import nu.marginalia.ProcessConfiguration;
 import nu.marginalia.ProcessConfigurationModule;
 import nu.marginalia.UserAgent;
 import nu.marginalia.WmsaHome;
+import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.atags.source.AnchorTagsSource;
 import nu.marginalia.atags.source.AnchorTagsSourceFactory;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@@ -16,9 +19,9 @@ import nu.marginalia.crawl.logic.DomainLocks;
 import nu.marginalia.crawl.retreival.CrawlDataReference;
 import nu.marginalia.crawl.retreival.CrawlerRetreiver;
 import nu.marginalia.crawl.retreival.DomainProber;
-import nu.marginalia.crawl.spec.CrawlSpecProvider;
 import nu.marginalia.crawl.warc.WarcArchiverFactory;
 import nu.marginalia.crawl.warc.WarcArchiverIf;
+import nu.marginalia.db.DomainBlacklist;
 import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.CrawlerOutputFile;
 import nu.marginalia.model.EdgeDomain;
@@ -35,6 +38,7 @@ import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.util.SimpleBlockingThreadPool;
 import okhttp3.ConnectionPool;
 import okhttp3.Dispatcher;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,10 +48,7 @@ import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.security.Security;
 import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -62,22 +63,28 @@ public class CrawlerMain extends ProcessMainClass {
     private final MessageQueueFactory messageQueueFactory;
     private final DomainProber domainProber;
     private final FileStorageService fileStorageService;
-    private final CrawlSpecProvider crawlSpecProvider;
     private final AnchorTagsSourceFactory anchorTagsSourceFactory;
     private final WarcArchiverFactory warcArchiverFactory;
+    private final HikariDataSource dataSource;
+    private final DomainBlacklist blacklist;
     private final Gson gson;
     private final int node;
     private final SimpleBlockingThreadPool pool;
 
     private final DomainLocks domainLocks = new DomainLocks();
 
-    private final Map<String, String> processingIds = new ConcurrentHashMap<>();
+    private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
 
-    private final AbortMonitor abortMonitor = AbortMonitor.getInstance();
     private final AtomicInteger tasksDone = new AtomicInteger(0);
     private final HttpFetcherImpl fetcher;
 
-    private volatile int totalTasks;
+    private int totalTasks = 1;
+
+    private static final double URL_GROWTH_FACTOR = Double.parseDouble(System.getProperty("crawler.crawlSetGrowthFactor", "1.25"));
+    private static final int MIN_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 100);
+    private static final int MID_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 2_000);
+    private static final int MAX_URLS_PER_DOMAIN = Integer.getInteger("crawler.maxUrlsPerDomain", 10_000);
+
     @Inject
     public CrawlerMain(UserAgent userAgent,
@@ -85,18 +92,20 @@
                        MessageQueueFactory messageQueueFactory, DomainProber domainProber,
                        FileStorageService fileStorageService, ProcessConfiguration processConfiguration,
-                       CrawlSpecProvider crawlSpecProvider,
                        AnchorTagsSourceFactory anchorTagsSourceFactory,
                        WarcArchiverFactory warcArchiverFactory,
-                       Gson gson) {
+                       HikariDataSource dataSource,
+                       DomainBlacklist blacklist,
+                       Gson gson) throws InterruptedException {
         this.userAgent = userAgent;
         this.heartbeat = heartbeat;
         this.messageQueueFactory = messageQueueFactory;
         this.domainProber = domainProber;
         this.fileStorageService = fileStorageService;
-        this.crawlSpecProvider = crawlSpecProvider;
         this.anchorTagsSourceFactory = anchorTagsSourceFactory;
         this.warcArchiverFactory = warcArchiverFactory;
+        this.dataSource = dataSource;
+        this.blacklist = blacklist;
         this.gson = gson;
         this.node = processConfiguration.node();
 
@@ -108,15 +117,13 @@
                 new Dispatcher(),
                 new ConnectionPool(5, 10, TimeUnit.SECONDS)
         );
+
+        // Wait for the blacklist to be loaded before starting the crawl
+        blacklist.waitUntilLoaded();
     }
 
     public static void main(String... args) throws Exception {
 
-        if (!AbortMonitor.getInstance().isAlive()) {
-            System.err.println("Remove abort file first");
-            return;
-        }
-
         // Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
         Security.setProperty("networkaddress.cache.ttl" , "3600");
 
@@ -144,7 +151,7 @@
                 crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir);
             }
             else {
-                crawler.run(instructions.outputDir);
+                crawler.runForDatabaseDomains(instructions.outputDir);
            }
            instructions.ok();
        } catch (Exception ex) {
@@ -160,34 +167,99 @@
         System.exit(0);
     }
 
-    public void run(Path outputDir) throws Exception {
+    public void runForDatabaseDomains(Path outputDir) throws Exception {
 
         heartbeat.start();
 
+        logger.info("Loading domains to be crawled");
+
+        final List<CrawlSpecRecord> crawlSpecRecords = new ArrayList<>();
+        final List<EdgeDomain> domainsToCrawl = new ArrayList<>();
+
+        // Assign any domains with node_affinity=0 to this node, and then fetch all domains assigned to this node
+        // to be crawled.
+
+        try (var conn = dataSource.getConnection()) {
+            try (var assignFreeDomains = conn.prepareStatement(
+                    """
+                        UPDATE EC_DOMAIN
+                        SET NODE_AFFINITY=?
+                        WHERE NODE_AFFINITY=0
+                        """))
+            {
+                // Assign any domains with node_affinity=0 to this node.  We must do this now, before we start crawling
+                // to avoid race conditions with other crawl runs.  We don't want multiple crawlers to crawl the same domain.
+                assignFreeDomains.setInt(1, node);
+                assignFreeDomains.executeUpdate();
+            }
+
+            try (var query = conn.prepareStatement("""
+                     SELECT DOMAIN_NAME, COALESCE(VISITED_URLS, 0), EC_DOMAIN.ID
+                     FROM EC_DOMAIN
+                     LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
+                     WHERE NODE_AFFINITY=?
+                     """)) {
+                // Fetch the domains to be crawled
+                query.setInt(1, node);
+                query.setFetchSize(10_000);
+                var rs = query.executeQuery();
+
+                while (rs.next()) {
+                    // Skip blacklisted domains
+                    int domainId = rs.getInt(3);
+                    if (blacklist.isBlacklisted(domainId))
+                        continue;
+
+                    int existingUrls = rs.getInt(2);
+                    String domainName = rs.getString(1);
+
+                    domainsToCrawl.add(new EdgeDomain(domainName));
+                    crawlSpecRecords.add(CrawlSpecRecord.growExistingDomain(domainName, existingUrls));
+                    totalTasks++;
+                }
+            }
+        }
+
+        logger.info("Loaded {} domains", crawlSpecRecords.size());
+
+        // Shuffle the domains to ensure we get a good mix of domains in each crawl,
+        // so that e.g. the big domains don't get all crawled at once, or we end up
+        // crawling the same server in parallel from different subdomains...
+        Collections.shuffle(crawlSpecRecords);
+
         // First a validation run to ensure the file is all good to parse
-        totalTasks = crawlSpecProvider.totalCount();
-        if (totalTasks == 0) {
+        if (crawlSpecRecords.isEmpty()) {
             // This is an error state, and we should make noise about it
             throw new IllegalStateException("No crawl tasks found, refusing to continue");
         }
-        logger.info("Queued {} crawl tasks, let's go", totalTasks);
+        else {
+            logger.info("Queued {} crawl tasks, let's go", crawlSpecRecords.size());
+        }
+
+        // Set up the work log and the warc archiver so we can keep track of what we've done
         try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
              WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
-             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(crawlSpecProvider.getDomains())
+             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(domainsToCrawl)
         ) {
 
             // Set the number of tasks done to the number of tasks that are already finished,
             // (this happens when the process is restarted after a crash or a shutdown)
             tasksDone.set(workLog.countFinishedJobs());
 
-            // Process the crawl tasks
-            try (var specStream = crawlSpecProvider.stream()) {
-                specStream
-                        .takeWhile((e) -> abortMonitor.isAlive())
-                        .filter(e -> !workLog.isJobFinished(e.domain()))
-                        .filter(e -> processingIds.put(e.domain(), "") == null)
-                        .map(e -> new CrawlTask(e, anchorTagsSource, outputDir, warcArchiver, workLog))
-                        .forEach(pool::submitQuietly);
+            // Create crawl tasks and submit them to the pool for execution
+            for (CrawlSpecRecord crawlSpec : crawlSpecRecords) {
+                if (workLog.isJobFinished(crawlSpec.domain()))
+                    continue;
+
+                var task = new CrawlTask(
+                        crawlSpec,
+                        anchorTagsSource,
+                        outputDir,
+                        warcArchiver,
+                        workLog);
+
+                if (pendingCrawlTasks.putIfAbsent(crawlSpec.domain(), task) == null) {
+                    pool.submitQuietly(task);
+                }
             }
 
             logger.info("Shutting down the pool, waiting for tasks to complete...");
@@ -222,7 +294,7 @@
              WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
              AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(List.of(new EdgeDomain(targetDomainName)))
         ) {
-            var spec = new CrawlSpecProvider.CrawlSpecRecord(targetDomainName, 1000, List.of());
+            var spec = new CrawlSpecRecord(targetDomainName, 1000, List.of());
             var task = new CrawlTask(spec, anchorTagsSource, outputDir, warcArchiver, workLog);
             task.run();
         }
@@ -234,9 +306,9 @@
         }
     }
 
-    class CrawlTask implements SimpleBlockingThreadPool.Task {
+    private class CrawlTask implements SimpleBlockingThreadPool.Task {
 
-        private final CrawlSpecProvider.CrawlSpecRecord specification;
+        private final CrawlSpecRecord specification;
 
         private final String domain;
         private final String id;
@@ -246,7 +318,7 @@
         private final WarcArchiverIf warcArchiver;
         private final WorkLog workLog;
 
-        CrawlTask(CrawlSpecProvider.CrawlSpecRecord specification,
+        CrawlTask(CrawlSpecRecord specification,
                   AnchorTagsSource anchorTagsSource,
                   Path outputDir,
                   WarcArchiverIf warcArchiver,
@@ -269,6 +341,8 @@
             Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
             Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
 
+            // Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
+            // while writing to the same file name as before
             if (Files.exists(newWarcFile)) {
                 Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
             }
@@ -276,31 +350,29 @@
                 Files.deleteIfExists(tempFile);
             }
 
-            var domainLock = domainLocks.getSemaphore(new EdgeDomain(specification.domain()));
-
             try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
                  var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder);
-                 CrawlDataReference reference = getReference())
+                 CrawlDataReference reference = getReference();
+                 )
             {
-                // acquire the domain lock to prevent other threads from crawling the same domain,
-                // we release it at the end of the task to let them go ahead
-                Thread.currentThread().setName("crawling:" + domain + " [await domain lock]");
-                domainLock.acquire();
-                Thread.currentThread().setName("crawling:" + domain);
-
-                var domainLinks = anchorTagsSource.getAnchorTags(domain);
-
+                // Resume the crawl if it was aborted
                 if (Files.exists(tempFile)) {
                     retriever.syncAbortedRun(tempFile);
                     Files.delete(tempFile);
                 }
 
-                int size = retriever.crawlDomain(domainLinks, reference);
+                DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain);
+
+                int size;
+                try (var lock = domainLocks.lockDomain(new EdgeDomain(domain))) {
+                    size = retriever.crawlDomain(domainLinks, reference);
+                }
 
                 // Delete the reference crawl data if it's not the same as the new one
                 // (mostly a case when migrating from legacy->warc)
                 reference.delete();
 
+                // Convert the WARC file to Parquet
                 CrawledDocumentParquetRecordFileWriter
                         .convertWarc(domain, userAgent, newWarcFile, parquetFile);
 
@@ -308,7 +380,10 @@
                 // otherwise delete it:
                 warcArchiver.consumeWarc(newWarcFile, domain);
 
+                // Mark the domain as finished in the work log
                 workLog.setJobToFinished(domain, parquetFile.toString(), size);
+
+                // Update the progress bar
                 heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
 
                 logger.info("Fetched {}", domain);
@@ -316,11 +391,8 @@
                 logger.error("Error fetching domain " + domain, e);
             }
             finally {
-                // release the domain lock to permit other threads to crawl subdomains of this domain
-                domainLock.release();
-                // We don't need to double-count these; it's also kept int he workLog
-                processingIds.remove(domain);
+                pendingCrawlTasks.remove(domain);
                 Thread.currentThread().setName("[idle]");
 
                 Files.deleteIfExists(newWarcFile);
@@ -379,12 +451,11 @@
             var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
             var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);
-
-            var crawlData = fileStorageService.getStorage(request.crawlStorage);
+            var crawlStorage = fileStorageService.getStorage(request.crawlStorage);
 
             return new CrawlRequest(
                     request.targetDomainName,
-                    crawlData.asPath(),
+                    crawlStorage.asPath(),
                     msg,
                     inbox);
         }
@@ -404,4 +475,25 @@
         }
     }
 
+    @Builder
+    public record CrawlSpecRecord(@NotNull String domain, int crawlDepth, @NotNull List<String> urls) {
+
+        public CrawlSpecRecord(String domain, int crawlDepth) {
+            this(domain, crawlDepth, List.of());
+        }
+
+        public static CrawlSpecRecord growExistingDomain(String domain, int visitedUrls) {
+            // Calculate the number of URLs to fetch for this domain, based on the number of URLs
+            // already fetched, and a growth factor that gets a bonus for small domains
+            return new CrawlSpecRecord(domain,
+                    (int) Math.clamp(
+                            (visitedUrls * (visitedUrls < MID_URLS_PER_DOMAIN
+                                    ? Math.max(2.5, URL_GROWTH_FACTOR)
+                                    : URL_GROWTH_FACTOR)
+                            ),
+                            MIN_URLS_PER_DOMAIN,
+                            MAX_URLS_PER_DOMAIN));
+        }
+
+    }
 }
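A worked example of the clamp arithmetic in growExistingDomain() above, using the default settings (growth factor 1.25, bounds 100 / 2,000 / 10,000). The domain names and visited-URL counts here are hypothetical, not taken from the patch:

    growExistingDomain("tiny.example",      40)  ->  clamp(   40 * 2.5,  100, 10_000) =    100
    growExistingDomain("medium.example", 1_000)  ->  clamp(1_000 * 2.5,  100, 10_000) =  2_500
    growExistingDomain("large.example",  8_000)  ->  clamp(8_000 * 1.25, 100, 10_000) = 10_000

Domains below the 2,000-URL threshold get the more generous max(2.5, URL_GROWTH_FACTOR) multiplier, while larger domains grow by the plain growth factor until they hit the 10,000-URL ceiling.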
for small domains + return new CrawlSpecRecord(domain, + (int) Math.clamp( + (visitedUrls * (visitedUrls < MID_URLS_PER_DOMAIN + ? Math.max(2.5, URL_GROWTH_FACTOR) + : URL_GROWTH_FACTOR) + ), + MIN_URLS_PER_DOMAIN, + MAX_URLS_PER_DOMAIN)); + } + + } } diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/logic/DomainLocks.java b/code/processes/crawling-process/java/nu/marginalia/crawl/logic/DomainLocks.java index 856fa387..5466361f 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/logic/DomainLocks.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/logic/DomainLocks.java @@ -18,8 +18,9 @@ public class DomainLocks { /** Returns a lock object corresponding to the given domain. The object is returned as-is, * and may be held by another thread. The caller is responsible for locking and releasing the lock. */ - public Semaphore getSemaphore(EdgeDomain domain) { - return locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits); + public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException { + return new DomainLock(domain.toString(), + locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits)); } private Semaphore defaultPermits(String topDomain) { @@ -42,4 +43,24 @@ public class DomainLocks { return new Semaphore(2); } + + public static class DomainLock implements AutoCloseable { + private final String domainName; + private final Semaphore semaphore; + + DomainLock(String domainName, Semaphore semaphore) throws InterruptedException { + this.domainName = domainName; + this.semaphore = semaphore; + + Thread.currentThread().setName("crawling:" + domainName + " [await domain lock]"); + semaphore.acquire(); + Thread.currentThread().setName("crawling:" + domainName); + } + + @Override + public void close() throws Exception { + semaphore.release(); + Thread.currentThread().setName("crawling:" + domainName + " [wrapping up]"); + } + } } diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java index b208867c..02900472 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java @@ -36,6 +36,10 @@ public class CrawlDataReference implements AutoCloseable { } } + /** Get the next document from the crawl data, + * returning null when there are no more documents + * available + */ @Nullable public CrawledDocument nextDocument() { try { @@ -52,7 +56,7 @@ public class CrawlDataReference implements AutoCloseable { return null; } - public boolean isContentBodySame(String one, String other) { + public static boolean isContentBodySame(String one, String other) { final long contentHashOne = contentHash(one); final long contentHashOther = contentHash(other); @@ -60,7 +64,7 @@ public class CrawlDataReference implements AutoCloseable { return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4; } - private long contentHash(String content) { + private static long contentHash(String content) { EasyLSH hash = new EasyLSH(); int next = 0; @@ -83,8 +87,8 @@ public class CrawlDataReference implements AutoCloseable { return hash.get(); } - private final HashFunction hashFunction = Hashing.murmur3_128(); - private int hashInt(int v) { + private static final HashFunction hashFunction = Hashing.murmur3_128(); + private static int 
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
index b208867c..02900472 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
@@ -36,6 +36,10 @@ public class CrawlDataReference implements AutoCloseable {
         }
     }
 
+    /** Get the next document from the crawl data,
+     * returning null when there are no more documents
+     * available
+     */
     @Nullable
     public CrawledDocument nextDocument() {
         try {
@@ -52,7 +56,7 @@ public class CrawlDataReference implements AutoCloseable {
         return null;
     }
 
-    public boolean isContentBodySame(String one, String other) {
+    public static boolean isContentBodySame(String one, String other) {
         final long contentHashOne = contentHash(one);
         final long contentHashOther = contentHash(other);
 
@@ -60,7 +64,7 @@ public class CrawlDataReference implements AutoCloseable {
         return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4;
     }
 
-    private long contentHash(String content) {
+    private static long contentHash(String content) {
         EasyLSH hash = new EasyLSH();
 
         int next = 0;
@@ -83,8 +87,8 @@ public class CrawlDataReference implements AutoCloseable {
         return hash.get();
     }
 
-    private final HashFunction hashFunction = Hashing.murmur3_128();
-    private int hashInt(int v) {
+    private static final HashFunction hashFunction = Hashing.murmur3_128();
+    private static int hashInt(int v) {
         return hashFunction.hashInt(v).asInt();
     }
 
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
index e204d9c9..c6b426b3 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
@@ -3,6 +3,7 @@ package nu.marginalia.crawl.retreival;
 import crawlercommons.robots.SimpleRobotRules;
 import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.contenttype.ContentType;
+import nu.marginalia.crawl.CrawlerMain;
 import nu.marginalia.crawl.fetcher.ContentTags;
 import nu.marginalia.crawl.fetcher.HttpFetcher;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@@ -11,7 +12,6 @@ import nu.marginalia.crawl.logic.LinkFilterSelector;
 import nu.marginalia.crawl.retreival.revisit.CrawlerRevisitor;
 import nu.marginalia.crawl.retreival.revisit.DocumentWithReference;
 import nu.marginalia.crawl.retreival.sitemap.SitemapFetcher;
-import nu.marginalia.crawl.spec.CrawlSpecProvider;
 import nu.marginalia.ip_blocklist.UrlBlocklist;
 import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.model.EdgeDomain;
@@ -54,7 +54,7 @@ public class CrawlerRetreiver implements AutoCloseable {
 
     public CrawlerRetreiver(HttpFetcher fetcher,
                             DomainProber domainProber,
-                            CrawlSpecProvider.CrawlSpecRecord specs,
+                            CrawlerMain.CrawlSpecRecord specs,
                             WarcRecorder warcRecorder) {
         this.warcRecorder = warcRecorder;
@@ -117,9 +117,7 @@ public class CrawlerRetreiver implements AutoCloseable {
         sniffRootDocument(rootUrl, delayTimer);
 
         // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
-        int fetchedCount = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
-
-        if (fetchedCount > 0) {
+        if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
             // If we have reference data, we will always grow the crawl depth a bit
             crawlFrontier.increaseDepth(1.5, 2500);
         }
@@ -162,9 +160,7 @@ public class CrawlerRetreiver implements AutoCloseable {
                 continue;
 
             try {
-                if (fetchContentWithReference(top, delayTimer, DocumentWithReference.empty()).isOk()) {
-                    fetchedCount++;
-                }
+                fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
             }
             catch (InterruptedException ex) {
                 Thread.currentThread().interrupt();
@@ -172,7 +168,7 @@ public class CrawlerRetreiver implements AutoCloseable {
             }
         }
 
-        return fetchedCount;
+        return crawlFrontier.visitedSize();
     }
 
     public void syncAbortedRun(Path warcFile) {
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java
index 83ac1a66..9b9964b3 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java
@@ -165,7 +165,7 @@
 
     public int queueSize() { return queue.size(); }
-
+    public int visitedSize() { return visited.size(); }
 
     public void enqueueLinksFromDocument(EdgeUrl baseUrl, Document parsed) {
         baseUrl = linkParser.getBaseLink(parsed, baseUrl);
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java
index 1f1854b5..c24ef754 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java
@@ -42,7 +42,7 @@
             return false;
         }
 
-        return reference.isContentBodySame(doc.documentBody, bodyOk.body());
+        return CrawlDataReference.isContentBodySame(doc.documentBody, bodyOk.body());
     }
 
     public ContentTags getContentTags() {
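A small clarification of the change above, not part of the commit: isContentBodySame() is now a static utility on CrawlDataReference, so revisit code no longer needs a live reference instance to compare bodies. A sketch of the call, with illustrative variable names:

    String previousBody = oldDocument.documentBody;   // body stored from the earlier crawl
    String currentBody  = freshlyFetchedBody;         // body from the new fetch
    // True when the EasyLSH fingerprints differ by fewer than 4 bits,
    // i.e. the page is treated as unchanged for revisit purposes
    boolean unchanged = CrawlDataReference.isContentBodySame(previousBody, currentBody);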
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java b/code/processes/crawling-process/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java
deleted file mode 100644
index 16dca3bc..00000000
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package nu.marginalia.crawl.spec;
-
-import com.google.inject.Inject;
-import com.zaxxer.hikari.HikariDataSource;
-import lombok.Builder;
-import lombok.SneakyThrows;
-import nu.marginalia.ProcessConfiguration;
-import nu.marginalia.db.DomainBlacklist;
-import nu.marginalia.model.EdgeDomain;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Stream;
-
-// FIXME: This design is a vestige from when there were multiple sources of crawl data.  It should be simplified and probably merged with CrawlerMain.
-public class CrawlSpecProvider {
-    private final HikariDataSource dataSource;
-    private final ProcessConfiguration processConfiguration;
-    private final DomainBlacklist blacklist;
-
-    private List<CrawlSpecRecord> domains;
-
-    private static final Logger logger = LoggerFactory.getLogger(CrawlSpecProvider.class);
-
-    private static final double URL_GROWTH_FACTOR = Double.parseDouble(System.getProperty("crawler.crawlSetGrowthFactor", "1.25"));
-    private static final int MIN_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 100);
-    private static final int MID_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 2_000);
-    private static final int MAX_URLS_PER_DOMAIN = Integer.getInteger("crawler.maxUrlsPerDomain", 10_000);
-
-    @Inject
-    public CrawlSpecProvider(HikariDataSource dataSource,
-                             ProcessConfiguration processConfiguration,
-                             DomainBlacklist blacklist
-                             ) {
-        this.dataSource = dataSource;
-        this.processConfiguration = processConfiguration;
-        this.blacklist = blacklist;
-    }
-
-    // Load the domains into memory to ensure the crawler is resilient to database blips
-    private List<CrawlSpecRecord> loadData() throws Exception {
-        var domains = new ArrayList<CrawlSpecRecord>();
-
-        logger.info("Loading domains to be crawled");
-
-        blacklist.waitUntilLoaded();
-
-        try (var conn = dataSource.getConnection();
-             var assignFreeDomains = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE NODE_AFFINITY=0");
-             var query = conn.prepareStatement("""
-                     SELECT DOMAIN_NAME, COALESCE(VISITED_URLS, 0), EC_DOMAIN.ID
-                     FROM EC_DOMAIN
-                     LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
-                     WHERE NODE_AFFINITY=?
-                     """)
-             )
-        {
-
-            // Assign any domains with node_affinity=0 to this node. We must do this now, before we start crawling
-            // to avoid race conditions with other crawl runs. We don't want multiple crawlers to crawl the same domain.
-            assignFreeDomains.setInt(1, processConfiguration.node());
-            assignFreeDomains.executeUpdate();
-
-            // Fetch the domains to be crawled
-            query.setInt(1, processConfiguration.node());
-            query.setFetchSize(10_000);
-            var rs = query.executeQuery();
-
-            while (rs.next()) {
-                // Skip blacklisted domains
-                int id = rs.getInt(3);
-                if (blacklist.isBlacklisted(id))
-                    continue;
-
-                int urls = rs.getInt(2);
-
-                double growthFactor = urls < MID_URLS_PER_DOMAIN
-                        ? Math.max(2.5, URL_GROWTH_FACTOR)
-                        : URL_GROWTH_FACTOR;
-
-                int urlsToFetch = Math.clamp((int) (growthFactor * rs.getInt(2)), MIN_URLS_PER_DOMAIN, MAX_URLS_PER_DOMAIN);
-
-                var record = new CrawlSpecRecord(
-                        rs.getString(1),
-                        urlsToFetch,
-                        List.of()
-                );
-
-                domains.add(record);
-            }
-
-        }
-
-        logger.info("Loaded {} domains", domains.size());
-
-        // Shuffle the domains to ensure we get a good mix of domains in each crawl,
-        // so that e.g. the big domains don't get all crawled at once, or we end up
-        // crawling the same server in parallel from different subdomains...
-        Collections.shuffle(domains);
-
-        return domains;
-    }
-
-    public List<EdgeDomain> getDomains() {
-        return stream().map(CrawlSpecRecord::domain).map(EdgeDomain::new).toList();
-    }
-
-    public int totalCount() throws Exception {
-        if (domains == null) {
-            domains = loadData();
-        }
-        return domains.size();
-    }
-
-    @SneakyThrows
-    public Stream<CrawlSpecRecord> stream() {
-        if (domains == null) {
-            domains = loadData();
-        }
-
-        return domains.stream();
-    }
-
-
-    @Builder
-    public record CrawlSpecRecord(@NotNull String domain,
-                                  int crawlDepth,
-                                  @NotNull List<String> urls) {
-        public CrawlSpecRecord(String domain, int crawlDepth) {
-            this(domain, crawlDepth, List.of());
-        }
-    }
-}
diff --git a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
index 9b6312e6..a153eb03 100644
--- a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
+++ b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
@@ -2,6 +2,7 @@ package nu.marginalia.crawling.retreival;
 
 import crawlercommons.robots.SimpleRobotRules;
 import lombok.SneakyThrows;
+import nu.marginalia.crawl.CrawlerMain;
 import nu.marginalia.crawl.fetcher.ContentTags;
 import nu.marginalia.crawl.fetcher.HttpFetcher;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@@ -9,7 +10,6 @@ import nu.marginalia.crawl.fetcher.SitemapRetriever;
 import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawl.retreival.CrawlerRetreiver;
 import nu.marginalia.crawl.retreival.DomainProber;
-import nu.marginalia.crawl.spec.CrawlSpecProvider;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.model.body.HttpFetchResult;
@@ -68,7 +68,7 @@ public class CrawlerMockFetcherTest {
 
     }
 
-    void crawl(CrawlSpecProvider.CrawlSpecRecord spec) throws IOException {
+    void crawl(CrawlerMain.CrawlSpecRecord spec) throws IOException {
         try (var recorder = new WarcRecorder()) {
             new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder)
                     .crawlDomain();
         }
     }
@@ -83,7 +83,7 @@ public class CrawlerMockFetcherTest {
         registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
         registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
 
-        crawl(new CrawlSpecProvider.CrawlSpecRecord("startrek.website", 10, new ArrayList<>()));
CrawlSpecProvider.CrawlSpecRecord("startrek.website", 10, new ArrayList<>())); + crawl(new CrawlerMain.CrawlSpecRecord("startrek.website", 10, new ArrayList<>())); } @Test @@ -92,7 +92,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html"); - crawl(new CrawlSpecProvider.CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>())); + crawl(new CrawlerMain.CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>())); } @Test @@ -103,7 +103,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html"); registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html"); - crawl(new CrawlSpecProvider.CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>())); + crawl(new CrawlerMain.CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>())); } class MockFetcher implements HttpFetcher { diff --git a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java index d82976f2..a4493bd1 100644 --- a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java +++ b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java @@ -4,11 +4,11 @@ import lombok.SneakyThrows; import nu.marginalia.UserAgent; import nu.marginalia.WmsaHome; import nu.marginalia.atags.model.DomainLinks; +import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.crawl.fetcher.HttpFetcher; import nu.marginalia.crawl.fetcher.HttpFetcherImpl; import nu.marginalia.crawl.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.retreival.*; -import nu.marginalia.crawl.spec.CrawlSpecProvider; import nu.marginalia.io.CrawledDomainReader; import nu.marginalia.io.SerializableCrawlDataStream; import nu.marginalia.model.EdgeDomain; @@ -76,7 +76,7 @@ class CrawlerRetreiverTest { @Test public void testWarcOutput() throws IOException { - var specs = CrawlSpecProvider.CrawlSpecRecord + var specs = CrawlerMain.CrawlSpecRecord .builder() .crawlDepth(5) .domain("www.marginalia.nu") @@ -118,7 +118,7 @@ class CrawlerRetreiverTest { @Test public void testWarcOutputNoKnownUrls() throws IOException { - var specs = CrawlSpecProvider.CrawlSpecRecord + var specs = CrawlerMain.CrawlSpecRecord .builder() .crawlDepth(5) .domain("www.marginalia.nu") @@ -161,7 +161,7 @@ class CrawlerRetreiverTest { @SneakyThrows @Test public void testResync() throws IOException { - var specs = CrawlSpecProvider.CrawlSpecRecord + var specs = CrawlerMain.CrawlSpecRecord .builder() .crawlDepth(5) .domain("www.marginalia.nu") @@ -210,7 +210,7 @@ class CrawlerRetreiverTest { @Test public void testWithKnownDomains() throws IOException { - var specs = CrawlSpecProvider.CrawlSpecRecord + var specs = CrawlerMain.CrawlSpecRecord .builder() .crawlDepth(5) .domain("www.marginalia.nu") @@ -254,7 +254,7 @@ class CrawlerRetreiverTest { @Test public void testRedirect() throws IOException, URISyntaxException { - var specs = CrawlSpecProvider.CrawlSpecRecord + var specs = CrawlerMain.CrawlSpecRecord .builder() .crawlDepth(3) .domain("www.marginalia.nu") @@ -312,7 +312,7 @@ class CrawlerRetreiverTest { @Test public void testEmptySet() throws IOException { - var specs = 
+        var specs = CrawlerMain.CrawlSpecRecord
                 .builder()
                 .crawlDepth(5)
                 .domain("www.marginalia.nu")
@@ -360,7 +360,7 @@
 
     @Test
     public void testRecrawl() throws IOException {
-        var specs = CrawlSpecProvider.CrawlSpecRecord
+        var specs = CrawlerMain.CrawlSpecRecord
                 .builder()
                 .crawlDepth(12)
                 .domain("www.marginalia.nu")
@@ -420,7 +420,7 @@
 
     @Test
     public void testRecrawlWithResync() throws IOException {
-        var specs = CrawlSpecProvider.CrawlSpecRecord
+        var specs = CrawlerMain.CrawlSpecRecord
                 .builder()
                 .crawlDepth(12)
                 .domain("www.marginalia.nu")
@@ -508,7 +508,7 @@
         }
     }
 
-    private void doCrawlWithReferenceStream(CrawlSpecProvider.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
+    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
         try (var recorder = new WarcRecorder(tempFileWarc2)) {
             new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).crawlDomain(new DomainLinks(),
                     new CrawlDataReference(stream));
@@ -519,7 +519,7 @@
     }
 
     @NotNull
-    private DomainCrawlFrontier doCrawl(Path tempFileWarc1, CrawlSpecProvider.CrawlSpecRecord specs) {
+    private DomainCrawlFrontier doCrawl(Path tempFileWarc1, CrawlerMain.CrawlSpecRecord specs) {
        try (var recorder = new WarcRecorder(tempFileWarc1)) {
            var crawler = new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder);
            crawler.crawlDomain();
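As an aside on the relocated record (a sketch, not part of the commit): CrawlSpecRecord keeps its Lombok @Builder, so the builder-style construction used throughout these tests continues to work against the new nesting under CrawlerMain. The domain below is hypothetical, unlike the marginalia.nu fixtures above:

    var specs = CrawlerMain.CrawlSpecRecord.builder()
            .crawlDepth(5)
            .domain("www.example.com")
            .urls(List.of())
            .build();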