(crawler) Integrate atags.parquet with the crawler so that "important" URLs are prioritized

Viktor Lofgren 2023-11-06 16:14:58 +01:00
parent 2b77184281
commit ebd10a5f28
12 changed files with 133 additions and 83 deletions
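
The change wires anchor-tag ("atags") data into the crawl pipeline: CrawlerMain opens an AnchorTagsSource for the domains in the crawl spec, each CrawlTask looks up the DomainLinks for its domain, and CrawlerRetreiver seeds its crawl frontier with those URLs before it starts fetching. A minimal sketch of that flow, using only types and calls visible in the diffs below; the stub lambda stands in for the parquet-backed source that AnchorTagsSourceFactory builds, and the class name and example domain are illustrative:

import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource;

class AnchorTagsFlowSketch {
    void demo() throws Exception {
        // In CrawlerMain the source comes from anchorTagsSourceFactory.create(specProvider.getDomains());
        // here an empty stub stands in for the parquet-backed implementation.
        try (AnchorTagsSource source = domain -> new DomainLinks()) {

            // CrawlTask resolves the links by domain name, via the new String overload
            DomainLinks links = source.getAnchorTags("www.example.com");

            // CrawlerRetreiver then seeds its frontier with the "important" URLs:
            //     crawlFrontier.addAllToQueue(links.getUrls(rootUrl.proto));
        }
    }
}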

AnchorTagsSource.java
@@ -6,5 +6,9 @@ import nu.marginalia.model.EdgeDomain;
 public interface AnchorTagsSource extends AutoCloseable {
     DomainLinks getAnchorTags(EdgeDomain domain);
 
+    default DomainLinks getAnchorTags(String domain) {
+        return getAnchorTags(new EdgeDomain(domain));
+    }
+
     default void close() throws Exception {}
 }

AnchorTagsSourceFactory.java
@@ -30,25 +30,29 @@ public class AnchorTagsSourceFactory {
     }
 
     public AnchorTagsSource create() throws SQLException {
-        if (!Files.exists(atagsPath))
-            return dummy();
-
-        List<EdgeDomain> relevantDomains = getRelevantDomains();
-        if (relevantDomains.isEmpty())
-            return dummy();
-
-        return new AnchorTagsImpl(atagsPath, relevantDomains);
+        return create(getRelevantDomainsByNodeAffinity());
     }
 
-    private AnchorTagsSource dummy() {
-        return x -> new DomainLinks();
+    public AnchorTagsSource create(List<EdgeDomain> relevantDomains) throws SQLException {
+        if (!Files.exists(atagsPath)) {
+            logger.info("Omitting anchor tag data because '{}' does not exist, or is not reachable from the crawler process", atagsPath);
+            return domain -> new DomainLinks();
+        }
+
+        if (relevantDomains.isEmpty()) {
+            logger.info("Omitting anchor tag data because no relevant domains were provided");
+            return domain -> new DomainLinks();
+        }
+
+        return new AnchorTagsImpl(atagsPath, relevantDomains);
     }
 
     // Only get domains that are assigned to this node.  This reduces the amount of data
     // that needs to be loaded into the duckdb instance to a more manageable level, and keeps
     // the memory footprint of the service down.
-    private List<EdgeDomain> getRelevantDomains() {
+    private List<EdgeDomain> getRelevantDomainsByNodeAffinity() {
         try (var conn = dataSource.getConnection();
              var stmt = conn.prepareStatement("""
                 SELECT DOMAIN_NAME

ConvertingIntegrationTestModule.java
@@ -3,6 +3,7 @@ package nu.marginalia.converting;
 import com.google.inject.AbstractModule;
 import com.google.inject.name.Names;
 import nu.marginalia.LanguageModels;
+import nu.marginalia.ProcessConfiguration;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.converting.processor.ConverterDomainTypes;
 import nu.marginalia.service.module.ServiceConfiguration;
@@ -17,7 +18,9 @@ public class ConvertingIntegrationTestModule extends AbstractModule {
         bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(
                 null, 1, "localhost", 0, 0, null
         ));
+        bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration(
+                "converting-process", 1, null
+        ));
         bind(LanguageModels.class).toInstance(WmsaHome.getLanguageModels());
         bind(ConverterDomainTypes.class).toInstance(Mockito.mock(ConverterDomainTypes.class));
     }

build.gradle
@@ -37,6 +37,8 @@ dependencies {
     implementation project(':code:process-models:crawling-model')
     implementation project(':code:process-models:crawl-spec')
 
+    implementation project(':code:features-convert:anchor-keywords')
+
     implementation project(':code:features-crawl:crawl-blocklist')
     implementation project(':code:features-crawl:link-parser')

CrawlerMain.java
@@ -8,6 +8,8 @@ import nu.marginalia.ProcessConfiguration;
 import nu.marginalia.ProcessConfigurationModule;
 import nu.marginalia.UserAgent;
 import nu.marginalia.WmsaHome;
+import nu.marginalia.atags.source.AnchorTagsSource;
+import nu.marginalia.atags.source.AnchorTagsSourceFactory;
 import nu.marginalia.crawl.retreival.CrawlDataReference;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.spec.CrawlSpecProvider;
@@ -56,6 +58,7 @@ public class CrawlerMain {
     private final MessageQueueFactory messageQueueFactory;
     private final FileStorageService fileStorageService;
     private final DbCrawlSpecProvider dbCrawlSpecProvider;
+    private final AnchorTagsSourceFactory anchorTagsSourceFactory;
     private final Gson gson;
     private final int node;
     private final SimpleBlockingThreadPool pool;
@@ -76,12 +79,14 @@ public class CrawlerMain {
                        FileStorageService fileStorageService,
                        ProcessConfiguration processConfiguration,
                        DbCrawlSpecProvider dbCrawlSpecProvider,
+                       AnchorTagsSourceFactory anchorTagsSourceFactory,
                        Gson gson) {
         this.heartbeat = heartbeat;
         this.userAgent = userAgent;
         this.messageQueueFactory = messageQueueFactory;
         this.fileStorageService = fileStorageService;
         this.dbCrawlSpecProvider = dbCrawlSpecProvider;
+        this.anchorTagsSourceFactory = anchorTagsSourceFactory;
         this.gson = gson;
         this.node = processConfiguration.node();
@@ -131,7 +136,10 @@ public class CrawlerMain {
     public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException {
 
         heartbeat.start();
-        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"))) {
+        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
+             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
+        ) {
 
             // First a validation run to ensure the file is all good to parse
             logger.info("Validating JSON");
@@ -144,7 +152,7 @@
                     .takeWhile((e) -> abortMonitor.isAlive())
                     .filter(e -> !workLog.isJobFinished(e.domain))
                     .filter(e -> processingIds.put(e.domain, "") == null)
-                    .map(e -> new CrawlTask(e, outputDir, workLog))
+                    .map(e -> new CrawlTask(e, anchorTagsSource, outputDir, workLog))
                     .forEach(pool::submitQuietly);
             }
@@ -178,13 +186,16 @@ public class CrawlerMain {
         private final String domain;
         private final String id;
 
+        private final AnchorTagsSource anchorTagsSource;
         private final Path outputDir;
         private final WorkLog workLog;
 
         CrawlTask(CrawlSpecRecord specification,
+                  AnchorTagsSource anchorTagsSource,
                   Path outputDir,
                   WorkLog workLog) {
             this.specification = specification;
+            this.anchorTagsSource = anchorTagsSource;
             this.outputDir = outputDir;
             this.workLog = workLog;
@@ -202,18 +213,20 @@ public class CrawlerMain {
             try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id);
                  CrawlDataReference reference = getReference())
             {
-                Thread.currentThread().setName("crawling:" + specification.domain);
+                Thread.currentThread().setName("crawling:" + domain);
+
+                var domainLinks = anchorTagsSource.getAnchorTags(domain);
 
                 var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
 
-                int size = retreiver.fetch(reference);
+                int size = retreiver.fetch(domainLinks, reference);
 
-                workLog.setJobToFinished(specification.domain, writer.getOutputFile().toString(), size);
+                workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size);
                 heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
 
-                logger.info("Fetched {}", specification.domain);
+                logger.info("Fetched {}", domain);
             } catch (Exception e) {
-                logger.error("Error fetching domain " + specification.domain, e);
+                logger.error("Error fetching domain " + domain, e);
             }
             finally {
                 // We don't need to double-count these; it's also kept int he workLog

CrawlerRetreiver.java
@@ -4,6 +4,7 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import crawlercommons.robots.SimpleRobotRules;
 import lombok.SneakyThrows;
+import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawl.retreival.fetcher.ContentTags;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
 import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
@@ -81,49 +82,41 @@ public class CrawlerRetreiver {
     }
 
     public int fetch() {
-        return fetch(new CrawlDataReference());
+        return fetch(new DomainLinks(), new CrawlDataReference());
     }
 
-    public int fetch(CrawlDataReference oldCrawlData) {
+    public int fetch(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
         final DomainProber.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, crawlFrontier.peek());
 
-        if (probeResult instanceof DomainProber.ProbeResultOk) {
-            return crawlDomain(oldCrawlData);
-        }
-
-        // handle error cases for probe
-
-        var ip = findIp(domain);
-
-        if (probeResult instanceof DomainProber.ProbeResultError err) {
-            crawledDomainWriter.accept(
-                    CrawledDomain.builder()
-                            .crawlerStatus(err.status().name())
-                            .crawlerStatusDesc(err.desc())
-                            .domain(domain)
-                            .ip(ip)
-                            .build()
-            );
-            return 1;
-        }
-
-        if (probeResult instanceof DomainProber.ProbeResultRedirect redirect) {
-            crawledDomainWriter.accept(
-                    CrawledDomain.builder()
-                            .crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
-                            .crawlerStatusDesc("Redirected to different domain")
-                            .redirectDomain(redirect.domain().toString())
-                            .domain(domain)
-                            .ip(ip)
-                            .build()
-            );
-            return 1;
-        }
-
-        throw new IllegalStateException("Unknown probe result: " + probeResult);
-    };
+        return switch (probeResult) {
+            case DomainProber.ProbeResultOk(EdgeUrl probedUrl) -> crawlDomain(oldCrawlData, probedUrl, domainLinks);
+            case DomainProber.ProbeResultError(CrawlerDomainStatus status, String desc) -> {
+                crawledDomainWriter.accept(
+                        CrawledDomain.builder()
+                                .crawlerStatus(status.name())
+                                .crawlerStatusDesc(desc)
+                                .domain(domain)
+                                .ip(findIp(domain))
+                                .build()
+                );
+                yield 1;
+            }
+            case DomainProber.ProbeResultRedirect(EdgeDomain redirectDomain) -> {
+                crawledDomainWriter.accept(
+                        CrawledDomain.builder()
+                                .crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
+                                .crawlerStatusDesc("Redirected to different domain")
+                                .redirectDomain(redirectDomain.toString())
+                                .domain(domain)
+                                .ip(findIp(domain))
+                                .build()
+                );
+                yield 1;
+            }
+        };
+    }
 
-    private int crawlDomain(CrawlDataReference oldCrawlData) {
+    private int crawlDomain(CrawlDataReference oldCrawlData, EdgeUrl rootUrl, DomainLinks domainLinks) {
         String ip = findIp(domain);
 
         assert !crawlFrontier.isEmpty();
@@ -131,7 +124,7 @@ public class CrawlerRetreiver {
         final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
         final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
 
-        sniffRootDocument(delayTimer);
+        sniffRootDocument(delayTimer, rootUrl);
 
         // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
         int recrawled = recrawl(oldCrawlData, robotsRules, delayTimer);
@@ -141,7 +134,11 @@ public class CrawlerRetreiver {
             crawlFrontier.increaseDepth(1.5);
         }
 
-        downloadSitemaps(robotsRules);
+        // Add external links to the crawl frontier
+        crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
+
+        // Add links from the sitemap to the crawl frontier
+        downloadSitemaps(robotsRules, rootUrl);
 
         CrawledDomain ret = new CrawledDomain(domain, null, CrawlerDomainStatus.OK.name(), null, ip, new ArrayList<>(), null);
@@ -259,18 +256,18 @@ public class CrawlerRetreiver {
         return recrawled;
     }
 
-    private void downloadSitemaps(SimpleRobotRules robotsRules) {
+    private void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) {
         List<String> sitemaps = robotsRules.getSitemaps();
 
-        if (sitemaps.isEmpty()) {
-            sitemaps = List.of(
-                    "http://" + domain + "/sitemap.xml",
-                    "https://" + domain + "/sitemap.xml");
-        }
-
         List<EdgeUrl> urls = new ArrayList<>(sitemaps.size());
+        if (!sitemaps.isEmpty()) {
             for (var url : sitemaps) {
                 EdgeUrl.parse(url).ifPresent(urls::add);
             }
+        }
+        else {
+            urls.add(rootUrl.withPathAndParam("/sitemap.xml", null));
+        }
 
         downloadSitemaps(urls);
     }
@@ -305,11 +302,11 @@ public class CrawlerRetreiver {
         logger.debug("Queue is now {}", crawlFrontier.queueSize());
     }
 
-    private void sniffRootDocument(CrawlDelayTimer delayTimer) {
+    private void sniffRootDocument(CrawlDelayTimer delayTimer, EdgeUrl rootUrl) {
         try {
             logger.debug("Configuring link filter");
 
-            var url = crawlFrontier.peek().withPathAndParam("/", null);
+            var url = rootUrl.withPathAndParam("/", null);
 
             var maybeSample = fetchUrl(url, delayTimer, DocumentWithReference.empty()).filter(sample -> sample.httpStatus == 200);
             if (maybeSample.isEmpty())

DomainProber.java
@@ -43,7 +43,7 @@ public class DomainProber {
         var fetchResult = fetcher.probeDomain(firstUrlInQueue.withPathAndParam("/", null));
 
         if (fetchResult.ok())
-            return new ProbeResultOk();
+            return new ProbeResultOk(fetchResult.url);
 
         if (fetchResult.state == FetchResultState.REDIRECT)
             return new ProbeResultRedirect(fetchResult.domain);
@@ -51,9 +51,21 @@
         return new ProbeResultError(CrawlerDomainStatus.ERROR, "Bad status");
     }
 
-    interface ProbeResult {};
+    public sealed interface ProbeResult permits ProbeResultError, ProbeResultRedirect, ProbeResultOk {};
 
-    record ProbeResultError(CrawlerDomainStatus status, String desc) implements ProbeResult {}
-    record ProbeResultRedirect(EdgeDomain domain) implements ProbeResult {}
-    record ProbeResultOk() implements ProbeResult {}
+    /** The probing failed for one reason or another
+     * @param status  Machine readable status
+     * @param desc    Human-readable description of the error
+     */
+    public record ProbeResultError(CrawlerDomainStatus status, String desc) implements ProbeResult {}
+
+    /** This domain redirects to another domain */
+    public record ProbeResultRedirect(EdgeDomain domain) implements ProbeResult {}
+
+    /** If the retreivala of the probed url was successful, return the url as it was fetched
+     *  (which may be different from the url we probed, if we attempted another URL schema).
+     *
+     * @param probedUrl The url we successfully probed
+     */
+    public record ProbeResultOk(EdgeUrl probedUrl) implements ProbeResult {}
 }
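
Making ProbeResult a sealed interface lets callers handle every probe outcome with an exhaustive, compiler-checked switch over record patterns, which is exactly what CrawlerRetreiver.fetch does above. A standalone sketch of the same idiom; the describe helper is illustrative, and the import path for DomainProber is assumed from the package names in this commit:

import nu.marginalia.crawl.retreival.DomainProber;

class ProbeResultSketch {
    // Hypothetical helper: turn a probe outcome into a log-friendly string.
    static String describe(DomainProber.ProbeResult result) {
        return switch (result) {
            case DomainProber.ProbeResultOk(var probedUrl)           -> "ok: " + probedUrl;
            case DomainProber.ProbeResultRedirect(var domain)        -> "redirect to " + domain;
            case DomainProber.ProbeResultError(var status, var desc) -> status.name() + ": " + desc;
            // No default branch is needed: ProbeResult permits exactly these three
            // implementations, so the compiler verifies the switch is exhaustive.
        };
    }
}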

FetchResult.java
@@ -3,13 +3,21 @@ package nu.marginalia.crawl.retreival.fetcher;
 import lombok.AllArgsConstructor;
 import lombok.ToString;
 import nu.marginalia.model.EdgeDomain;
+import nu.marginalia.model.EdgeUrl;
 
 @AllArgsConstructor
 @ToString
 public class FetchResult {
     public final FetchResultState state;
+    public final EdgeUrl url;
     public final EdgeDomain domain;
 
+    public FetchResult(FetchResultState state, EdgeUrl url) {
+        this.state = state;
+        this.url = url;
+        this.domain = url.domain;
+    }
+
     public boolean ok() {
         return state == FetchResultState.OK;
     }

HttpFetcherImpl.java
@@ -15,6 +15,7 @@ import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
 import nu.marginalia.crawl.retreival.logic.ContentTypeParser;
 import okhttp3.*;
+import org.apache.commons.collections4.queue.PredicatedQueue;
 import org.apache.commons.io.input.BOMInputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -106,13 +107,12 @@ public class HttpFetcherImpl implements HttpFetcher {
         var call = client.newCall(head);
 
         try (var rsp = call.execute()) {
-            var requestUrl = rsp.request().url().toString();
-            EdgeDomain requestDomain = new EdgeUrl(requestUrl).domain;
+            EdgeUrl requestUrl = new EdgeUrl(rsp.request().url().toString());
 
-            if (!Objects.equals(requestDomain, url.domain)) {
-                return new FetchResult(FetchResultState.REDIRECT, requestDomain);
+            if (!Objects.equals(requestUrl.domain, url.domain)) {
+                return new FetchResult(FetchResultState.REDIRECT, requestUrl);
             }
 
-            return new FetchResult(FetchResultState.OK, requestDomain);
+            return new FetchResult(FetchResultState.OK, requestUrl);
         }
         catch (Exception ex) {
@@ -121,7 +121,7 @@ public class HttpFetcherImpl implements HttpFetcher {
             }
 
             logger.info("Error during fetching", ex);
-            return new FetchResult(FetchResultState.ERROR, url.domain);
+            return new FetchResult(FetchResultState.ERROR, url);
         }
     }

CrawlSpecProvider.java
@@ -1,10 +1,16 @@
 package nu.marginalia.crawl.spec;
 
+import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
 
+import java.util.List;
 import java.util.stream.Stream;
 
 public interface CrawlSpecProvider {
     int totalCount() throws Exception;
     Stream<CrawlSpecRecord> stream();
+
+    default List<EdgeDomain> getDomains() {
+        return stream().map(CrawlSpecRecord::getDomain).map(EdgeDomain::new).toList();
+    }
 }

CrawlerMockFetcherTest.java
@@ -114,7 +114,7 @@ public class CrawlerMockFetcherTest {
         @Override
         public FetchResult probeDomain(EdgeUrl url) {
             logger.info("Probing {}", url);
-            return new FetchResult(FetchResultState.OK, url.domain);
+            return new FetchResult(FetchResultState.OK, url);
         }
 
         @Override

CrawlerRetreiverTest.java
@@ -2,6 +2,7 @@ package nu.marginalia.crawling.retreival;
 import lombok.SneakyThrows;
 import nu.marginalia.WmsaHome;
+import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawl.retreival.CrawlDataReference;
 import nu.marginalia.crawl.retreival.CrawlerRetreiver;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
@@ -139,7 +140,7 @@ class CrawlerRetreiverTest {
                 if (d instanceof CrawledDocument doc) {
                     System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
                 }
-            }).fetch(new CrawlDataReference(stream));
+            }).fetch(new DomainLinks(), new CrawlDataReference(stream));
 
         }
     }