From ebd10a5f285bb6ecdab79a6f520c8e3ddac7e068 Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Mon, 6 Nov 2023 16:14:58 +0100
Subject: [PATCH] (crawler) Integrate atags.parquet with the crawler so that
 "important" URLs are prioritized

---
 .../atags/source/AnchorTagsSource.java        |  4 +
 .../atags/source/AnchorTagsSourceFactory.java | 28 +++---
 .../ConvertingIntegrationTestModule.java      |  5 +-
 code/processes/crawling-process/build.gradle  |  2 +
 .../java/nu/marginalia/crawl/CrawlerMain.java | 27 ++++--
 .../crawl/retreival/CrawlerRetreiver.java     | 97 +++++++++----------
 .../crawl/retreival/DomainProber.java         | 22 ++++-
 .../crawl/retreival/fetcher/FetchResult.java  |  8 ++
 .../retreival/fetcher/HttpFetcherImpl.java    | 12 +--
 .../crawl/spec/CrawlSpecProvider.java         |  6 ++
 .../retreival/CrawlerMockFetcherTest.java     |  2 +-
 .../retreival/CrawlerRetreiverTest.java       |  3 +-
 12 files changed, 133 insertions(+), 83 deletions(-)

diff --git a/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSource.java b/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSource.java
index d0dbb9e6..662d573c 100644
--- a/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSource.java
+++ b/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSource.java
@@ -6,5 +6,9 @@ import nu.marginalia.model.EdgeDomain;
 public interface AnchorTagsSource extends AutoCloseable {
     DomainLinks getAnchorTags(EdgeDomain domain);
 
+    default DomainLinks getAnchorTags(String domain) {
+        return getAnchorTags(new EdgeDomain(domain));
+    }
+
     default void close() throws Exception {}
 }
diff --git a/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java b/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java
index 0b8596bd..4f968bba 100644
--- a/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java
+++ b/code/features-convert/anchor-keywords/src/main/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java
@@ -30,25 +30,29 @@ public class AnchorTagsSourceFactory {
     }
 
     public AnchorTagsSource create() throws SQLException {
-        if (!Files.exists(atagsPath))
-            return dummy();
-
-        List<EdgeDomain> relevantDomains = getRelevantDomains();
-
-        if (relevantDomains.isEmpty())
-            return dummy();
-
-        return new AnchorTagsImpl(atagsPath, relevantDomains);
+        return create(getRelevantDomainsByNodeAffinity());
     }
 
-    private AnchorTagsSource dummy() {
-        return x -> new DomainLinks();
+    public AnchorTagsSource create(List<EdgeDomain> relevantDomains) throws SQLException {
+        if (!Files.exists(atagsPath)) {
+            logger.info("Omitting anchor tag data because '{}' does not exist, or is not reachable from the crawler process", atagsPath);
+
+            return domain -> new DomainLinks();
+        }
+
+        if (relevantDomains.isEmpty()) {
+            logger.info("Omitting anchor tag data because no relevant domains were provided");
+
+            return domain -> new DomainLinks();
+        }
+
+        return new AnchorTagsImpl(atagsPath, relevantDomains);
     }
 
     // Only get domains that are assigned to this node.  This reduces the amount of data
     // that needs to be loaded into the duckdb instance to a more manageable level, and keeps
     // the memory footprint of the service down.
-    private List<EdgeDomain> getRelevantDomains() {
+    private List<EdgeDomain> getRelevantDomainsByNodeAffinity() {
         try (var conn = dataSource.getConnection();
              var stmt = conn.prepareStatement("""
                      SELECT DOMAIN_NAME
diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTestModule.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTestModule.java
index b23c5cc1..3d171a53 100644
--- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTestModule.java
+++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTestModule.java
@@ -3,6 +3,7 @@ package nu.marginalia.converting;
 import com.google.inject.AbstractModule;
 import com.google.inject.name.Names;
 import nu.marginalia.LanguageModels;
+import nu.marginalia.ProcessConfiguration;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.converting.processor.ConverterDomainTypes;
 import nu.marginalia.service.module.ServiceConfiguration;
@@ -17,7 +18,9 @@ public class ConvertingIntegrationTestModule extends AbstractModule {
         bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(
                 null, 1, "localhost", 0, 0, null
         ));
-
+        bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration(
+                "converting-process", 1, null
+        ));
         bind(LanguageModels.class).toInstance(WmsaHome.getLanguageModels());
         bind(ConverterDomainTypes.class).toInstance(Mockito.mock(ConverterDomainTypes.class));
     }
diff --git a/code/processes/crawling-process/build.gradle b/code/processes/crawling-process/build.gradle
index 8f8dd951..00f0f01b 100644
--- a/code/processes/crawling-process/build.gradle
+++ b/code/processes/crawling-process/build.gradle
@@ -37,6 +37,8 @@ dependencies {
     implementation project(':code:process-models:crawling-model')
     implementation project(':code:process-models:crawl-spec')
+
+    implementation project(':code:features-convert:anchor-keywords')
 
     implementation project(':code:features-crawl:crawl-blocklist')
     implementation project(':code:features-crawl:link-parser')
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
index 99829868..bff49aaa 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
@@ -8,6 +8,8 @@ import nu.marginalia.ProcessConfiguration;
 import nu.marginalia.ProcessConfigurationModule;
 import nu.marginalia.UserAgent;
 import nu.marginalia.WmsaHome;
+import nu.marginalia.atags.source.AnchorTagsSource;
+import nu.marginalia.atags.source.AnchorTagsSourceFactory;
 import nu.marginalia.crawl.retreival.CrawlDataReference;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.spec.CrawlSpecProvider;
@@ -56,6 +58,7 @@ public class CrawlerMain {
     private final MessageQueueFactory messageQueueFactory;
     private final FileStorageService fileStorageService;
     private final DbCrawlSpecProvider dbCrawlSpecProvider;
+    private final AnchorTagsSourceFactory anchorTagsSourceFactory;
     private final Gson gson;
     private final int node;
     private final SimpleBlockingThreadPool pool;
@@ -76,12 +79,14 @@
                        FileStorageService fileStorageService,
                        ProcessConfiguration processConfiguration,
                        DbCrawlSpecProvider dbCrawlSpecProvider,
+                       AnchorTagsSourceFactory anchorTagsSourceFactory,
                        Gson gson) {
         this.heartbeat = heartbeat;
         this.userAgent = userAgent;
         this.messageQueueFactory = messageQueueFactory;
         this.fileStorageService = fileStorageService;
         this.dbCrawlSpecProvider = dbCrawlSpecProvider;
+        this.anchorTagsSourceFactory = anchorTagsSourceFactory;
         this.gson = gson;
 
         this.node = processConfiguration.node();
@@ -131,7 +136,10 @@ public class CrawlerMain {
     public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException {
 
         heartbeat.start();
-        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"))) {
+        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
+             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
+        ) {
+
             // First a validation run to ensure the file is all good to parse
             logger.info("Validating JSON");
@@ -144,7 +152,7 @@ public class CrawlerMain {
                 .takeWhile((e) -> abortMonitor.isAlive())
                 .filter(e -> !workLog.isJobFinished(e.domain))
                 .filter(e -> processingIds.put(e.domain, "") == null)
-                .map(e -> new CrawlTask(e, outputDir, workLog))
+                .map(e -> new CrawlTask(e, anchorTagsSource, outputDir, workLog))
                 .forEach(pool::submitQuietly);
         }
@@ -178,13 +186,16 @@ public class CrawlerMain {
         private final String domain;
         private final String id;
 
+        private final AnchorTagsSource anchorTagsSource;
         private final Path outputDir;
         private final WorkLog workLog;
 
         CrawlTask(CrawlSpecRecord specification,
+                  AnchorTagsSource anchorTagsSource,
                   Path outputDir,
                   WorkLog workLog) {
             this.specification = specification;
+            this.anchorTagsSource = anchorTagsSource;
             this.outputDir = outputDir;
             this.workLog = workLog;
@@ -202,18 +213,20 @@ public class CrawlerMain {
             try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id);
                  CrawlDataReference reference = getReference())
             {
-                Thread.currentThread().setName("crawling:" + specification.domain);
+                Thread.currentThread().setName("crawling:" + domain);
+
+                var domainLinks = anchorTagsSource.getAnchorTags(domain);
 
                 var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
-                int size = retreiver.fetch(reference);
+                int size = retreiver.fetch(domainLinks, reference);
 
-                workLog.setJobToFinished(specification.domain, writer.getOutputFile().toString(), size);
+                workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size);
                 heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
 
-                logger.info("Fetched {}", specification.domain);
+                logger.info("Fetched {}", domain);
             } catch (Exception e) {
-                logger.error("Error fetching domain " + specification.domain, e);
+                logger.error("Error fetching domain " + domain, e);
             }
             finally {
                 // We don't need to double-count these; it's also kept in the workLog
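
[Reviewer note, not part of the patch: the sketch below condenses how the new pieces introduced here compose, using only calls visible in this diff. The HttpFetcher parameter and the no-op data consumer stand in for the real wiring in CrawlerMain, and error handling is elided.]

import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;

class AnchorTagsWiringSketch {
    int crawlOne(AnchorTagsSourceFactory anchorTagsSourceFactory,
                 CrawlSpecProvider specProvider,
                 CrawlSpecRecord specification,
                 HttpFetcher fetcher) throws Exception {
        // One duckdb-backed source per crawler run, scoped to the domains this
        // node will actually crawl; the factory falls back to empty DomainLinks
        // objects when atags.parquet is absent or no domains are relevant.
        try (AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())) {
            // Per-domain lookup of "important" URLs, via the new String overload
            DomainLinks domainLinks = anchorTagsSource.getAnchorTags(specification.domain);

            // The new two-argument fetch() seeds the crawl frontier with the
            // anchor tag URLs before sitemaps are consulted
            return new CrawlerRetreiver(fetcher, specification, data -> {})
                    .fetch(domainLinks, new CrawlDataReference());
        }
    }
}
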
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
index 04cd43bc..ce5ecb89 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
@@ -4,6 +4,7 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import crawlercommons.robots.SimpleRobotRules;
 import lombok.SneakyThrows;
+import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawl.retreival.fetcher.ContentTags;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
 import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
@@ -81,49 +82,41 @@ public class CrawlerRetreiver {
     }
 
     public int fetch() {
-        return fetch(new CrawlDataReference());
+        return fetch(new DomainLinks(), new CrawlDataReference());
     }
 
-    public int fetch(CrawlDataReference oldCrawlData) {
+    public int fetch(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
         final DomainProber.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, crawlFrontier.peek());
 
-        if (probeResult instanceof DomainProber.ProbeResultOk) {
-            return crawlDomain(oldCrawlData);
-        }
+        return switch (probeResult) {
+            case DomainProber.ProbeResultOk(EdgeUrl probedUrl) -> crawlDomain(oldCrawlData, probedUrl, domainLinks);
+            case DomainProber.ProbeResultError(CrawlerDomainStatus status, String desc) -> {
+                crawledDomainWriter.accept(
+                        CrawledDomain.builder()
+                                .crawlerStatus(status.name())
+                                .crawlerStatusDesc(desc)
+                                .domain(domain)
+                                .ip(findIp(domain))
+                                .build()
+                );
+                yield 1;
+            }
+            case DomainProber.ProbeResultRedirect(EdgeDomain redirectDomain) -> {
+                crawledDomainWriter.accept(
+                        CrawledDomain.builder()
+                                .crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
+                                .crawlerStatusDesc("Redirected to different domain")
+                                .redirectDomain(redirectDomain.toString())
+                                .domain(domain)
+                                .ip(findIp(domain))
+                                .build()
+                );
+                yield 1;
+            }
+        };
+    }
 
-        // handle error cases for probe
-
-        var ip = findIp(domain);
-
-        if (probeResult instanceof DomainProber.ProbeResultError err) {
-            crawledDomainWriter.accept(
-                    CrawledDomain.builder()
-                            .crawlerStatus(err.status().name())
-                            .crawlerStatusDesc(err.desc())
-                            .domain(domain)
-                            .ip(ip)
-                            .build()
-            );
-            return 1;
-        }
-
-        if (probeResult instanceof DomainProber.ProbeResultRedirect redirect) {
-            crawledDomainWriter.accept(
-                    CrawledDomain.builder()
-                            .crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
-                            .crawlerStatusDesc("Redirected to different domain")
-                            .redirectDomain(redirect.domain().toString())
-                            .domain(domain)
-                            .ip(ip)
-                            .build()
-            );
-            return 1;
-        }
-
-        throw new IllegalStateException("Unknown probe result: " + probeResult);
-    };
-
-    private int crawlDomain(CrawlDataReference oldCrawlData) {
+    private int crawlDomain(CrawlDataReference oldCrawlData, EdgeUrl rootUrl, DomainLinks domainLinks) {
         String ip = findIp(domain);
 
         assert !crawlFrontier.isEmpty();
@@ -131,7 +124,7 @@ public class CrawlerRetreiver {
         final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
         final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
 
-        sniffRootDocument(delayTimer);
+        sniffRootDocument(delayTimer, rootUrl);
 
         // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
         int recrawled = recrawl(oldCrawlData, robotsRules, delayTimer);
@@ -141,7 +134,11 @@ public class CrawlerRetreiver {
             crawlFrontier.increaseDepth(1.5);
         }
 
-        downloadSitemaps(robotsRules);
+        // Add external links to the crawl frontier
+        crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
+
+        // Add links from the sitemap to the crawl frontier
+        downloadSitemaps(robotsRules, rootUrl);
 
         CrawledDomain ret = new CrawledDomain(domain, null, CrawlerDomainStatus.OK.name(), null, ip, new ArrayList<>(), null);
@@ -259,17 +256,17 @@ public class CrawlerRetreiver {
         return recrawled;
     }
 
-    private void downloadSitemaps(SimpleRobotRules robotsRules) {
+    private void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) {
         List<String> sitemaps = robotsRules.getSitemaps();
-        if (sitemaps.isEmpty()) {
-            sitemaps = List.of(
-                    "http://" + domain + "/sitemap.xml",
-                    "https://" + domain + "/sitemap.xml");
-        }
 
         List<EdgeUrl> urls = new ArrayList<>(sitemaps.size());
-        for (var url : sitemaps) {
-            EdgeUrl.parse(url).ifPresent(urls::add);
+        if (!sitemaps.isEmpty()) {
+            for (var url : sitemaps) {
+                EdgeUrl.parse(url).ifPresent(urls::add);
+            }
+        }
+        else {
+            urls.add(rootUrl.withPathAndParam("/sitemap.xml", null));
         }
 
         downloadSitemaps(urls);
@@ -305,11 +302,11 @@ public class CrawlerRetreiver {
         logger.debug("Queue is now {}", crawlFrontier.queueSize());
     }
 
-    private void sniffRootDocument(CrawlDelayTimer delayTimer) {
+    private void sniffRootDocument(CrawlDelayTimer delayTimer, EdgeUrl rootUrl) {
         try {
             logger.debug("Configuring link filter");
 
-            var url = crawlFrontier.peek().withPathAndParam("/", null);
+            var url = rootUrl.withPathAndParam("/", null);
 
             var maybeSample = fetchUrl(url, delayTimer, DocumentWithReference.empty()).filter(sample -> sample.httpStatus == 200);
             if (maybeSample.isEmpty())
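
[Reviewer note, not part of the patch: the fetch() rewrite above replaces an instanceof chain with an exhaustive switch, which only compiles without a default arm because ProbeResult becomes a sealed interface in DomainProber (next file). A minimal, self-contained illustration of the Java 21 idiom, with hypothetical names:]

sealed interface ProbeOutcome permits Ok, Redirect, Error {}
record Ok(String url) implements ProbeOutcome {}
record Redirect(String domain) implements ProbeOutcome {}
record Error(String desc) implements ProbeOutcome {}

class ProbeOutcomeDemo {
    static String describe(ProbeOutcome outcome) {
        // Sealed hierarchy: the compiler checks exhaustiveness, so no default
        // arm and no trailing IllegalStateException are needed; record patterns
        // bind the components directly, as in
        // `case DomainProber.ProbeResultOk(EdgeUrl probedUrl) -> ...` above
        return switch (outcome) {
            case Ok(String url)          -> "ok: " + url;
            case Redirect(String domain) -> "redirected to " + domain;
            case Error(String desc)      -> "error: " + desc;
        };
    }
}
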
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainProber.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainProber.java
index 4b1c6413..67f006d4 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainProber.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainProber.java
@@ -43,7 +43,7 @@ public class DomainProber {
         var fetchResult = fetcher.probeDomain(firstUrlInQueue.withPathAndParam("/", null));
 
         if (fetchResult.ok())
-            return new ProbeResultOk();
+            return new ProbeResultOk(fetchResult.url);
 
         if (fetchResult.state == FetchResultState.REDIRECT)
             return new ProbeResultRedirect(fetchResult.domain);
@@ -51,9 +51,21 @@ public class DomainProber {
         return new ProbeResultError(CrawlerDomainStatus.ERROR, "Bad status");
     }
 
-    interface ProbeResult {};
+    public sealed interface ProbeResult permits ProbeResultError, ProbeResultRedirect, ProbeResultOk {};
 
-    record ProbeResultError(CrawlerDomainStatus status, String desc) implements ProbeResult {}
-    record ProbeResultRedirect(EdgeDomain domain) implements ProbeResult {}
-    record ProbeResultOk() implements ProbeResult {}
+    /** The probing failed for one reason or another
+     * @param status Machine-readable status
+     * @param desc   Human-readable description of the error
+     */
+    public record ProbeResultError(CrawlerDomainStatus status, String desc) implements ProbeResult {}
+
+    /** This domain redirects to another domain */
+    public record ProbeResultRedirect(EdgeDomain domain) implements ProbeResult {}
+
+    /** If the retrieval of the probed url was successful, return the url as it was fetched
+     * (which may be different from the url we probed, if we attempted another URL scheme).
+     *
+     * @param probedUrl The url we successfully probed
+     */
+    public record ProbeResultOk(EdgeUrl probedUrl) implements ProbeResult {}
 }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/FetchResult.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/FetchResult.java
index 40b6d1a8..ee4c734f 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/FetchResult.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/FetchResult.java
@@ -3,13 +3,21 @@ package nu.marginalia.crawl.retreival.fetcher;
 import lombok.AllArgsConstructor;
 import lombok.ToString;
 import nu.marginalia.model.EdgeDomain;
+import nu.marginalia.model.EdgeUrl;
 
 @AllArgsConstructor
 @ToString
 public class FetchResult {
     public final FetchResultState state;
+    public final EdgeUrl url;
     public final EdgeDomain domain;
 
+    public FetchResult(FetchResultState state, EdgeUrl url) {
+        this.state = state;
+        this.url = url;
+        this.domain = url.domain;
+    }
+
     public boolean ok() {
         return state == FetchResultState.OK;
     }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
index b0b0fd9d..041ae08d 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
@@ -15,6 +15,7 @@ import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
 import nu.marginalia.crawl.retreival.logic.ContentTypeParser;
 import okhttp3.*;
+import org.apache.commons.collections4.queue.PredicatedQueue;
 import org.apache.commons.io.input.BOMInputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -106,13 +107,12 @@ public class HttpFetcherImpl implements HttpFetcher {
         var call = client.newCall(head);
 
         try (var rsp = call.execute()) {
-            var requestUrl = rsp.request().url().toString();
-            EdgeDomain requestDomain = new EdgeUrl(requestUrl).domain;
+            EdgeUrl requestUrl = new EdgeUrl(rsp.request().url().toString());
 
-            if (!Objects.equals(requestDomain, url.domain)) {
-                return new FetchResult(FetchResultState.REDIRECT, requestDomain);
+            if (!Objects.equals(requestUrl.domain, url.domain)) {
+                return new FetchResult(FetchResultState.REDIRECT, requestUrl);
             }
-            return new FetchResult(FetchResultState.OK, requestDomain);
+            return new FetchResult(FetchResultState.OK, requestUrl);
         }
         catch (Exception ex) {
@@ -121,7 +121,7 @@ public class HttpFetcherImpl implements HttpFetcher {
             }
 
             logger.info("Error during fetching", ex);
-            return new FetchResult(FetchResultState.ERROR, url.domain);
+            return new FetchResult(FetchResultState.ERROR, url);
         }
     }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java
index 22177d6f..240b4232 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java
@@ -1,10 +1,16 @@
 package nu.marginalia.crawl.spec;
 
+import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
+import java.util.List;
 import java.util.stream.Stream;
 
 public interface CrawlSpecProvider {
     int totalCount() throws Exception;
     Stream<CrawlSpecRecord> stream();
+
+    default List<EdgeDomain> getDomains() {
+        return stream().map(CrawlSpecRecord::getDomain).map(EdgeDomain::new).toList();
+    }
 }
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
index 12bdc0f7..c0df397f 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
@@ -114,7 +114,7 @@ public class CrawlerMockFetcherTest {
         @Override
         public FetchResult probeDomain(EdgeUrl url) {
             logger.info("Probing {}", url);
-            return new FetchResult(FetchResultState.OK, url.domain);
+            return new FetchResult(FetchResultState.OK, url);
         }
 
         @Override
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
index d6fa3261..147aca68 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
@@ -2,6 +2,7 @@ package nu.marginalia.crawling.retreival;
 
 import lombok.SneakyThrows;
 import nu.marginalia.WmsaHome;
+import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawl.retreival.CrawlDataReference;
 import nu.marginalia.crawl.retreival.CrawlerRetreiver;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
@@ -139,7 +140,7 @@ class CrawlerRetreiverTest {
                 if (d instanceof CrawledDocument doc) {
                     System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
                 }
-            }).fetch(new CrawlDataReference(stream));
+            }).fetch(new DomainLinks(), new CrawlDataReference(stream));
         }
     }
 }
\ No newline at end of file
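
[Reviewer note, not part of the patch: the sketch below illustrates why FetchResult now carries the full probed EdgeUrl instead of only the domain. When a probe of http://example.com/ is answered over https, the response URL's scheme survives into ProbeResultOk, so both the root-document sniff and the sitemap fallback reuse the scheme that actually worked instead of guessing both variants as the old code did. withPathAndParam() is used as in the diff above; the example domain is hypothetical.]

import nu.marginalia.model.EdgeUrl;

class SitemapFallbackSketch {
    // Derive the fallback sitemap location from the probed root URL,
    // mirroring downloadSitemaps() when robots.txt lists no sitemaps
    static EdgeUrl fallbackSitemap(EdgeUrl probedRootUrl) {
        // e.g. a probe answered at https://www.example.com/ yields
        // https://www.example.com/sitemap.xml -- no http:// guess needed
        return probedRootUrl.withPathAndParam("/sitemap.xml", null);
    }
}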