diff --git a/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java b/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java index f0f23956..c09ed550 100644 --- a/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java +++ b/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java @@ -224,19 +224,19 @@ public class EdgeUrl implements Serializable { } public URL asURL() throws MalformedURLException { - int port = this.port != null ? this.port : switch(proto) { - case "http" -> 80; - case "https" -> 443; - default -> 0; - }; - - return new URL(this.proto, this.domain.toString(), port, this.path); + try { + return asURI().toURL(); + } + catch (URISyntaxException e) { + throw new MalformedURLException(e.getMessage()); + } } public URI asURI() throws URISyntaxException { - if (port == null) - return new URI(this.proto, null, this.domain.toString(), this.path, this.param); - else + if (port != null) { return new URI(this.proto, null, this.domain.toString(), this.port, this.path, this.param, null); + } + + return new URI(this.proto, this.domain.toString(), this.path, this.param, null); } } diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java index a7661085..67e8738c 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java @@ -14,7 +14,7 @@ public class CrawlerOutputFile { String second = id.substring(2, 4); Path destDir = base.resolve(first).resolve(second); - return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd"); + return destDir.resolve(STR."\{id}-\{filesystemSafeName(name)}.zstd"); } /** Return the Path to a file for the given id and name, creating the prerequisite @@ -31,7 +31,7 @@ public class CrawlerOutputFile { if (!Files.exists(destDir)) { Files.createDirectories(destDir); } - return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd"); + return destDir.resolve(STR."\{id}-\{filesystemSafeName(name)}.zstd"); } @@ -49,4 +49,25 @@ public class CrawlerOutputFile { } + public static Path createWarcFile(Path baseDir, String id, String name, WarcFileVersion version) { + if (id.length() < 4) { + id = Strings.repeat("0", 4 - id.length()) + id; + } + + String fileName = STR."\{id}-\{filesystemSafeName(name)}.zstd\{version.suffix}"; + + return baseDir.resolve(fileName); + } + + public enum WarcFileVersion { + LIVE(".open"), + TEMP(".tmp"), + FINAL(""); + + public final String suffix; + + WarcFileVersion(String suffix) { + this.suffix = suffix; + } + } } diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDomain.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDomain.java index 482311c1..55ec27a6 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDomain.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDomain.java @@ -24,6 +24,10 @@ public class CrawledDomain implements SerializableCrawlData { return doc.size(); } + public boolean hasCookies() { + return cookies != null && !cookies.isEmpty(); + } + public static final String SERIAL_IDENTIFIER = "// DOMAIN"; @Override public String getSerialIdentifier() { diff --git 
a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java index fc824906..fea8f69a 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -79,7 +79,7 @@ public class DomainProcessor { ret.domain = new EdgeDomain(crawledDomain.domain); ret.ip = crawledDomain.ip; - cookies = Objects.requireNonNullElse(crawledDomain.cookies, Collections.emptyList()).size() > 0; + cookies = crawledDomain.hasCookies(); ip = crawledDomain.ip; if (crawledDomain.redirectDomain != null) { diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java index b3a9d26a..a5d78a1f 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java @@ -18,6 +18,7 @@ import nu.marginalia.crawl.spec.CrawlSpecProvider; import nu.marginalia.crawl.spec.DbCrawlSpecProvider; import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider; import nu.marginalia.crawling.io.CrawledDomainReader; +import nu.marginalia.crawling.io.CrawlerOutputFile; import nu.marginalia.crawlspec.CrawlSpecFileNames; import nu.marginalia.storage.FileStorageService; import nu.marginalia.model.crawlspec.CrawlSpecRecord; @@ -30,16 +31,16 @@ import nu.marginalia.process.log.WorkLog; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.crawling.io.CrawledDomainWriter; import nu.marginalia.crawl.retreival.CrawlerRetreiver; -import nu.marginalia.crawl.retreival.fetcher.HttpFetcher; import nu.marginalia.util.SimpleBlockingThreadPool; import okhttp3.ConnectionPool; import okhttp3.Dispatcher; -import okhttp3.internal.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.sql.SQLException; import java.util.*; import java.util.concurrent.*; @@ -212,8 +213,19 @@ public class CrawlerMain { @Override public void run() throws Exception { + Path newWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE); + Path tempFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP); + Path finalWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); + + if (Files.exists(newWarcFile)) { + Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING); + } + else { + Files.deleteIfExists(tempFile); + } + try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id); - var warcRecorder = new WarcRecorder(); // write to a temp file for now + var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now var retreiver = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder, writer::accept); CrawlDataReference reference = getReference()) { @@ -221,19 +233,33 @@ public class CrawlerMain { var domainLinks = anchorTagsSource.getAnchorTags(domain); + if (Files.exists(tempFile)) { + retreiver.syncAbortedRun(tempFile); + Files.delete(tempFile); + } + int size = 
retreiver.fetch(domainLinks, reference); + Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING); + workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size); heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks); logger.info("Fetched {}", domain); } catch (Exception e) { logger.error("Error fetching domain " + domain, e); + Files.deleteIfExists(newWarcFile); + if (tempFile != null) { + Files.deleteIfExists(tempFile); + } } finally { // We don't need to double-count these; it's also kept int he workLog processingIds.remove(domain); Thread.currentThread().setName("[idle]"); + + // FIXME: Remove this when we're done + Files.deleteIfExists(finalWarcFile); } } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/CrawledDocumentFactory.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java similarity index 81% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/CrawledDocumentFactory.java rename to code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java index 8a654e20..b3ab9ee5 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/CrawledDocumentFactory.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java @@ -1,4 +1,4 @@ -package nu.marginalia.crawl.retreival.fetcher; +package nu.marginalia.crawl.retreival; import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; import nu.marginalia.crawling.model.CrawledDocument; @@ -70,6 +70,22 @@ public class CrawledDocumentFactory { .httpStatus(rsp.statusCode()) .url(url.toString()) .build(); + } + public static CrawledDocument createRobotsError(EdgeUrl url) { + return CrawledDocument.builder() + .url(url.toString()) + .timestamp(LocalDateTime.now().toString()) + .httpStatus(-1) + .crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name()) + .build(); + } + public static CrawledDocument createRetryError(EdgeUrl url) { + return CrawledDocument.builder() + .url(url.toString()) + .timestamp(LocalDateTime.now().toString()) + .httpStatus(429) + .crawlerStatus(CrawlerDocumentStatus.ERROR.name()) + .build(); } } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java index bb4991b9..30054008 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java @@ -9,6 +9,9 @@ import nu.marginalia.crawl.retreival.fetcher.ContentTags; import nu.marginalia.crawl.retreival.fetcher.HttpFetcher; import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; +import nu.marginalia.crawl.retreival.revisit.CrawlerRevisitor; +import nu.marginalia.crawl.retreival.revisit.DocumentWithReference; +import nu.marginalia.crawl.retreival.sitemap.SitemapFetcher; import nu.marginalia.link_parser.LinkParser; import nu.marginalia.crawling.model.*; import nu.marginalia.ip_blocklist.UrlBlocklist; @@ -20,12 +23,9 @@ import org.jsoup.nodes.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.IOException; import 
java.net.InetAddress; import java.net.UnknownHostException; import java.nio.file.Path; -import java.time.LocalDateTime; import java.util.*; import java.util.function.Consumer; @@ -46,18 +46,13 @@ public class CrawlerRetreiver implements AutoCloseable { private static final LinkFilterSelector linkFilterSelector = new LinkFilterSelector(); private final DomainProber domainProber; - private final SitemapRetriever sitemapRetriever; private final DomainCrawlFrontier crawlFrontier; private final WarcRecorder warcRecorder; + private final CrawlerRevisitor crawlerRevisitor; + private final SitemapFetcher sitemapFetcher; int errorCount = 0; - /** recrawlState tag for documents that had a HTTP status 304 */ - private static final String documentWasRetainedTag = "RETAINED/304"; - - /** recrawlState tag for documents that had a 200 status but were identical to a previous version */ - private static final String documentWasSameTag = "SAME-BY-COMPARISON"; - public CrawlerRetreiver(HttpFetcher fetcher, DomainProber domainProber, CrawlSpecRecord specs, @@ -72,8 +67,10 @@ public class CrawlerRetreiver implements AutoCloseable { crawledDomainWriter = writer; - this.crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth); - sitemapRetriever = fetcher.createSitemapRetriever(); + + crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth); + crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, crawledDomainWriter, this, warcRecorder); + sitemapFetcher = new SitemapFetcher(crawlFrontier, fetcher.createSitemapRetriever()); // We must always crawl the index page first, this is assumed when fingerprinting the server var fst = crawlFrontier.peek(); @@ -125,6 +122,12 @@ public class CrawlerRetreiver implements AutoCloseable { }; } + public void syncAbortedRun(Path warcFile) { + var resync = new CrawlerWarcResynchronizer(crawlFrontier, warcRecorder); + + resync.run(warcFile); + } + private int crawlDomain(CrawlDataReference oldCrawlData, EdgeUrl rootUrl, DomainLinks domainLinks) { String ip = findIp(domain); @@ -147,9 +150,15 @@ public class CrawlerRetreiver implements AutoCloseable { crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto)); // Add links from the sitemap to the crawl frontier - downloadSitemaps(robotsRules, rootUrl); + sitemapFetcher.downloadSitemaps(robotsRules, rootUrl); - CrawledDomain ret = new CrawledDomain(domain, null, CrawlerDomainStatus.OK.name(), null, ip, new ArrayList<>(), null); + CrawledDomain ret = new CrawledDomain(domain, + null, + CrawlerDomainStatus.OK.name(), + null, + ip, + new ArrayList<>(), + null); int fetchedCount = recrawled; @@ -161,7 +170,7 @@ public class CrawlerRetreiver implements AutoCloseable { var top = crawlFrontier.takeNextUrl(); if (!robotsRules.isAllowed(top.toString())) { - crawledDomainWriter.accept(createRobotsError(top)); + crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(top)); continue; } @@ -196,119 +205,9 @@ public class CrawlerRetreiver implements AutoCloseable { return fetchedCount; } - /** Performs a re-crawl of old documents, comparing etags and last-modified */ - private int recrawl(CrawlDataReference oldCrawlData, - SimpleRobotRules robotsRules, - CrawlDelayTimer delayTimer) { - int recrawled = 0; - int retained = 0; - - for (;;) { - CrawledDocument doc = oldCrawlData.nextDocument(); - - if (doc == null) { - break; - } - - // This Shouldn't Happen (TM) - var urlMaybe = 
EdgeUrl.parse(doc.url); - if (urlMaybe.isEmpty()) continue; - var url = urlMaybe.get(); - - // If we've previously 404:d on this URL, we'll refrain from trying to fetch it again - if (doc.httpStatus == 404) { - crawlFrontier.addVisited(url); - continue; - } - - if (doc.httpStatus != 200) continue; - - if (!robotsRules.isAllowed(url.toString())) { - crawledDomainWriter.accept(createRobotsError(url)); - continue; - } - if (!crawlFrontier.filterLink(url)) - continue; - if (!crawlFrontier.addVisited(url)) - continue; - - - if (recrawled > 5 - && retained > 0.9 * recrawled - && Math.random() < 0.9) - { - // Since it looks like most of these documents haven't changed, - // we'll load the documents directly; but we do this in a random - // fashion to make sure we eventually catch changes over time - - crawledDomainWriter.accept(doc); - crawlFrontier.addVisited(url); - continue; - } - - - // GET the document with the stored document as a reference - // providing etag and last-modified headers, so we can recycle the - // document if it hasn't changed without actually downloading it - - var fetchedDocOpt = fetchWriteAndSleep(url, - delayTimer, - new DocumentWithReference(doc, oldCrawlData)); - if (fetchedDocOpt.isEmpty()) continue; - - if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; - else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; - - recrawled ++; - } - - return recrawled; - } - - private void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) { - List sitemaps = robotsRules.getSitemaps(); - - List urls = new ArrayList<>(sitemaps.size()); - if (!sitemaps.isEmpty()) { - for (var url : sitemaps) { - EdgeUrl.parse(url).ifPresent(urls::add); - } - } - else { - urls.add(rootUrl.withPathAndParam("/sitemap.xml", null)); - } - - downloadSitemaps(urls); - } - - private void downloadSitemaps(List urls) { - - Set checkedSitemaps = new HashSet<>(); - - for (var url : urls) { - // Let's not download sitemaps from other domains for now - if (!crawlFrontier.isSameDomain(url)) { - continue; - } - - if (checkedSitemaps.contains(url.path)) - continue; - - var sitemap = sitemapRetriever.fetchSitemap(url); - if (sitemap.isEmpty()) { - continue; - } - - // ensure we don't try to download this sitemap again - // (don't move this up, as we may want to check the same - // path with different protocols until we find one that works) - - checkedSitemaps.add(url.path); - - crawlFrontier.addAllToQueue(sitemap); - } - - logger.debug("Queue is now {}", crawlFrontier.queueSize()); + /** Using the old crawl data, fetch the documents comparing etags and last-modified */ + private int recrawl(CrawlDataReference oldCrawlData, SimpleRobotRules robotsRules, CrawlDelayTimer delayTimer) { + return crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer); } private void sniffRootDocument(CrawlDelayTimer delayTimer, EdgeUrl rootUrl) { @@ -345,7 +244,7 @@ public class CrawlerRetreiver implements AutoCloseable { linkParser.parseLink(url, href) .filter(crawlFrontier::isSameDomain) .map(List::of) - .ifPresent(this::downloadSitemaps); + .ifPresent(sitemapFetcher::downloadSitemaps); } } catch (Exception ex) { @@ -353,7 +252,7 @@ public class CrawlerRetreiver implements AutoCloseable { } } - private Optional fetchWriteAndSleep(EdgeUrl top, + public Optional fetchWriteAndSleep(EdgeUrl top, CrawlDelayTimer timer, DocumentWithReference reference) { logger.debug("Fetching {}", top); @@ -365,11 +264,11 @@ public class CrawlerRetreiver implements AutoCloseable { 
if (docOpt.isPresent()) { var doc = docOpt.get(); - if (!Objects.equals(doc.recrawlState, documentWasRetainedTag) + if (!Objects.equals(doc.recrawlState, CrawlerRevisitor.documentWasRetainedTag) && reference.isContentBodySame(doc)) { // The document didn't change since the last time - doc.recrawlState = documentWasSameTag; + doc.recrawlState = CrawlerRevisitor.documentWasSameTag; } crawledDomainWriter.accept(doc); @@ -408,7 +307,7 @@ public class CrawlerRetreiver implements AutoCloseable { var parsedDoc = Jsoup.parse(doc.documentBody); EdgeUrl url = new EdgeUrl(doc.url); - findLinks(url, parsedDoc); + crawlFrontier.enqueueLinksFromDocument(url, parsedDoc); findCanonicalUrl(url, parsedDoc) .ifPresent(canonicalLink -> doc.canonicalUrl = canonicalLink.toString()); } @@ -442,34 +341,13 @@ public class CrawlerRetreiver implements AutoCloseable { } } - return createRetryError(top); + return CrawledDocumentFactory.createRetryError(top); } private String createHash(String documentBodyHash) { return hashMethod.hashUnencodedChars(documentBodyHash).toString(); } - private void findLinks(EdgeUrl baseUrl, Document parsed) { - baseUrl = linkParser.getBaseLink(parsed, baseUrl); - - for (var link : parsed.getElementsByTag("a")) { - linkParser.parseLink(baseUrl, link).ifPresent(crawlFrontier::addToQueue); - } - for (var link : parsed.getElementsByTag("frame")) { - linkParser.parseFrame(baseUrl, link).ifPresent(crawlFrontier::addToQueue); - } - for (var link : parsed.getElementsByTag("iframe")) { - linkParser.parseFrame(baseUrl, link).ifPresent(crawlFrontier::addToQueue); - } - for (var link : parsed.getElementsByTag("link")) { - String rel = link.attr("rel"); - - if (rel.equalsIgnoreCase("next") || rel.equalsIgnoreCase("prev")) { - linkParser.parseLink(baseUrl, link).ifPresent(crawlFrontier::addToQueue); - } - } - } - private Optional findCanonicalUrl(EdgeUrl baseUrl, Document parsed) { baseUrl = baseUrl.domain.toRootUrl(); @@ -488,97 +366,9 @@ public class CrawlerRetreiver implements AutoCloseable { } } - private CrawledDocument createRobotsError(EdgeUrl url) { - return CrawledDocument.builder() - .url(url.toString()) - .timestamp(LocalDateTime.now().toString()) - .httpStatus(-1) - .crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name()) - .build(); - } - private CrawledDocument createRetryError(EdgeUrl url) { - return CrawledDocument.builder() - .url(url.toString()) - .timestamp(LocalDateTime.now().toString()) - .httpStatus(429) - .crawlerStatus(CrawlerDocumentStatus.ERROR.name()) - .build(); - } - @Override public void close() throws Exception { warcRecorder.close(); } - private record DocumentWithReference( - @Nullable CrawledDocument doc, - @Nullable CrawlDataReference reference) { - - private static final DocumentWithReference emptyInstance = new DocumentWithReference(null, null); - public static DocumentWithReference empty() { - return emptyInstance; - } - - public boolean isContentBodySame(CrawledDocument newDoc) { - if (reference == null) - return false; - if (doc == null) - return false; - if (doc.documentBody == null) - return false; - if (newDoc.documentBody == null) - return false; - - return reference.isContentBodySame(doc, newDoc); - } - - private ContentTags getContentTags() { - if (null == doc) - return ContentTags.empty(); - - String headers = doc.headers; - if (headers == null) - return ContentTags.empty(); - - String[] headersLines = headers.split("\n"); - - String lastmod = null; - String etag = null; - - for (String line : headersLines) { - if (line.toLowerCase().startsWith("etag:")) { 
- etag = line.substring(5).trim(); - } - if (line.toLowerCase().startsWith("last-modified:")) { - lastmod = line.substring(14).trim(); - } - } - - return new ContentTags(etag, lastmod); - } - - public boolean isEmpty() { - return doc == null || reference == null; - } - - /** If the provided document has HTTP status 304, and the reference document is provided, - * return the reference document; otherwise return the provided document. - */ - public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) { - - if (doc == null) - return fetchedDoc; - - // HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when - // we fetched it last time. We can recycle the reference document. - if (fetchedDoc.httpStatus != 304) - return fetchedDoc; - - var ret = doc; - ret.recrawlState = documentWasRetainedTag; - ret.timestamp = LocalDateTime.now().toString(); - return ret; - } - } - } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java new file mode 100644 index 00000000..01bafbe1 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java @@ -0,0 +1,110 @@ +package nu.marginalia.crawl.retreival; + +import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor; +import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult; +import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; +import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; +import nu.marginalia.model.EdgeUrl; +import org.jsoup.Jsoup; +import org.netpreserve.jwarc.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * This class is responsible for resynchronizing the crawl frontier with a partially written + * warc file. This may happen if the crawl is interrupted or crashes. + *

+ * This is best-effort and not guaranteed to recover all data, but it should limit + * the amount of data that is lost and needs to be re-crawled in the event of an unexpected + * shutdown. + */ +public class CrawlerWarcResynchronizer { + private final DomainCrawlFrontier crawlFrontier; + private final WarcRecorder recorder; + private static final Logger logger = LoggerFactory.getLogger(CrawlerWarcResynchronizer.class); + public CrawlerWarcResynchronizer(DomainCrawlFrontier crawlFrontier, WarcRecorder recorder) { + this.crawlFrontier = crawlFrontier; + this.recorder = recorder; + } + + public void run(Path tempFile) { + // First pass, enqueue links + try (var reader = new WarcReader(tempFile)) { + for (var item : reader) { + accept(item); + } + } catch (IOException e) { + logger.info(STR."Failed to read full warc file \{tempFile}", e); + } + + // Second pass, copy records to the new warc file + try (var reader = new WarcReader(tempFile)) { + for (var item : reader) { + recorder.resync(item); + } + } catch (IOException e) { + logger.info(STR."Failed to read full warc file \{tempFile}", e); + } + } + + public void accept(WarcRecord item) { + try { + if (item instanceof WarcResponse rsp) { + response(rsp); + } else if (item instanceof WarcRevisit revisit) { + revisit(revisit); + } else if (item instanceof WarcRequest req) { + request(req); + } + } + catch (Exception ex) { + logger.info(STR."Failed to process warc record \{item}", ex); + } + } + + private void request(WarcRequest request) { + EdgeUrl.parse(request.target()).ifPresent(crawlFrontier::addVisited); + } + + private void response(WarcResponse rsp) { + var url = new EdgeUrl(rsp.targetURI()); + + crawlFrontier.addVisited(url); + + try { + var response = HttpFetchResult.importWarc(rsp); + if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) { + var doc = Jsoup.parse(ok.body()); + crawlFrontier.enqueueLinksFromDocument(url, doc); + } + } + catch (Exception e) { + logger.info(STR."Failed to parse response body for \{url}", e); + } + } + + private void revisit(WarcRevisit revisit) throws IOException { + if (!WarcRecorder.revisitURI.equals(revisit.profile())) { + return; + } + + var url = new EdgeUrl(revisit.targetURI()); + + crawlFrontier.addVisited(url); + + try { + var response = HttpFetchResult.importWarc(revisit); + if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) { + var doc = Jsoup.parse(ok.body()); + crawlFrontier.enqueueLinksFromDocument(url, doc); + } + } + catch (Exception e) { + logger.info(STR."Failed to parse response body for \{url}", e); + } + } + +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java index 30902a8e..6d868fdf 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java @@ -3,14 +3,19 @@ package nu.marginalia.crawl.retreival; import com.google.common.hash.HashFunction; import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import nu.marginalia.ip_blocklist.UrlBlocklist; +import nu.marginalia.link_parser.LinkParser; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; +import org.jsoup.nodes.Document; import java.net.URISyntaxException; import java.util.*; import java.util.function.Predicate; public class 
DomainCrawlFrontier { + + private static final LinkParser linkParser = new LinkParser(); + private final ArrayDeque queue; // To save the number of strings kept in memory, @@ -141,4 +146,27 @@ public class DomainCrawlFrontier { public int queueSize() { return queue.size(); } + + + public void enqueueLinksFromDocument(EdgeUrl baseUrl, Document parsed) { + baseUrl = linkParser.getBaseLink(parsed, baseUrl); + + for (var link : parsed.getElementsByTag("a")) { + linkParser.parseLink(baseUrl, link).ifPresent(this::addToQueue); + } + for (var link : parsed.getElementsByTag("frame")) { + linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue); + } + for (var link : parsed.getElementsByTag("iframe")) { + linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue); + } + for (var link : parsed.getElementsByTag("link")) { + String rel = link.attr("rel"); + + if (rel.equalsIgnoreCase("next") || rel.equalsIgnoreCase("prev")) { + linkParser.parseLink(baseUrl, link).ifPresent(this::addToQueue); + } + } + } + } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java index 8fc288f9..be815954 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java @@ -8,6 +8,7 @@ import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; +import java.nio.file.Path; import java.util.List; @ImplementedBy(HttpFetcherImpl.class) diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java index 8ff9dd12..3faffe4a 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java @@ -5,22 +5,21 @@ import com.google.inject.name.Named; import crawlercommons.robots.SimpleRobotRules; import crawlercommons.robots.SimpleRobotRulesParser; import lombok.SneakyThrows; -import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.crawl.retreival.Cookies; import nu.marginalia.crawl.retreival.RateLimitException; import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult; +import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor; +import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult; import nu.marginalia.crawl.retreival.fetcher.socket.*; import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; -import static nu.marginalia.crawl.retreival.fetcher.CrawledDocumentFactory.*; +import static nu.marginalia.crawl.retreival.CrawledDocumentFactory.*; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawlerDocumentStatus; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; -import nu.marginalia.contenttype.ContentTypeParser; import okhttp3.*; -import org.apache.commons.io.input.BOMInputStream; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -34,7 +33,6 @@ import java.nio.charset.IllegalCharsetNameException; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPInputStream; public class HttpFetcherImpl implements HttpFetcher { @@ -45,7 +43,7 @@ public class HttpFetcherImpl implements HttpFetcher { private static final SimpleRobotRulesParser robotsParser = new SimpleRobotRulesParser(); - private final ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); + private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); private final ContentTypeProber contentTypeProber; @Override @@ -188,14 +186,14 @@ public class HttpFetcherImpl implements HttpFetcher { } else if (result instanceof HttpFetchResult.ResultOk ok) { try { - return extractBody(url, ok); + return extractBody(userAgent, url, ok); } catch (Exception ex) { return createErrorFromException(url, ex); } } else { - throw new IllegalStateException("Unknown result type " + result.getClass()); + throw new IllegalStateException(STR."Unknown result type \{result.getClass()}"); } } @@ -216,7 +214,7 @@ public class HttpFetcherImpl implements HttpFetcher { }; } - private CrawledDocument extractBody(EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException { + public static CrawledDocument extractBody(String userAgent, EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException { var responseUrl = new EdgeUrl(rsp.uri()); @@ -230,29 +228,6 @@ public class HttpFetcherImpl implements HttpFetcher { throw new RateLimitException(retryAfter); } - var byteStream = rsp.getInputStream(); - - if ("gzip".equals(rsp.header("Content-Encoding"))) { - byteStream = new GZIPInputStream(byteStream); - } - byteStream = new BOMInputStream(byteStream); - - var contentTypeHeader = rsp.header("Content-Type"); - if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) { - return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); - } - - byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder - - var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data); - if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) { - return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); - } - - if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) { - return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CHARSET, ""); - } - if (!isXRobotsTagsPermitted(rsp.allHeaders("X-Robots-Tag"), userAgent)) { return CrawledDocument.builder() .crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name()) @@ -264,17 +239,20 @@ public class HttpFetcherImpl implements HttpFetcher { .build(); } - var strData = DocumentBodyToString.getStringData(contentType, data); - - return CrawledDocument.builder() - .crawlerStatus(CrawlerDocumentStatus.OK.name()) - .headers(rsp.headers().toString()) - .contentType(contentTypeHeader) - .timestamp(LocalDateTime.now().toString()) - .httpStatus(rsp.statusCode()) - .url(responseUrl.toString()) - .documentBody(strData) - .build(); + return switch(DocumentBodyExtractor.extractBody(rsp)) { + case DocumentBodyResult.Error(CrawlerDocumentStatus status, String why) -> + createErrorResponse(url, rsp, status, why); + case DocumentBodyResult.Ok(String contentType, String body) -> + CrawledDocument.builder() + .crawlerStatus(CrawlerDocumentStatus.OK.name()) + .headers(rsp.headers().toString()) + 
.contentType(contentType) + .timestamp(LocalDateTime.now().toString()) + .httpStatus(rsp.statusCode()) + .url(responseUrl.toString()) + .documentBody(body) + .build(); + }; } /** Check X-Robots-Tag header tag to see if we are allowed to index this page. diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java new file mode 100644 index 00000000..99ae2cae --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java @@ -0,0 +1,44 @@ +package nu.marginalia.crawl.retreival.fetcher.body; + +import nu.marginalia.contenttype.ContentTypeParser; +import nu.marginalia.contenttype.DocumentBodyToString; +import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; +import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; +import nu.marginalia.crawling.model.CrawlerDocumentStatus; +import org.apache.commons.io.input.BOMInputStream; + +import java.io.IOException; +import java.util.zip.GZIPInputStream; + +public class DocumentBodyExtractor { + private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); + + public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) throws IOException { + var byteStream = rsp.getInputStream(); + + if ("gzip".equals(rsp.header("Content-Encoding"))) { + byteStream = new GZIPInputStream(byteStream); + } + byteStream = new BOMInputStream(byteStream); + + var contentTypeHeader = rsp.header("Content-Type"); + if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); + } + + byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder + + var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data); + if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); + } + + if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, ""); + } + + + return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data)); + } + +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java new file mode 100644 index 00000000..fc5d67ec --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java @@ -0,0 +1,8 @@ +package nu.marginalia.crawl.retreival.fetcher.body; + +import nu.marginalia.crawling.model.CrawlerDocumentStatus; + +public sealed interface DocumentBodyResult { + record Ok(String contentType, String body) implements DocumentBodyResult { } + record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { } +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java index 305c05da..ae9673b1 100644 --- 
a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java @@ -1,13 +1,47 @@ package nu.marginalia.crawl.retreival.fetcher.warc; import okhttp3.Headers; +import org.netpreserve.jwarc.MessageHeaders; +import org.netpreserve.jwarc.WarcResponse; +import org.netpreserve.jwarc.WarcRevisit; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.List; public sealed interface HttpFetchResult { + static ResultOk importWarc(WarcResponse response) throws IOException { + var http = response.http(); + try (var body = http.body()) { + byte[] bytes = body.stream().readAllBytes(); + + return new ResultOk( + response.targetURI(), + http.status(), + http.headers(), + bytes, + 0, + bytes.length + ); + } + } + static ResultOk importWarc(WarcRevisit revisit) throws IOException { + var http = revisit.http(); + try (var body = http.body()) { + byte[] bytes = body.stream().readAllBytes(); + + return new ResultOk( + revisit.targetURI(), + http.status(), + http.headers(), + bytes, + 0, + bytes.length + ); + } + } record ResultOk(URI uri, int statusCode, Headers headers, @@ -15,6 +49,26 @@ public sealed interface HttpFetchResult { int bytesStart, int bytesLength ) implements HttpFetchResult { + + public ResultOk(URI uri, + int statusCode, + MessageHeaders headers, + byte[] bytesRaw, + int bytesStart, + int bytesLength) { + this(uri, statusCode, convertHeaders(headers), bytesRaw, bytesStart, bytesLength); + } + + private static Headers convertHeaders(MessageHeaders headers) { + var ret = new Headers.Builder(); + for (var header : headers.map().entrySet()) { + for (var value : header.getValue()) { + ret.add(header.getKey(), value); + } + } + return ret.build(); + } + public InputStream getInputStream() { return new ByteArrayInputStream(bytesRaw, bytesStart, bytesLength); } @@ -26,6 +80,7 @@ public sealed interface HttpFetchResult { return headers.values(name); } + }; record ResultError(Exception ex) implements HttpFetchResult { }; } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java index 683498a0..368bf3c7 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java @@ -34,6 +34,17 @@ public class WarcProtocolReconstructor { return requestStringBuilder.toString(); } + static String getResponseHeader(String headersAsString, int code) { + String version = "1.1"; + + String statusCode = String.valueOf(code); + String statusMessage = STATUS_CODE_MAP.getOrDefault(code, "Unknown"); + + String headerString = getHeadersAsString(headersAsString); + + return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n"; + } + static String getResponseHeader(Response response) { String version = response.protocol() == Protocol.HTTP_1_1 ? 
"1.1" : "2.0"; @@ -99,6 +110,13 @@ public class WarcProtocolReconstructor { Map.entry(511, "Network Authentication Required") ); + static private String getHeadersAsString(String headersBlob) { + StringJoiner joiner = new StringJoiner("\r\n"); + + Arrays.stream(headersBlob.split("\n")).forEach(joiner::add); + + return joiner.toString(); + } static private String getHeadersAsString(Response response) { StringJoiner joiner = new StringJoiner("\r\n"); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java index a8ee9cf9..3d4b5aaa 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java @@ -1,12 +1,14 @@ package nu.marginalia.crawl.retreival.fetcher.warc; import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.model.EdgeUrl; import okhttp3.OkHttpClient; import okhttp3.Request; import org.netpreserve.jwarc.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; @@ -16,6 +18,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.security.NoSuchAlgorithmException; import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** Based on JWarc's fetch method, APL 2.0 license *

@@ -24,6 +29,8 @@ import java.time.Instant; * be reconstructed. */ public class WarcRecorder implements AutoCloseable { + public static final URI revisitURI = URI.create("urn:marginalia:revisit"); + private static final int MAX_TIME = 30_000; private static final int MAX_SIZE = 1024 * 1024 * 10; private final WarcWriter writer; @@ -85,8 +92,6 @@ public class WarcRecorder implements AutoCloseable { inputStream = body.byteStream(); } - byte[] buf = new byte[8192]; - ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response); String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response); @@ -111,9 +116,6 @@ responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n); totalLength += n; - responseDigestBuilder.update(buf, n); - payloadDigestBuilder.update(buf, n); - if (MAX_TIME > 0 && System.currentTimeMillis() - startMillis > MAX_TIME) { truncationReason = WarcTruncationReason.TIME; break; @@ -138,8 +140,6 @@ // Build and write the response - long pos = writer.position(); - var warcResponse = responseBuilder.build(); warcResponse.http(); // force HTTP header to be parsed before body is consumed so that caller can use it writer.write(warcResponse); @@ -174,6 +174,59 @@ } } + public void resync(WarcRecord item) throws IOException { + writer.write(item); + } + + /** + * Flag the given URL as skipped by the crawler, so that it will not be retried. + * Knowing which URLs were skipped is still important when resynchronizing from the WARC file, + * so that the crawler can avoid re-fetching them. + * + * @param url The URL to flag + * @param headers The HTTP headers of the previously crawled document + * @param statusCode The HTTP status code of the previously crawled document + * @param documentBody The body of the previously crawled document + */ + public void flagAsSkipped(EdgeUrl url, String headers, int statusCode, String documentBody) { + try { + WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder(); + WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder(); + + String header = WarcProtocolReconstructor.getResponseHeader(headers, statusCode); + ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(); + responseDataBuffer.put(header); + + responseDigestBuilder.update(header); + + try (var inputStream = new ByteArrayInputStream(documentBody.getBytes())) { + int remainingLength; + while ((remainingLength = responseDataBuffer.remaining()) > 0) { + int startPos = responseDataBuffer.pos(); + + int n = responseDataBuffer.readFrom(inputStream, remainingLength); + if (n < 0) + break; + + responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n); + responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n); + } + } + + WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), revisitURI) + .blockDigest(responseDigestBuilder.build()) + .payloadDigest(payloadDigestBuilder.build()) + .date(Instant.now()) + .body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes()) + .build(); + + revisit.http(); // force HTTP header to be parsed before body is consumed so that caller can use it + + writer.write(revisit); + } catch (URISyntaxException | IOException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + private class ResponseDataBuffer { private final byte[] data; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java new file mode 100644 index 00000000..c77af845 
--- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java @@ -0,0 +1,123 @@ +package nu.marginalia.crawl.retreival.revisit; + +import crawlercommons.robots.SimpleRobotRules; +import nu.marginalia.crawl.retreival.CrawlDataReference; +import nu.marginalia.crawl.retreival.CrawlDelayTimer; +import nu.marginalia.crawl.retreival.CrawlerRetreiver; +import nu.marginalia.crawl.retreival.DomainCrawlFrontier; +import nu.marginalia.crawl.retreival.CrawledDocumentFactory; +import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; +import nu.marginalia.crawling.model.CrawledDocument; +import nu.marginalia.crawling.model.SerializableCrawlData; +import nu.marginalia.model.EdgeUrl; +import org.jsoup.Jsoup; + +import java.util.function.Consumer; + +/** This class encapsulates the logic for re-visiting a domain that has already been crawled. + * We may use information from the previous crawl to inform the next crawl, specifically the + * E-Tag and Last-Modified headers. + */ +public class CrawlerRevisitor { + /** recrawlState tag for documents that had a HTTP status 304 */ + public static final String documentWasRetainedTag = "RETAINED/304"; + + /** recrawlState tag for documents that had a 200 status but were identical to a previous version */ + public static final String documentWasSameTag = "SAME-BY-COMPARISON"; + + + private final DomainCrawlFrontier crawlFrontier; + private final Consumer<SerializableCrawlData> crawledDomainWriter; + private final CrawlerRetreiver crawlerRetreiver; + private final WarcRecorder warcRecorder; + + public CrawlerRevisitor(DomainCrawlFrontier crawlFrontier, + Consumer<SerializableCrawlData> crawledDomainWriter, + CrawlerRetreiver crawlerRetreiver, + WarcRecorder warcRecorder) { + this.crawlFrontier = crawlFrontier; + this.crawledDomainWriter = crawledDomainWriter; + this.crawlerRetreiver = crawlerRetreiver; + this.warcRecorder = warcRecorder; + } + + /** Performs a re-crawl of old documents, comparing etags and last-modified */ + public int recrawl(CrawlDataReference oldCrawlData, + SimpleRobotRules robotsRules, + CrawlDelayTimer delayTimer) { + int recrawled = 0; + int retained = 0; + + for (;;) { + CrawledDocument doc = oldCrawlData.nextDocument(); + + if (doc == null) { + break; + } + + // This Shouldn't Happen (TM) + var urlMaybe = EdgeUrl.parse(doc.url); + if (urlMaybe.isEmpty()) continue; + var url = urlMaybe.get(); + + // If we've previously 404:d on this URL, we'll refrain from trying to fetch it again + if (doc.httpStatus == 404) { + crawlFrontier.addVisited(url); + continue; + } + + if (doc.httpStatus != 200) continue; + + if (!robotsRules.isAllowed(url.toString())) { + crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(url)); + continue; + } + if (!crawlFrontier.filterLink(url)) + continue; + if (!crawlFrontier.addVisited(url)) + continue; + + + if (recrawled > 5 + && retained > 0.9 * recrawled + && Math.random() < 0.9) + { + // Since it looks like most of these documents haven't changed, + // we'll load the documents directly; but we do this in a random + // fashion to make sure we eventually catch changes over time + // and ensure we discover new links + + crawledDomainWriter.accept(doc); + crawlFrontier.addVisited(url); + + // Hoover up any links from the document + if (doc.httpStatus == 200 && doc.documentBody != null) { + var parsedDoc = Jsoup.parse(doc.documentBody); + crawlFrontier.enqueueLinksFromDocument(url, parsedDoc); + } + + // Add a WARC record so we don't repeat this + warcRecorder.flagAsSkipped(url, 
doc.headers, doc.httpStatus, doc.documentBody); + + continue; + } + + + // GET the document with the stored document as a reference + // providing etag and last-modified headers, so we can recycle the + // document if it hasn't changed without actually downloading it + + var fetchedDocOpt = crawlerRetreiver.fetchWriteAndSleep(url, + delayTimer, + new DocumentWithReference(doc, oldCrawlData)); + if (fetchedDocOpt.isEmpty()) continue; + + if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; + else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; + + recrawled ++; + } + + return recrawled; + } +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java new file mode 100644 index 00000000..e832541f --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java @@ -0,0 +1,82 @@ +package nu.marginalia.crawl.retreival.revisit; + +import nu.marginalia.crawl.retreival.CrawlDataReference; +import nu.marginalia.crawl.retreival.CrawlerRetreiver; +import nu.marginalia.crawl.retreival.fetcher.ContentTags; +import nu.marginalia.crawling.model.CrawledDocument; + +import javax.annotation.Nullable; +import java.time.LocalDateTime; + +public record DocumentWithReference( + @Nullable CrawledDocument doc, + @Nullable CrawlDataReference reference) { + + private static final DocumentWithReference emptyInstance = new DocumentWithReference(null, null); + + public static DocumentWithReference empty() { + return emptyInstance; + } + + public boolean isContentBodySame(CrawledDocument newDoc) { + if (reference == null) + return false; + if (doc == null) + return false; + if (doc.documentBody == null) + return false; + if (newDoc.documentBody == null) + return false; + + return reference.isContentBodySame(doc, newDoc); + } + + public ContentTags getContentTags() { + if (null == doc) + return ContentTags.empty(); + + String headers = doc.headers; + if (headers == null) + return ContentTags.empty(); + + String[] headersLines = headers.split("\n"); + + String lastmod = null; + String etag = null; + + for (String line : headersLines) { + if (line.toLowerCase().startsWith("etag:")) { + etag = line.substring(5).trim(); + } + if (line.toLowerCase().startsWith("last-modified:")) { + lastmod = line.substring(14).trim(); + } + } + + return new ContentTags(etag, lastmod); + } + + public boolean isEmpty() { + return doc == null || reference == null; + } + + /** + * If the provided document has HTTP status 304, and the reference document is provided, + * return the reference document; otherwise return the provided document. + */ + public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) { + + if (doc == null) + return fetchedDoc; + + // HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when + // we fetched it last time. We can recycle the reference document. 
+ if (fetchedDoc.httpStatus != 304) + return fetchedDoc; + + var ret = doc; + ret.recrawlState = CrawlerRevisitor.documentWasRetainedTag; + ret.timestamp = LocalDateTime.now().toString(); + return ret; + } +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/sitemap/SitemapFetcher.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/sitemap/SitemapFetcher.java new file mode 100644 index 00000000..3ce33d64 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/sitemap/SitemapFetcher.java @@ -0,0 +1,71 @@ +package nu.marginalia.crawl.retreival.sitemap; + +import crawlercommons.robots.SimpleRobotRules; +import nu.marginalia.crawl.retreival.DomainCrawlFrontier; +import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever; +import nu.marginalia.model.EdgeUrl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class SitemapFetcher { + + private final DomainCrawlFrontier crawlFrontier; + private final SitemapRetriever sitemapRetriever; + private static final Logger logger = LoggerFactory.getLogger(SitemapFetcher.class); + + public SitemapFetcher(DomainCrawlFrontier crawlFrontier, SitemapRetriever sitemapRetriever) { + this.crawlFrontier = crawlFrontier; + this.sitemapRetriever = sitemapRetriever; + } + + public void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) { + List<String> sitemaps = robotsRules.getSitemaps(); + + List<EdgeUrl> urls = new ArrayList<>(sitemaps.size()); + if (!sitemaps.isEmpty()) { + for (var url : sitemaps) { + EdgeUrl.parse(url).ifPresent(urls::add); + } + } + else { + urls.add(rootUrl.withPathAndParam("/sitemap.xml", null)); + } + + downloadSitemaps(urls); + } + + public void downloadSitemaps(List<EdgeUrl> urls) { + + Set<String> checkedSitemaps = new HashSet<>(); + + for (var url : urls) { + // Let's not download sitemaps from other domains for now + if (!crawlFrontier.isSameDomain(url)) { + continue; + } + + if (checkedSitemaps.contains(url.path)) + continue; + + var sitemap = sitemapRetriever.fetchSitemap(url); + if (sitemap.isEmpty()) { + continue; + } + + // ensure we don't try to download this sitemap again + // (don't move this up, as we may want to check the same + // path with different protocols until we find one that works) + + checkedSitemaps.add(url.path); + + crawlFrontier.addAllToQueue(sitemap); + } + + logger.debug("Queue is now {}", crawlFrontier.queueSize()); + } +} diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizerTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizerTest.java new file mode 100644 index 00000000..ae3d9be4 --- /dev/null +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizerTest.java @@ -0,0 +1,88 @@ +package nu.marginalia.crawl.retreival; + +import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.model.EdgeUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.netpreserve.jwarc.WarcReader; +import org.netpreserve.jwarc.WarcRequest; +import 
org.netpreserve.jwarc.WarcResponse; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import static org.junit.jupiter.api.Assertions.*; + +class CrawlerWarcResynchronizerTest { + Path fileName; + Path outputFile; + OkHttpClient httpClient; + @BeforeEach + public void setUp() throws Exception { + httpClient = new OkHttpClient.Builder() + .addNetworkInterceptor(new IpInterceptingNetworkInterceptor()) + .build(); + + fileName = Files.createTempFile("test", ".warc.gz"); + outputFile = Files.createTempFile("test", ".warc.gz"); + } + + @AfterEach + public void tearDown() throws Exception { + Files.deleteIfExists(fileName); + Files.deleteIfExists(outputFile); + } + + @Test + void run() throws IOException, URISyntaxException { + try (var oldRecorder = new WarcRecorder(fileName)) { + fetchUrl(oldRecorder, "https://www.marginalia.nu/"); + fetchUrl(oldRecorder, "https://www.marginalia.nu/log/"); + fetchUrl(oldRecorder, "https://www.marginalia.nu/feed/"); + } catch (Exception e) { + fail(e); + } + + var crawlFrontier = new DomainCrawlFrontier(new EdgeDomain("www.marginalia.nu"), List.of(), 100); + + try (var newRecorder = new WarcRecorder(outputFile)) { + new CrawlerWarcResynchronizer(crawlFrontier, newRecorder).run(fileName); + } + + assertTrue(crawlFrontier.isVisited(new EdgeUrl("https://www.marginalia.nu/"))); + assertTrue(crawlFrontier.isVisited(new EdgeUrl("https://www.marginalia.nu/log/"))); + assertTrue(crawlFrontier.isVisited(new EdgeUrl("https://www.marginalia.nu/feed/"))); + + try (var warcReader = new WarcReader(outputFile)) { + for (var item : warcReader) { + if (item instanceof WarcRequest req) { + System.out.println("req:" + req.target()); + } + if (item instanceof WarcResponse rsp) { + System.out.println("req:" + rsp.target()); + } + } + } + + new GZIPInputStream(Files.newInputStream(outputFile)).transferTo(System.out); + } + + void fetchUrl(WarcRecorder recorder, String url) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { + var req = new Request.Builder().url(url) + .addHeader("User-agent", "test.marginalia.nu") + .addHeader("Accept-Encoding", "gzip") + .get().build(); + recorder.fetch(httpClient, req); + } +} \ No newline at end of file diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java index 80c1218d..55f2eebe 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java @@ -2,6 +2,7 @@ package nu.marginalia.crawl.retreival.fetcher; import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; +import nu.marginalia.model.EdgeUrl; import okhttp3.OkHttpClient; import okhttp3.Request; import org.junit.jupiter.api.AfterEach; @@ -66,4 +67,33 @@ class WarcRecorderTest { assertEquals("https://www.marginalia.nu/", sampleData.get("request")); assertEquals("https://www.marginalia.nu/", sampleData.get("response")); } + + @Test + public void flagAsSkipped() throws IOException, URISyntaxException { + + 
try (var recorder = new WarcRecorder(fileName)) { + recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"), + """ + Content-type: text/html + X-Cookies: 1 + """, + 200, + "test"); + } + + try (var reader = new WarcReader(fileName)) { + for (var record : reader) { + if (record instanceof WarcResponse rsp) { + assertEquals("https://www.marginalia.nu/", rsp.target()); + assertEquals("text/html", rsp.contentType().type()); + assertEquals(200, rsp.http().status()); + assertEquals("1", rsp.http().headers().first("X-Cookies").orElse(null)); + } + } + } + + new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out); + } + + } \ No newline at end of file diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java index e5264301..2a00e6de 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URISyntaxException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -151,5 +152,6 @@ public class CrawlerMockFetcherTest { public SitemapRetriever createSitemapRetriever() { return Mockito.mock(SitemapRetriever.class); } + } }