(crawler) Clean up crawl data reference and recrawl logic

Viktor Lofgren 2023-07-22 18:36:29 +02:00
parent 9e4aa7da7c
commit c069c8c182
9 changed files with 94 additions and 144 deletions

CrawledDomainReader.java

@@ -6,6 +6,7 @@ import lombok.SneakyThrows;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.model.gson.GsonFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +31,10 @@ public class CrawledDomainReader {
public CrawledDomainReader() {
}
public Iterator<SerializableCrawlData> createIterator(Path path) throws IOException {
public Iterator<SerializableCrawlData> createIterator(Path basePath, CrawlingSpecification spec) throws IOException {
final var path = CrawlerOutputFile.getOutputFile(basePath, spec.id, spec.domain);
BufferedReader br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(path.toFile()))));
return new Iterator<>() {
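
The reader now resolves the on-disk crawl data from the output directory plus the specification's id and domain, instead of taking a pre-resolved file path. A minimal sketch of reading back a previous crawl with the new signature; the class and method names here (ReadBackSketch, printOldUrls) and the printing are illustrative, not part of the commit:

    import nu.marginalia.crawling.io.CrawledDomainReader;
    import nu.marginalia.crawling.model.CrawledDocument;
    import nu.marginalia.crawling.model.SerializableCrawlData;
    import nu.marginalia.crawling.model.spec.CrawlingSpecification;

    import java.io.IOException;
    import java.nio.file.Path;
    import java.util.Iterator;

    class ReadBackSketch {
        // Prints the URL of every document in a previously written crawl set.
        static void printOldUrls(Path crawlDataDir, CrawlingSpecification spec) throws IOException {
            var reader = new CrawledDomainReader();
            Iterator<SerializableCrawlData> oldData = reader.createIterator(crawlDataDir, spec);

            while (oldData.hasNext()) {
                if (oldData.next() instanceof CrawledDocument doc) {
                    System.out.println(doc.url);
                }
            }
        }
    }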

CrawlingSpecification.java

@@ -18,8 +18,6 @@ public class CrawlingSpecification {
public String domain;
public List<String> urls;
public CrawledDomain oldData;
@Override
public String toString() {
return String.format(getClass().getSimpleName() + "[" + id + "/" + domain + ": " + crawlDepth + "[ " + urls.size() + "]");
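
With the oldData field removed, a specification carries only id, crawl depth, domain and seed URLs; previous crawl data is supplied separately at fetch time. A sketch of the two construction styles this leaves, mirroring the updated tests further down (all values illustrative):

    // Positional form, matching the updated call sites in the tests below.
    var spec = new CrawlingSpecification("123456", 10, "www.example.com", List.of("https://www.example.com/"));

    // Builder form, as used in CrawlerRetreiverTest.
    var spec2 = CrawlingSpecification.builder()
            .id("123456")
            .crawlDepth(12)
            .domain("www.marginalia.nu")
            .urls(List.of("https://www.marginalia.nu/some-dead-link"))
            .build();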

CrawlLimiter.java

@@ -8,22 +8,15 @@ import java.util.concurrent.Semaphore;
public class CrawlLimiter {
public static final int maxPoolSize = Integer.getInteger("crawler.pool-size", 512);
public record CrawlTaskLimits(Path refreshPath, boolean isRefreshable, int taskSize) {}
private final Semaphore taskSemCount = new Semaphore(maxPoolSize);
public CrawlTaskLimits getTaskLimits(Path fileName) {
return new CrawlTaskLimits(fileName, true, 1);
}
public void acquire(CrawlTaskLimits properties) throws InterruptedException {
public void acquire() throws InterruptedException {
// It's very important that we acquire the RAM semaphore first to avoid a deadlock
taskSemCount.acquire(1);
}
public void release(CrawlTaskLimits properties) {
public void release() {
taskSemCount.release(1);
}
}
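
The limiter no longer computes per-task limits; acquire() and release() simply guard one permit per running task against the fixed-size semaphore. A minimal sketch of the intended discipline, assuming a CrawlLimiter instance and a task body; in CrawlerMain the permit is acquired before the task is handed to the pool and released in the task's finally block:

    // Each crawl task holds exactly one permit for its full duration.
    static void runLimited(CrawlLimiter crawlLimiter, Runnable task) throws InterruptedException {
        crawlLimiter.acquire();
        try {
            task.run();
        } finally {
            crawlLimiter.release();
        }
    }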

CrawlerMain.java

@@ -6,6 +6,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawlerOutputFile;
@@ -173,49 +174,37 @@ public class CrawlerMain implements AutoCloseable {
return;
}
var limits = crawlLimiter.getTaskLimits(CrawlerOutputFile.getOutputFile(crawlDataDir, crawlingSpecification));
try {
crawlLimiter.acquire(limits);
crawlLimiter.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
pool.execute(() -> {
try {
fetchDomain(crawlingSpecification, limits);
fetchDomain(crawlingSpecification);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
}
finally {
crawlLimiter.release(limits);
crawlLimiter.release();
}
});
}
private void fetchDomain(CrawlingSpecification specification, CrawlLimiter.CrawlTaskLimits limits) {
private void fetchDomain(CrawlingSpecification specification) {
if (workLog.isJobFinished(specification.id))
return;
HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);
Iterator<SerializableCrawlData> iterator;
try {
if (limits.isRefreshable()) {
iterator = reader.createIterator(limits.refreshPath());
}
else {
iterator = Collections.emptyIterator();
}
} catch (IOException e) {
logger.warn("Failed to read previous crawl data for {}", specification.domain);
iterator = Collections.emptyIterator();
}
try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification.domain, specification.id)) {
var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
int size = retreiver.fetch(iterator);
CrawlDataReference reference = getReference(specification);
int size = retreiver.fetch(reference);
workLog.setJobToFinished(specification.id, writer.getOutputFile().toString(), size);
@@ -225,6 +214,16 @@ public class CrawlerMain implements AutoCloseable {
}
}
private CrawlDataReference getReference(CrawlingSpecification specification) {
try {
var iterator = reader.createIterator(crawlDataDir, specification);
return new CrawlDataReference(iterator);
} catch (IOException e) {
logger.warn("Failed to read previous crawl data for {}", specification.domain);
return new CrawlDataReference();
}
}
private static class CrawlRequest {
private final CrawlPlan plan;
private final MqMessage message;

CrawlDataReference.java

@@ -1,123 +1,73 @@
package nu.marginalia.crawl.retreival;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import nu.marginalia.bigstring.BigString;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.lsh.EasyLSH;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import javax.annotation.Nullable;
import java.util.*;
import java.util.stream.Collectors;
/** A reference to a domain that has been crawled before. */
public class CrawlDataReference {
private final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
final Map<EdgeUrl, CrawledDocument> documents;
final Map<EdgeUrl, String> etags;
final Map<EdgeUrl, String> lastModified;
final Set<EdgeUrl> previouslyDeadUrls = new HashSet<>();
CrawlDataReference(CrawledDomain referenceDomain) {
private final Iterator<SerializableCrawlData> data;
private final HashFunction hashFunction = Hashing.murmur3_128();
if (referenceDomain == null || referenceDomain.doc == null) {
documents = Collections.emptyMap();
etags = Collections.emptyMap();
lastModified = Collections.emptyMap();
return;
public CrawlDataReference(Iterator<SerializableCrawlData> data) {
this.data = data;
}
documents = new HashMap<>(referenceDomain.doc.size());
etags = new HashMap<>(referenceDomain.doc.size());
lastModified = new HashMap<>(referenceDomain.doc.size());
for (var doc : referenceDomain.doc) {
try {
addReference(doc);
} catch (URISyntaxException ex) {
logger.warn("Failed to add reference document {}", doc.url);
}
}
public CrawlDataReference() {
this(Collections.emptyIterator());
}
private void addReference(CrawledDocument doc) throws URISyntaxException {
var url = new EdgeUrl(doc.url);
if (doc.httpStatus == 404) {
previouslyDeadUrls.add(url);
return;
@Nullable
public CrawledDocument nextDocument() {
while (data.hasNext()) {
if (data.next() instanceof CrawledDocument doc) {
return doc;
}
}
return null;
}
if (doc.httpStatus != 200) {
return;
public boolean isContentSame(CrawledDocument one, CrawledDocument other) {
assert one.documentBody != null;
assert other.documentBody != null;
final long contentHashOne = contentHash(one.documentBody);
final long contentHashOther = contentHash(other.documentBody);
return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4;
}
documents.put(url, doc);
private long contentHash(BigString documentBody) {
String content = documentBody.decode();
EasyLSH hash = new EasyLSH();
int next = 0;
String headers = doc.headers;
if (headers != null) {
String[] headersLines = headers.split("\n");
String lastmod = null;
String etag = null;
for (String line : headersLines) {
if (line.toLowerCase().startsWith("etag:")) {
etag = line.substring(5).trim();
}
if (line.toLowerCase().startsWith("last-modified:")) {
lastmod = line.substring(14).trim();
boolean isInTag = false;
for (int i = 0; i < content.length(); i++) {
char c = content.charAt(i);
if (c == '<') {
isInTag = true;
} else if (c == '>') {
isInTag = false;
} else if (!isInTag) {
next = (next << 8) | (byte) c;
hash.addHashUnordered(hashFunction.hashInt(next).asInt());
}
}
if (lastmod != null) {
lastModified.put(url, lastmod);
}
if (etag != null) {
etags.put(url, etag);
}
}
return hash.get();
}
public boolean isPreviouslyDead(EdgeUrl url) {
return previouslyDeadUrls.contains(url);
}
public int size() {
return documents.size();
}
public String getEtag(EdgeUrl url) {
return etags.get(url);
}
public String getLastModified(EdgeUrl url) {
return lastModified.get(url);
}
public Map<EdgeUrl, CrawledDocument> allDocuments() {
return documents;
}
public Map<EdgeUrl, CrawledDocument> sample(int sampleSize) {
return documents.entrySet().stream().limit(sampleSize).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public void evict() {
documents.clear();
etags.clear();
lastModified.clear();
}
public CrawledDocument getDoc(EdgeUrl top) {
return documents.get(top);
}
// This bit of manual housekeeping is needed to keep the memory footprint low
public void dispose(EdgeUrl url) {
documents.remove(url);
etags.remove(url);
lastModified.remove(url);
}
}
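
The rewritten class keeps no per-URL maps; it streams old documents and offers an LSH-based similarity check. contentHash() skips everything between '<' and '>' and feeds a rolling 4-byte window of the remaining characters into the locality-sensitive hash, so near-identical pages end up within a small Hamming distance. A self-contained sketch of the same scheme applied to plain Strings, purely to illustrate the mechanism (the class and method names are illustrative, not part of the commit):

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;
    import nu.marginalia.lsh.EasyLSH;

    class ContentSimilaritySketch {
        private static final HashFunction hashFunction = Hashing.murmur3_128();

        // Same scheme as CrawlDataReference.contentHash(), but over a plain String:
        // skip markup, hash a rolling window of the visible text, fold into an LSH sketch.
        static long sketch(String content) {
            EasyLSH hash = new EasyLSH();
            int next = 0;
            boolean isInTag = false;

            for (int i = 0; i < content.length(); i++) {
                char c = content.charAt(i);
                if (c == '<') {
                    isInTag = true;
                } else if (c == '>') {
                    isInTag = false;
                } else if (!isInTag) {
                    next = (next << 8) | (byte) c;
                    hash.addHashUnordered(hashFunction.hashInt(next).asInt());
                }
            }
            return hash.get();
        }

        // Two documents count as "the same" when their sketches are within a small Hamming distance.
        static boolean roughlySame(String a, String b) {
            return EasyLSH.hammingDistance(sketch(a), sketch(b)) < 4;
        }
    }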

CrawlerRetreiver.java

@@ -97,10 +97,10 @@ public class CrawlerRetreiver {
}
public int fetch() {
return fetch(Collections.emptyIterator());
return fetch(new CrawlDataReference());
}
public int fetch(Iterator<SerializableCrawlData> oldCrawlData) {
public int fetch(CrawlDataReference oldCrawlData) {
final DomainProber.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, crawlFrontier.peek());
if (probeResult instanceof DomainProber.ProbeResultOk) {
@@ -141,7 +141,7 @@ public class CrawlerRetreiver {
throw new IllegalStateException("Unknown probe result: " + probeResult);
};
private int crawlDomain(Iterator<SerializableCrawlData> oldCrawlData) {
private int crawlDomain(CrawlDataReference oldCrawlData) {
String ip = findIp(domain);
assert !crawlFrontier.isEmpty();
@@ -207,14 +207,18 @@ public class CrawlerRetreiver {
return fetchedCount;
}
private int recrawl(Iterator<SerializableCrawlData> oldCrawlData,
private int recrawl(CrawlDataReference oldCrawlData,
SimpleRobotRules robotsRules,
long crawlDelay) {
int recrawled = 0;
int retained = 0;
while (oldCrawlData.hasNext()) {
if (!(oldCrawlData.next() instanceof CrawledDocument doc)) continue;
for (;;) {
CrawledDocument doc = oldCrawlData.nextDocument();
if (doc == null) {
break;
}
// This Shouldn't Happen (TM)
var urlMaybe = EdgeUrl.parse(doc.url);
@@ -265,6 +269,9 @@
if (Objects.equals(fetchedDocOpt.get().recrawlState, retainedTag)) {
retained ++;
}
else if (oldCrawlData.isContentSame(doc, fetchedDocOpt.get())) {
retained ++;
}
recrawled ++;
}
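
The recrawl pass now drains the CrawlDataReference document by document and counts a page as retained either on the 304-style retainedTag match or when the refetched body is near-identical under isContentSame(). A hedged sketch of that loop's shape, reusing recrawl()'s locals; refetch() here is a hypothetical stand-in for the retriever's actual fetch-and-parse step:

    // Drain old documents from the reference instead of iterating raw SerializableCrawlData.
    for (CrawledDocument oldDoc = oldCrawlData.nextDocument(); oldDoc != null; oldDoc = oldCrawlData.nextDocument()) {
        CrawledDocument refetched = refetch(oldDoc);   // hypothetical stand-in
        if (refetched != null && oldCrawlData.isContentSame(oldDoc, refetched)) {
            retained++;                                // unchanged content, keep the old copy
        }
        recrawled++;
    }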

CrawlerMockFetcherTest.java

@@ -70,7 +70,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>(), null), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>()), out::add)
.withNoDelay()
.fetch();
@@ -83,7 +83,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>(), null), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>()), out::add)
.withNoDelay()
.fetch();
@@ -98,7 +98,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html");
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>(), null), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>()), out::add)
.withNoDelay()
.fetch();

CrawlerRetreiverTest.java

@@ -2,6 +2,7 @@ package nu.marginalia.crawling.retreival;
import lombok.SneakyThrows;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
@@ -109,7 +110,7 @@ class CrawlerRetreiverTest {
var specs = CrawlingSpecification
.builder()
.id("whatever")
.id("123456")
.crawlDepth(12)
.domain("www.marginalia.nu")
.urls(List.of("https://www.marginalia.nu/some-dead-link"))
@@ -117,7 +118,7 @@ class CrawlerRetreiverTest {
Path out = Files.createTempDirectory("crawling-process");
var writer = new CrawledDomainWriter(out, "test", "123456");
var writer = new CrawledDomainWriter(out, "www.marginalia.nu", "123456");
Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
new CrawlerRetreiver(httpFetcher, specs, d -> {
@@ -130,18 +131,16 @@ class CrawlerRetreiverTest {
writer.close();
var reader = new CrawledDomainReader();
var iter = reader.createIterator(CrawlerOutputFile.getOutputFile(out, "123456", "test"));
var iter = reader.createIterator(out, specs);
CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0);
domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList());
var newSpec = specs.withOldData(domain);
new CrawlerRetreiver(httpFetcher, newSpec, d -> {
new CrawlerRetreiver(httpFetcher, specs, d -> {
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
}
}).fetch(iter);
}).fetch(new CrawlDataReference(iter));
}
}

CrawlJobSpecWriterTest.java

@@ -31,9 +31,9 @@ public class CrawlJobSpecWriterTest {
@Test
public void testReadWrite() throws IOException {
try (CrawlJobSpecWriter writer = new CrawlJobSpecWriter(tempFile)) {
writer.accept(new CrawlingSpecification("first",1, "test1", List.of("a", "b", "c"), null));
writer.accept(new CrawlingSpecification("second",1, "test2", List.of("a", "b", "c", "d"), null));
writer.accept(new CrawlingSpecification("third",1, "test3", List.of("a", "b"), null));
writer.accept(new CrawlingSpecification("first",1, "test1", List.of("a", "b", "c")));
writer.accept(new CrawlingSpecification("second",1, "test2", List.of("a", "b", "c", "d")));
writer.accept(new CrawlingSpecification("third",1, "test3", List.of("a", "b")));
}
List<CrawlingSpecification> outputs = new ArrayList<>();