(crawler) Clean up and refactor the code a bit

Viktor Lofgren 2023-07-23 18:59:14 +02:00
parent c069c8c182
commit 69f333c0bf
6 changed files with 190 additions and 162 deletions

View File

@ -1,24 +1,19 @@
package nu.marginalia.crawl.retreival;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import nu.marginalia.bigstring.BigString;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.lsh.EasyLSH;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.*;
/** A reference to a domain that has been crawled before. */
public class CrawlDataReference {
private final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
private final Iterator<SerializableCrawlData> data;
private final HashFunction hashFunction = Hashing.murmur3_128();
public CrawlDataReference(Iterator<SerializableCrawlData> data) {
this.data = data;
@ -38,7 +33,7 @@ public class CrawlDataReference {
return null;
}
public boolean isContentSame(CrawledDocument one, CrawledDocument other) {
public boolean isContentBodySame(CrawledDocument one, CrawledDocument other) {
assert one.documentBody != null;
assert other.documentBody != null;
@ -48,13 +43,15 @@ public class CrawlDataReference {
return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4;
}
private long contentHash(BigString documentBody) {
String content = documentBody.decode();
EasyLSH hash = new EasyLSH();
int next = 0;
boolean isInTag = false;
// In a naive best-effort fashion, extract the text
// content of the document and feed it into the LSH
for (int i = 0; i < content.length(); i++) {
char c = content.charAt(i);
if (c == '<') {
@ -62,12 +59,17 @@ public class CrawlDataReference {
} else if (c == '>') {
isInTag = false;
} else if (!isInTag) {
next = (next << 8) | (byte) c;
hash.addHashUnordered(hashFunction.hashInt(next).asInt());
next = (next << 8) | (c & 0xff);
hash.addHashUnordered(hashInt(next));
}
}
return hash.get();
}
private final HashFunction hashFunction = Hashing.murmur3_128();
private int hashInt(int v) {
return hashFunction.hashInt(v).asInt();
}
}
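
The hunk above boils down to this: scan the document body outside of HTML tags, roll each character into a 4-byte shingle, hash the shingle with murmur3, and feed the unordered hashes into a locality-sensitive fingerprint; two bodies count as the same when the Hamming distance between their fingerprints is below 4. A minimal self-contained sketch, using only the EasyLSH calls visible in this diff (addHashUnordered, get, hammingDistance); the class and method names below are illustrative, not part of the commit:

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import nu.marginalia.lsh.EasyLSH;

class ContentSimilaritySketch {
    private static final HashFunction hashFunction = Hashing.murmur3_128();

    // Naive best-effort text extraction: skip everything between '<' and '>',
    // roll each remaining character into a 4-byte window, hash the window,
    // and feed the hash into the LSH as an unordered feature.
    static long fingerprint(String html) {
        EasyLSH lsh = new EasyLSH();
        int shingle = 0;
        boolean inTag = false;
        for (int i = 0; i < html.length(); i++) {
            char c = html.charAt(i);
            if (c == '<')      inTag = true;
            else if (c == '>') inTag = false;
            else if (!inTag) {
                shingle = (shingle << 8) | (c & 0xff);
                lsh.addHashUnordered(hashFunction.hashInt(shingle).asInt());
            }
        }
        return lsh.get();
    }

    // A small Hamming distance between fingerprints means near-identical text content
    static boolean roughlySame(String bodyA, String bodyB) {
        return EasyLSH.hammingDistance(fingerprint(bodyA), fingerprint(bodyB)) < 4;
    }
}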

View File

@ -0,0 +1,57 @@
package nu.marginalia.crawl.retreival;
import lombok.SneakyThrows;
import static java.lang.Math.max;
import static java.lang.Math.min;
public class CrawlDelayTimer {
// When no crawl delay is specified, lean toward twice the fetch+process time, within these limits:
private static final long DEFAULT_CRAWL_DELAY_MIN_MS = Long.getLong("defaultCrawlDelay", 1000);
private static final long DEFAULT_CRAWL_DELAY_MAX_MS = Long.getLong("defaultCrawlDelaySlow", 2500);
/** Flag to indicate that the crawler should slow down, e.g. from 429s */
private boolean slowDown = false;
private final long delayTime;
public CrawlDelayTimer(long delayTime) {
this.delayTime = delayTime;
}
@SneakyThrows
public void delay(long spentTime) {
long sleepTime = delayTime;
if (sleepTime >= 1) {
if (spentTime > sleepTime)
return;
Thread.sleep(min(sleepTime - spentTime, 5000));
}
else if (slowDown) {
// Additional delay when the server is signalling it wants slower requests
Thread.sleep( DEFAULT_CRAWL_DELAY_MIN_MS);
}
else {
// When no crawl delay is specified, lean toward twice the fetch+process time,
// within sane limits. This means slower servers get slower crawling, and faster
// servers get faster crawling.
sleepTime = spentTime * 2;
sleepTime = min(sleepTime, DEFAULT_CRAWL_DELAY_MAX_MS);
sleepTime = max(sleepTime, DEFAULT_CRAWL_DELAY_MIN_MS);
if (spentTime > sleepTime)
return;
Thread.sleep(sleepTime - spentTime);
}
}
/** Increase the delay between requests if the server is signalling it wants slower requests with HTTP 429 */
public void slowDown() {
slowDown = true;
}
}
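
A minimal sketch of how a caller would use the new timer (the caller class and the fetchOne helper are hypothetical; the real caller is CrawlerRetreiver below): measure the time spent fetching and processing a document, report HTTP 429 responses via slowDown(), and let delay() decide how long to sleep before the next request.

import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.model.EdgeUrl;

class CrawlDelayTimerUsageSketch {
    // Hypothetical fetch step: returns true when the server answered HTTP 429.
    private boolean fetchOne(EdgeUrl url) {
        return false; // placeholder
    }

    void crawl(Iterable<EdgeUrl> urls, long robotsCrawlDelayMs) {
        CrawlDelayTimer timer = new CrawlDelayTimer(robotsCrawlDelayMs);

        for (EdgeUrl url : urls) {
            long start = System.currentTimeMillis();

            if (fetchOne(url)) {
                timer.slowDown();  // later delays use the slow-down floor
            }

            // Sleeps for the remainder of the robots.txt crawl delay, or roughly
            // twice the measured fetch time when no crawl delay is specified.
            timer.delay(System.currentTimeMillis() - start);
        }
    }
}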

View File

@ -11,7 +11,6 @@ import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.crawling.model.*;
import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.lsh.EasyLSH;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.Jsoup;
@ -26,25 +25,12 @@ import java.time.LocalDateTime;
import java.util.*;
import java.util.function.Consumer;
import static java.lang.Math.max;
import static java.lang.Math.min;
public class CrawlerRetreiver {
private static final long DEFAULT_CRAWL_DELAY_MIN_MS = Long.getLong("defaultCrawlDelay", 1000);
private static final long DEFAULT_CRAWL_DELAY_MAX_MS = Long.getLong("defaultCrawlDelaySlow", 2500);
private static final int MAX_ERRORS = 20;
private final HttpFetcher fetcher;
/** Flag to indicate that the crawler should slow down, e.g. from 429s */
private boolean slowDown = false;
/** Testing flag to disable crawl delay (otherwise crawler tests take several minutes) */
private boolean testFlagIgnoreDelay = false;
private final String id;
private final String domain;
private final Consumer<SerializableCrawlData> crawledDomainWriter;
@ -61,7 +47,12 @@ public class CrawlerRetreiver {
private final DomainCrawlFrontier crawlFrontier;
int errorCount = 0;
private String retainedTag = "RETAINED/304";
/** recrawlState tag for documents that had a HTTP status 304 */
private static final String documentWasRetainedTag = "RETAINED/304";
/** recrawlState tag for documents that had a 200 status but were identical to a previous version */
private static final String documentWasSameTag = "SAME-BY-COMPARISON";
public CrawlerRetreiver(HttpFetcher fetcher,
CrawlingSpecification specs,
@ -91,11 +82,6 @@ public class CrawlerRetreiver {
}
}
public CrawlerRetreiver withNoDelay() {
testFlagIgnoreDelay = true;
return this;
}
public int fetch() {
return fetch(new CrawlDataReference());
}
@ -146,13 +132,13 @@ public class CrawlerRetreiver {
assert !crawlFrontier.isEmpty();
var robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
long crawlDelay = robotsRules.getCrawlDelay();
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
sniffRootDocument();
sniffRootDocument(delayTimer);
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
int recrawled = recrawl(oldCrawlData, robotsRules, crawlDelay);
int recrawled = recrawl(oldCrawlData, robotsRules, delayTimer);
if (recrawled > 0) {
// If we have reference data, we will always grow the crawl depth a bit
@ -195,7 +181,7 @@ public class CrawlerRetreiver {
continue;
if (fetchDocument(top, null, crawlDelay).isPresent()) {
if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isPresent()) {
fetchedCount++;
}
}
@ -207,9 +193,10 @@ public class CrawlerRetreiver {
return fetchedCount;
}
/** Performs a re-crawl of old documents, comparing etags and last-modified */
private int recrawl(CrawlDataReference oldCrawlData,
SimpleRobotRules robotsRules,
long crawlDelay) {
CrawlDelayTimer delayTimer) {
int recrawled = 0;
int retained = 0;
@ -247,8 +234,6 @@ public class CrawlerRetreiver {
&& retained > 0.9 * recrawled
&& Math.random() < 0.75)
{
logger.info("Direct-loading {}", url);
// Since it looks like most of these documents haven't changed,
// we'll load the documents directly; but we do this in a random
// fashion to make sure we eventually catch changes over time
@ -263,15 +248,13 @@ public class CrawlerRetreiver {
// providing etag and last-modified headers, so we can recycle the
// document if it hasn't changed without actually downloading it
var fetchedDocOpt = fetchDocument(url, doc, crawlDelay);
var fetchedDocOpt = fetchWriteAndSleep(url,
delayTimer,
new DocumentWithReference(doc, oldCrawlData));
if (fetchedDocOpt.isEmpty()) continue;
if (Objects.equals(fetchedDocOpt.get().recrawlState, retainedTag)) {
retained ++;
}
else if (oldCrawlData.isContentSame(doc, fetchedDocOpt.get())) {
retained ++;
}
if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
recrawled ++;
}
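
The direct-load branch in this hunk is only partially visible; as a sketch of the heuristic it describes (the first clause of the condition is an assumption, only the ratio check and the randomization appear above): once enough documents have been re-checked and over 90% of them came back unchanged, the remaining old documents are recycled without fetching, but only about three times out of four, so that real fetches still happen often enough to catch changes over time.

class DirectLoadHeuristicSketch {
    // The first clause is an assumption; the hunk only shows the ratio check
    // and the randomization.
    static boolean shouldDirectLoadRemainingDocs(int recrawled, int retained) {
        return recrawled > 5                  // assumed: enough samples to trust the ratio
            && retained > 0.9 * recrawled     // almost everything came back unchanged
            && Math.random() < 0.75;          // keep ~25% real fetches to catch changes over time
    }
}
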
@ -279,18 +262,6 @@ public class CrawlerRetreiver {
return recrawled;
}
private static final HashFunction hasher = Hashing.murmur3_128(0);
private long hashDoc(CrawledDocument doc) {
var hash = new EasyLSH();
long val = 0;
for (var b : doc.documentBody.decode().getBytes()) {
val = val << 8 | (b & 0xFF);
hash.addUnordered(hasher.hashLong(val).asLong());
}
return hash.get();
}
private void downloadSitemaps(SimpleRobotRules robotsRules) {
List<String> sitemaps = robotsRules.getSitemaps();
if (sitemaps.isEmpty()) {
@ -337,13 +308,13 @@ public class CrawlerRetreiver {
logger.debug("Queue is now {}", crawlFrontier.queueSize());
}
private void sniffRootDocument() {
private void sniffRootDocument(CrawlDelayTimer delayTimer) {
try {
logger.debug("Configuring link filter");
var url = crawlFrontier.peek().withPathAndParam("/", null);
var maybeSample = fetchUrl(url, null).filter(sample -> sample.httpStatus == 200);
var maybeSample = fetchUrl(url, delayTimer, DocumentWithReference.empty()).filter(sample -> sample.httpStatus == 200);
if (maybeSample.isEmpty())
return;
var sample = maybeSample.get();
@ -379,33 +350,41 @@ public class CrawlerRetreiver {
}
}
private Optional<CrawledDocument> fetchDocument(EdgeUrl top,
@Nullable CrawledDocument reference,
long crawlDelay) {
private Optional<CrawledDocument> fetchWriteAndSleep(EdgeUrl top,
CrawlDelayTimer timer,
DocumentWithReference reference) {
logger.debug("Fetching {}", top);
long startTime = System.currentTimeMillis();
var doc = fetchUrl(top, reference);
if (doc.isPresent()) {
var d = doc.get();
crawledDomainWriter.accept(d);
var docOpt = fetchUrl(top, timer, reference);
if (d.url != null) {
// We may have redirected to a different path
EdgeUrl.parse(d.url).ifPresent(crawlFrontier::addVisited);
if (docOpt.isPresent()) {
var doc = docOpt.get();
if (!Objects.equals(doc.recrawlState, documentWasRetainedTag)
&& reference.isContentBodySame(doc))
{
// The document didn't change since the last time
doc.recrawlState = documentWasSameTag;
}
if ("ERROR".equals(d.crawlerStatus) && d.httpStatus != 404) {
crawledDomainWriter.accept(doc);
if (doc.url != null) {
// We may have redirected to a different path
EdgeUrl.parse(doc.url).ifPresent(crawlFrontier::addVisited);
}
if ("ERROR".equals(doc.crawlerStatus) && doc.httpStatus != 404) {
errorCount++;
}
}
long crawledTime = System.currentTimeMillis() - startTime;
delay(crawlDelay, crawledTime);
timer.delay(System.currentTimeMillis() - startTime);
return doc;
return docOpt;
}
private boolean isAllowedProtocol(String proto) {
@ -413,35 +392,23 @@ public class CrawlerRetreiver {
|| proto.equalsIgnoreCase("https");
}
private Optional<CrawledDocument> fetchUrl(EdgeUrl top, @Nullable CrawledDocument reference) {
private Optional<CrawledDocument> fetchUrl(EdgeUrl top, CrawlDelayTimer timer, DocumentWithReference reference) {
try {
var contentTags = getContentTags(reference);
var fetchedDoc = fetchContent(top, contentTags);
CrawledDocument doc;
var contentTags = reference.getContentTags();
var fetchedDoc = tryDownload(top, timer, contentTags);
// HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when
// we fetched it last time. We can recycle the reference document.
if (reference != null
&& fetchedDoc.httpStatus == 304)
{
doc = reference;
doc.recrawlState = retainedTag;
doc.timestamp = LocalDateTime.now().toString();
}
else {
doc = fetchedDoc;
}
CrawledDocument doc = reference.replaceOn304(fetchedDoc);
if (doc.documentBody != null) {
var decoded = doc.documentBody.decode();
doc.documentBodyHash = createHash(decoded);
Optional<Document> parsedDoc = parseDoc(decoded);
var parsedDoc = Jsoup.parse(decoded);
EdgeUrl url = new EdgeUrl(doc.url);
parsedDoc.ifPresent(parsed -> findLinks(url, parsed));
parsedDoc.flatMap(parsed -> findCanonicalUrl(url, parsed))
findLinks(url, parsedDoc);
findCanonicalUrl(url, parsedDoc)
.ifPresent(canonicalLink -> doc.canonicalUrl = canonicalLink.toString());
}
@ -455,33 +422,9 @@ public class CrawlerRetreiver {
}
private ContentTags getContentTags(@Nullable CrawledDocument reference) {
if (null == reference)
return ContentTags.empty();
String headers = reference.headers;
if (headers == null)
return ContentTags.empty();
String[] headersLines = headers.split("\n");
String lastmod = null;
String etag = null;
for (String line : headersLines) {
if (line.toLowerCase().startsWith("etag:")) {
etag = line.substring(5).trim();
}
if (line.toLowerCase().startsWith("last-modified:")) {
lastmod = line.substring(14).trim();
}
}
return new ContentTags(etag, lastmod);
}
@SneakyThrows
private CrawledDocument fetchContent(EdgeUrl top, ContentTags tags) {
private CrawledDocument tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) {
for (int i = 0; i < 2; i++) {
try {
var doc = fetcher.fetchContent(top, tags);
@ -489,7 +432,8 @@ public class CrawlerRetreiver {
return doc;
}
catch (RateLimitException ex) {
slowDown = true;
timer.slowDown();
int delay = ex.retryAfter();
if (delay > 0 && delay < 5000) {
Thread.sleep(delay);
@ -504,10 +448,6 @@ public class CrawlerRetreiver {
return hashMethod.hashUnencodedChars(documentBodyHash).toString();
}
private Optional<Document> parseDoc(String decoded) {
return Optional.of(Jsoup.parse(decoded));
}
private void findLinks(EdgeUrl baseUrl, Document parsed) {
baseUrl = linkParser.getBaseLink(parsed, baseUrl);
@ -547,36 +487,6 @@ public class CrawlerRetreiver {
}
}
@SneakyThrows
private void delay(long sleepTime, long spentTime) {
if (testFlagIgnoreDelay)
return;
if (sleepTime >= 1) {
if (spentTime > sleepTime)
return;
Thread.sleep(min(sleepTime - spentTime, 5000));
}
else if (slowDown) {
Thread.sleep( 1000);
}
else {
// When no crawl delay is specified, lean toward twice the fetch+process time,
// within sane limits. This means slower servers get slower crawling, and faster
// servers get faster crawling.
sleepTime = spentTime * 2;
sleepTime = min(sleepTime, DEFAULT_CRAWL_DELAY_MAX_MS);
sleepTime = max(sleepTime, DEFAULT_CRAWL_DELAY_MIN_MS);
if (spentTime > sleepTime)
return;
Thread.sleep(sleepTime - spentTime);
}
}
private CrawledDocument createRobotsError(EdgeUrl url) {
return CrawledDocument.builder()
.url(url.toString())
@ -594,12 +504,71 @@ public class CrawlerRetreiver {
.build();
}
private record DocumentWithReference(
@Nullable CrawledDocument doc,
@Nullable CrawlDataReference reference) {
enum CrawlDataComparison {
NO_OLD_DATA,
SMALL_SAMPLE,
CHANGES_FOUND,
NO_CHANGES
};
private static final DocumentWithReference emptyInstance = new DocumentWithReference(null, null);
public static DocumentWithReference empty() {
return emptyInstance;
}
public boolean isContentBodySame(CrawledDocument newDoc) {
if (reference == null)
return false;
if (doc == null)
return false;
return reference.isContentBodySame(doc, newDoc);
}
private ContentTags getContentTags() {
if (null == doc)
return ContentTags.empty();
String headers = doc.headers;
if (headers == null)
return ContentTags.empty();
String[] headersLines = headers.split("\n");
String lastmod = null;
String etag = null;
for (String line : headersLines) {
if (line.toLowerCase().startsWith("etag:")) {
etag = line.substring(5).trim();
}
if (line.toLowerCase().startsWith("last-modified:")) {
lastmod = line.substring(14).trim();
}
}
return new ContentTags(etag, lastmod);
}
public boolean isEmpty() {
return doc == null || reference == null;
}
/** If the provided document has HTTP status 304, and the reference document is provided,
* return the reference document; otherwise return the provided document.
*/
public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) {
if (doc == null)
return fetchedDoc;
// HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when
// we fetched it last time. We can recycle the reference document.
if (fetchedDoc.httpStatus != 304)
return fetchedDoc;
var ret = doc;
ret.recrawlState = documentWasRetainedTag;
ret.timestamp = LocalDateTime.now().toString();
return ret;
}
}
}
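
The etag/last-modified handling that moved into DocumentWithReference amounts to a conditional GET: the tags parsed from the stored headers accompany the new request, and a 304 Not Modified answer means the old document can be recycled (replaceOn304) instead of being downloaded and re-parsed. The sketch below illustrates the idea with the JDK HTTP client; the real flow goes through HttpFetcher and ContentTags, whose internals are not part of this commit, so the header plumbing here is illustrative only.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class ConditionalFetchSketch {
    private final HttpClient client = HttpClient.newHttpClient();

    // Mirrors DocumentWithReference.getContentTags(): pull etag and
    // last-modified out of the header blob stored with the old document.
    record Tags(String etag, String lastModified) {
        static Tags parse(String storedHeaders) {
            String etag = null, lastmod = null;
            for (String line : storedHeaders.split("\n")) {
                String lower = line.toLowerCase();
                if (lower.startsWith("etag:")) etag = line.substring(5).trim();
                if (lower.startsWith("last-modified:")) lastmod = line.substring(14).trim();
            }
            return new Tags(etag, lastmod);
        }
    }

    // Send the tags as a conditional GET; on 304, recycle the old body
    // (the commit does the equivalent in replaceOn304()).
    String fetch(String url, String oldBody, Tags tags) throws Exception {
        var builder = HttpRequest.newBuilder(URI.create(url));
        if (tags.etag() != null)         builder.header("If-None-Match", tags.etag());
        if (tags.lastModified() != null) builder.header("If-Modified-Since", tags.lastModified());

        HttpResponse<String> rsp = client.send(builder.build(), HttpResponse.BodyHandlers.ofString());
        return rsp.statusCode() == 304 ? oldBody : rsp.body();
    }
}

In the commit itself, the recycled document is tagged RETAINED/304, while a 200 response whose body fingerprint still matches the old one is tagged SAME-BY-COMPARISON in fetchWriteAndSleep; recrawl() counts both as retained.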

View File

@ -80,7 +80,7 @@ public class DomainCrawlFrontier {
return;
// reduce memory usage by not growing queue huge when crawling large sites
if (queue.size() + visited.size() >= depth + 100)
if (queue.size() + visited.size() >= depth + 1000)
return;
if (visited.contains(url.toString()))

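The only change here is the headroom constant: the frontier now lets the queue grow to the configured depth plus 1000 URLs (instead of plus 100) before new links are dropped. A sketch of the guard with the new constant; the surrounding method shape and fields are assumed from the context lines above:

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

class FrontierGuardSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Set<String> visited = new HashSet<>();
    private final int depth;

    FrontierGuardSketch(int depth) { this.depth = depth; }

    // Same shape as the guard above, with the new +1000 headroom: stop
    // enqueueing once queued + visited URLs exceed the crawl depth budget.
    boolean mayEnqueue(String url) {
        if (queue.size() + visited.size() >= depth + 1000)
            return false;
        return !visited.contains(url);
    }
}
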
View File

@ -71,7 +71,6 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>()), out::add)
.withNoDelay()
.fetch();
out.forEach(System.out::println);
@ -84,7 +83,6 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>()), out::add)
.withNoDelay()
.fetch();
out.forEach(System.out::println);
@ -99,7 +97,6 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>()), out::add)
.withNoDelay()
.fetch();
out.forEach(System.out::println);

View File

@ -118,13 +118,16 @@ class CrawlerRetreiverTest {
Path out = Files.createTempDirectory("crawling-process");
var writer = new CrawledDomainWriter(out, "www.marginalia.nu", "123456");
var writer = new CrawledDomainWriter(out, specs.domain, specs.id);
Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
new CrawlerRetreiver(httpFetcher, specs, d -> {
data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d);
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
if (Math.random() > 0.5) {
doc.headers = "";
}
}
writer.accept(d);
}).fetch();