(crawler) Clean up the crawler code a bit, removing vestigial abstractions and historical debris

Viktor Lofgren 2024-10-15 17:27:59 +02:00
parent 481f999b70
commit 7305afa0f8
11 changed files with 209 additions and 279 deletions

View File

@ -7,12 +7,12 @@ import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.crawl.DomainIndexingState;
@ -77,7 +77,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test
public void testInvalidDomain() throws IOException {
// Attempt to fetch an invalid domain
var specs = new CrawlSpecProvider.CrawlSpecRecord("invalid.invalid.invalid", 10);
var specs = new CrawlerMain.CrawlSpecRecord("invalid.invalid.invalid", 10);
CrawledDomain crawlData = crawl(specs);
@ -93,7 +93,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test
public void testRedirectingDomain() throws IOException {
// Attempt to fetch a redirecting domain
var specs = new CrawlSpecProvider.CrawlSpecRecord("memex.marginalia.nu", 10);
var specs = new CrawlerMain.CrawlSpecRecord("memex.marginalia.nu", 10);
CrawledDomain crawlData = crawl(specs);
@ -112,7 +112,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test
public void testBlockedDomain() throws IOException {
// Attempt to fetch a domain that is blocked via the blacklist
var specs = new CrawlSpecProvider.CrawlSpecRecord("search.marginalia.nu", 10);
var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 10);
CrawledDomain crawlData = crawl(specs, d->false); // simulate blocking by blacklisting everything
@ -128,7 +128,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test
public void crawlSunnyDay() throws IOException {
var specs = new CrawlSpecProvider.CrawlSpecRecord("www.marginalia.nu", 10);
var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10);
CrawledDomain domain = crawl(specs);
assertFalse(domain.doc.isEmpty());
@ -161,7 +161,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test
public void crawlContentTypes() throws IOException {
var specs = new CrawlSpecProvider.CrawlSpecRecord("www.marginalia.nu", 10,
var specs = new CrawlerMain.CrawlSpecRecord("www.marginalia.nu", 10,
List.of(
"https://www.marginalia.nu/sanic.png",
"https://www.marginalia.nu/invalid"
@ -199,7 +199,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test
public void crawlRobotsTxt() throws IOException {
var specs = new CrawlSpecProvider.CrawlSpecRecord("search.marginalia.nu", 5,
var specs = new CrawlerMain.CrawlSpecRecord("search.marginalia.nu", 5,
List.of("https://search.marginalia.nu/search?q=hello+world")
);
@ -238,11 +238,11 @@ public class CrawlingThenConvertingIntegrationTest {
return null; // unreachable
}
}
private CrawledDomain crawl(CrawlSpecProvider.CrawlSpecRecord specs) throws IOException {
private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs) throws IOException {
return crawl(specs, domain -> true);
}
private CrawledDomain crawl(CrawlSpecProvider.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws IOException {
private CrawledDomain crawl(CrawlerMain.CrawlSpecRecord specs, Predicate<EdgeDomain> domainBlacklist) throws IOException {
List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder(fileName)) {

View File

@ -1,46 +0,0 @@
package nu.marginalia.crawl;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
public class AbortMonitor {
private volatile boolean abort = false;
private static volatile AbortMonitor instance = null;
private static final Logger logger = LoggerFactory.getLogger(AbortMonitor.class);
public static AbortMonitor getInstance() {
if (instance == null) {
synchronized (AbortMonitor.class) {
if (instance == null) {
instance = new AbortMonitor();
new Thread(instance::run, "AbortMon").start();
}
}
}
return instance;
}
private AbortMonitor() {
}
@SneakyThrows
public void run() {
for (;;) {
Thread.sleep(1000);
if (Files.exists(Path.of("/tmp/stop"))) {
logger.warn("Abort file found");
abort = true;
Files.delete(Path.of("/tmp/stop"));
}
}
}
public boolean isAlive() {
return !abort;
}
}

View File

@ -4,10 +4,13 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Builder;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@ -16,9 +19,9 @@ import nu.marginalia.crawl.logic.DomainLocks;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.crawl.warc.WarcArchiverFactory;
import nu.marginalia.crawl.warc.WarcArchiverIf;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.model.EdgeDomain;
@ -35,6 +38,7 @@ import nu.marginalia.storage.FileStorageService;
import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,10 +48,7 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -62,22 +63,28 @@ public class CrawlerMain extends ProcessMainClass {
private final MessageQueueFactory messageQueueFactory;
private final DomainProber domainProber;
private final FileStorageService fileStorageService;
private final CrawlSpecProvider crawlSpecProvider;
private final AnchorTagsSourceFactory anchorTagsSourceFactory;
private final WarcArchiverFactory warcArchiverFactory;
private final HikariDataSource dataSource;
private final DomainBlacklist blacklist;
private final Gson gson;
private final int node;
private final SimpleBlockingThreadPool pool;
private final DomainLocks domainLocks = new DomainLocks();
private final Map<String, String> processingIds = new ConcurrentHashMap<>();
private final Map<String, CrawlTask> pendingCrawlTasks = new ConcurrentHashMap<>();
private final AbortMonitor abortMonitor = AbortMonitor.getInstance();
private final AtomicInteger tasksDone = new AtomicInteger(0);
private final HttpFetcherImpl fetcher;
private volatile int totalTasks;
private int totalTasks = 1;
private static final double URL_GROWTH_FACTOR = Double.parseDouble(System.getProperty("crawler.crawlSetGrowthFactor", "1.25"));
private static final int MIN_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 100);
private static final int MID_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 2_000);
private static final int MAX_URLS_PER_DOMAIN = Integer.getInteger("crawler.maxUrlsPerDomain", 10_000);
@Inject
public CrawlerMain(UserAgent userAgent,
@ -85,18 +92,20 @@ public class CrawlerMain extends ProcessMainClass {
MessageQueueFactory messageQueueFactory, DomainProber domainProber,
FileStorageService fileStorageService,
ProcessConfiguration processConfiguration,
CrawlSpecProvider crawlSpecProvider,
AnchorTagsSourceFactory anchorTagsSourceFactory,
WarcArchiverFactory warcArchiverFactory,
Gson gson) {
HikariDataSource dataSource,
DomainBlacklist blacklist,
Gson gson) throws InterruptedException {
this.userAgent = userAgent;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.domainProber = domainProber;
this.fileStorageService = fileStorageService;
this.crawlSpecProvider = crawlSpecProvider;
this.anchorTagsSourceFactory = anchorTagsSourceFactory;
this.warcArchiverFactory = warcArchiverFactory;
this.dataSource = dataSource;
this.blacklist = blacklist;
this.gson = gson;
this.node = processConfiguration.node();
@ -108,15 +117,13 @@ public class CrawlerMain extends ProcessMainClass {
new Dispatcher(),
new ConnectionPool(5, 10, TimeUnit.SECONDS)
);
// Wait for the blacklist to be loaded before starting the crawl
blacklist.waitUntilLoaded();
}
public static void main(String... args) throws Exception {
if (!AbortMonitor.getInstance().isAlive()) {
System.err.println("Remove abort file first");
return;
}
// Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
Security.setProperty("networkaddress.cache.ttl" , "3600");
@ -144,7 +151,7 @@ public class CrawlerMain extends ProcessMainClass {
crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir);
}
else {
crawler.run(instructions.outputDir);
crawler.runForDatabaseDomains(instructions.outputDir);
}
instructions.ok();
} catch (Exception ex) {
@ -160,34 +167,99 @@ public class CrawlerMain extends ProcessMainClass {
System.exit(0);
}
public void run(Path outputDir) throws Exception {
public void runForDatabaseDomains(Path outputDir) throws Exception {
heartbeat.start();
logger.info("Loading domains to be crawled");
final List<CrawlSpecRecord> crawlSpecRecords = new ArrayList<>();
final List<EdgeDomain> domainsToCrawl = new ArrayList<>();
// Assign any domains with node_affinity=0 to this node, and then fetch all domains assigned to this node
// to be crawled.
try (var conn = dataSource.getConnection()) {
try (var assignFreeDomains = conn.prepareStatement(
"""
UPDATE EC_DOMAIN
SET NODE_AFFINITY=?
WHERE NODE_AFFINITY=0
"""))
{
// Assign any domains with node_affinity=0 to this node. We must do this now, before we start crawling
// to avoid race conditions with other crawl runs. We don't want multiple crawlers to crawl the same domain.
assignFreeDomains.setInt(1, node);
assignFreeDomains.executeUpdate();
}
try (var query = conn.prepareStatement("""
SELECT DOMAIN_NAME, COALESCE(VISITED_URLS, 0), EC_DOMAIN.ID
FROM EC_DOMAIN
LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
WHERE NODE_AFFINITY=?
""")) {
// Fetch the domains to be crawled
query.setInt(1, node);
query.setFetchSize(10_000);
var rs = query.executeQuery();
while (rs.next()) {
// Skip blacklisted domains
int domainId = rs.getInt(3);
if (blacklist.isBlacklisted(domainId))
continue;
int existingUrls = rs.getInt(2);
String domainName = rs.getString(1);
domainsToCrawl.add(new EdgeDomain(domainName));
crawlSpecRecords.add(CrawlSpecRecord.growExistingDomain(domainName, existingUrls));
totalTasks++;
}
}
}
logger.info("Loaded {} domains", crawlSpecRecords.size());
// Shuffle the domains to ensure we get a good mix of domains in each crawl,
// so that e.g. the big domains don't get all crawled at once, or we end up
// crawling the same server in parallel from different subdomains...
Collections.shuffle(crawlSpecRecords);
// Sanity check that we actually have something to crawl
totalTasks = crawlSpecProvider.totalCount();
if (totalTasks == 0) {
if (crawlSpecRecords.isEmpty()) {
// This is an error state, and we should make noise about it
throw new IllegalStateException("No crawl tasks found, refusing to continue");
}
logger.info("Queued {} crawl tasks, let's go", totalTasks);
else {
logger.info("Queued {} crawl tasks, let's go", crawlSpecRecords.size());
}
// Set up the work log and the warc archiver so we can keep track of what we've done
try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(crawlSpecProvider.getDomains())
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(domainsToCrawl)
) {
// Set the number of tasks done to the number of tasks that are already finished,
// (this happens when the process is restarted after a crash or a shutdown)
tasksDone.set(workLog.countFinishedJobs());
// Process the crawl tasks
try (var specStream = crawlSpecProvider.stream()) {
specStream
.takeWhile((e) -> abortMonitor.isAlive())
.filter(e -> !workLog.isJobFinished(e.domain()))
.filter(e -> processingIds.put(e.domain(), "") == null)
.map(e -> new CrawlTask(e, anchorTagsSource, outputDir, warcArchiver, workLog))
.forEach(pool::submitQuietly);
// Create crawl tasks and submit them to the pool for execution
for (CrawlSpecRecord crawlSpec : crawlSpecRecords) {
if (workLog.isJobFinished(crawlSpec.domain()))
continue;
var task = new CrawlTask(
crawlSpec,
anchorTagsSource,
outputDir,
warcArchiver,
workLog);
if (pendingCrawlTasks.putIfAbsent(crawlSpec.domain(), task) == null) {
pool.submitQuietly(task);
}
}
logger.info("Shutting down the pool, waiting for tasks to complete...");
@ -222,7 +294,7 @@ public class CrawlerMain extends ProcessMainClass {
WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(List.of(new EdgeDomain(targetDomainName)))
) {
var spec = new CrawlSpecProvider.CrawlSpecRecord(targetDomainName, 1000, List.of());
var spec = new CrawlSpecRecord(targetDomainName, 1000, List.of());
var task = new CrawlTask(spec, anchorTagsSource, outputDir, warcArchiver, workLog);
task.run();
}
@ -234,9 +306,9 @@ public class CrawlerMain extends ProcessMainClass {
}
}
class CrawlTask implements SimpleBlockingThreadPool.Task {
private class CrawlTask implements SimpleBlockingThreadPool.Task {
private final CrawlSpecProvider.CrawlSpecRecord specification;
private final CrawlSpecRecord specification;
private final String domain;
private final String id;
@ -246,7 +318,7 @@ public class CrawlerMain extends ProcessMainClass {
private final WarcArchiverIf warcArchiver;
private final WorkLog workLog;
CrawlTask(CrawlSpecProvider.CrawlSpecRecord specification,
CrawlTask(CrawlSpecRecord specification,
AnchorTagsSource anchorTagsSource,
Path outputDir,
WarcArchiverIf warcArchiver,
@ -269,6 +341,8 @@ public class CrawlerMain extends ProcessMainClass {
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
// Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
// while writing to the same file name as before
if (Files.exists(newWarcFile)) {
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
}
@ -276,31 +350,29 @@ public class CrawlerMain extends ProcessMainClass {
Files.deleteIfExists(tempFile);
}
var domainLock = domainLocks.getSemaphore(new EdgeDomain(specification.domain()));
try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder);
CrawlDataReference reference = getReference())
CrawlDataReference reference = getReference();
)
{
// acquire the domain lock to prevent other threads from crawling the same domain,
// we release it at the end of the task to let them go ahead
Thread.currentThread().setName("crawling:" + domain + " [await domain lock]");
domainLock.acquire();
Thread.currentThread().setName("crawling:" + domain);
var domainLinks = anchorTagsSource.getAnchorTags(domain);
// Resume the crawl if it was aborted
if (Files.exists(tempFile)) {
retriever.syncAbortedRun(tempFile);
Files.delete(tempFile);
}
int size = retriever.crawlDomain(domainLinks, reference);
DomainLinks domainLinks = anchorTagsSource.getAnchorTags(domain);
int size;
try (var lock = domainLocks.lockDomain(new EdgeDomain(domain))) {
size = retriever.crawlDomain(domainLinks, reference);
}
// Delete the reference crawl data if it's not the same as the new one
// (mostly a case when migrating from legacy->warc)
reference.delete();
// Convert the WARC file to Parquet
CrawledDocumentParquetRecordFileWriter
.convertWarc(domain, userAgent, newWarcFile, parquetFile);
@ -308,7 +380,10 @@ public class CrawlerMain extends ProcessMainClass {
// otherwise delete it:
warcArchiver.consumeWarc(newWarcFile, domain);
// Mark the domain as finished in the work log
workLog.setJobToFinished(domain, parquetFile.toString(), size);
// Update the progress bar
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", domain);
@ -316,11 +391,8 @@ public class CrawlerMain extends ProcessMainClass {
logger.error("Error fetching domain " + domain, e);
}
finally {
// release the domain lock to permit other threads to crawl subdomains of this domain
domainLock.release();
// We don't need to double-count these; it's also kept in the workLog
processingIds.remove(domain);
pendingCrawlTasks.remove(domain);
Thread.currentThread().setName("[idle]");
Files.deleteIfExists(newWarcFile);
@ -379,12 +451,11 @@ public class CrawlerMain extends ProcessMainClass {
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var crawlStorage = fileStorageService.getStorage(request.crawlStorage);
return new CrawlRequest(
request.targetDomainName,
crawlData.asPath(),
crawlStorage.asPath(),
msg,
inbox);
}
@ -404,4 +475,25 @@ public class CrawlerMain extends ProcessMainClass {
}
}
@Builder
public record CrawlSpecRecord(@NotNull String domain, int crawlDepth, @NotNull List<String> urls) {
public CrawlSpecRecord(String domain, int crawlDepth) {
this(domain, crawlDepth, List.of());
}
public static CrawlSpecRecord growExistingDomain(String domain, int visitedUrls) {
// Calculate the number of URLs to fetch for this domain, based on the number of URLs
// already fetched, and a growth factor that gets a bonus for small domains
return new CrawlSpecRecord(domain,
(int) Math.clamp(
(visitedUrls * (visitedUrls < MID_URLS_PER_DOMAIN
? Math.max(2.5, URL_GROWTH_FACTOR)
: URL_GROWTH_FACTOR)
),
MIN_URLS_PER_DOMAIN,
MAX_URLS_PER_DOMAIN));
}
}
}
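
For reference, a worked illustration of the growExistingDomain() clamp above, assuming the default property values (growth factor 1.25, min 100, mid 2 000, max 10 000). This is not part of the commit; the domain names and visit counts are made up.

// Illustrative only: shows how the clamp in CrawlSpecRecord.growExistingDomain() behaves.
import nu.marginalia.crawl.CrawlerMain;

class GrowExistingDomainExample {
    public static void main(String[] args) {
        // 0 visited URLs: 0 * 2.5 = 0, clamped up to MIN        -> crawlDepth() == 100
        System.out.println(CrawlerMain.CrawlSpecRecord.growExistingDomain("small.example", 0).crawlDepth());
        // 3 000 >= MID, so the plain growth factor applies: 3 000 * 1.25 -> crawlDepth() == 3750
        System.out.println(CrawlerMain.CrawlSpecRecord.growExistingDomain("mid.example", 3_000).crawlDepth());
        // 40 000 * 1.25 = 50 000, clamped down to MAX           -> crawlDepth() == 10000
        System.out.println(CrawlerMain.CrawlSpecRecord.growExistingDomain("big.example", 40_000).crawlDepth());
    }
}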

View File

@ -18,8 +18,9 @@ public class DomainLocks {
/** Acquires a lock for the given domain. The permit is acquired before this method returns,
* and is released when the returned DomainLock is closed, so the lock is intended to be
* used with try-with-resources.
*/
public Semaphore getSemaphore(EdgeDomain domain) {
return locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits);
public DomainLock lockDomain(EdgeDomain domain) throws InterruptedException {
return new DomainLock(domain.toString(),
locks.computeIfAbsent(domain.topDomain.toLowerCase(), this::defaultPermits));
}
private Semaphore defaultPermits(String topDomain) {
@ -42,4 +43,24 @@ public class DomainLocks {
return new Semaphore(2);
}
public static class DomainLock implements AutoCloseable {
private final String domainName;
private final Semaphore semaphore;
DomainLock(String domainName, Semaphore semaphore) throws InterruptedException {
this.domainName = domainName;
this.semaphore = semaphore;
Thread.currentThread().setName("crawling:" + domainName + " [await domain lock]");
semaphore.acquire();
Thread.currentThread().setName("crawling:" + domainName);
}
@Override
public void close() throws Exception {
semaphore.release();
Thread.currentThread().setName("crawling:" + domainName + " [wrapping up]");
}
}
}
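
A minimal usage sketch of the new AutoCloseable DomainLock, mirroring the call site in CrawlTask above; the domainLocks, domain, retriever, domainLinks and reference variables are assumed from that context, and the enclosing method is assumed to handle InterruptedException/Exception.

// Sketch: the semaphore permit is acquired in the DomainLock constructor and
// released by close(), so try-with-resources guarantees it is returned even
// if crawlDomain() throws.
int size;
try (DomainLocks.DomainLock lock = domainLocks.lockDomain(new EdgeDomain(domain))) {
    size = retriever.crawlDomain(domainLinks, reference);
} // permit released; thread name becomes "crawling:<domain> [wrapping up]"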

View File

@ -36,6 +36,10 @@ public class CrawlDataReference implements AutoCloseable {
}
}
/** Get the next document from the crawl data,
* returning null when there are no more documents
* available
*/
@Nullable
public CrawledDocument nextDocument() {
try {
@ -52,7 +56,7 @@ public class CrawlDataReference implements AutoCloseable {
return null;
}
public boolean isContentBodySame(String one, String other) {
public static boolean isContentBodySame(String one, String other) {
final long contentHashOne = contentHash(one);
final long contentHashOther = contentHash(other);
@ -60,7 +64,7 @@ public class CrawlDataReference implements AutoCloseable {
return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4;
}
private long contentHash(String content) {
private static long contentHash(String content) {
EasyLSH hash = new EasyLSH();
int next = 0;
@ -83,8 +87,8 @@ public class CrawlDataReference implements AutoCloseable {
return hash.get();
}
private final HashFunction hashFunction = Hashing.murmur3_128();
private int hashInt(int v) {
private static final HashFunction hashFunction = Hashing.murmur3_128();
private static int hashInt(int v) {
return hashFunction.hashInt(v).asInt();
}
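
Since isContentBodySame() is now static, callers such as DocumentWithReference (below) can compare bodies without holding a CrawlDataReference instance. A sketch of assumed usage, where reference is a CrawlDataReference over old crawl data and newBody is a freshly fetched document body:

// Sketch: walk the reference crawl data until nextDocument() returns null and
// check whether the refetched body is effectively unchanged (LSH hamming distance < 4).
CrawledDocument doc;
while ((doc = reference.nextDocument()) != null) {
    if (CrawlDataReference.isContentBodySame(doc.documentBody, newBody)) {
        // near-duplicate: the document can be treated as unchanged
    }
}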

View File

@ -3,6 +3,7 @@ package nu.marginalia.crawl.retreival;
import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@ -11,7 +12,6 @@ import nu.marginalia.crawl.logic.LinkFilterSelector;
import nu.marginalia.crawl.retreival.revisit.CrawlerRevisitor;
import nu.marginalia.crawl.retreival.revisit.DocumentWithReference;
import nu.marginalia.crawl.retreival.sitemap.SitemapFetcher;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
@ -54,7 +54,7 @@ public class CrawlerRetreiver implements AutoCloseable {
public CrawlerRetreiver(HttpFetcher fetcher,
DomainProber domainProber,
CrawlSpecProvider.CrawlSpecRecord specs,
CrawlerMain.CrawlSpecRecord specs,
WarcRecorder warcRecorder)
{
this.warcRecorder = warcRecorder;
@ -117,9 +117,7 @@ public class CrawlerRetreiver implements AutoCloseable {
sniffRootDocument(rootUrl, delayTimer);
// Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
int fetchedCount = crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
if (fetchedCount > 0) {
if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
// If we have reference data, we will always grow the crawl depth a bit
crawlFrontier.increaseDepth(1.5, 2500);
}
@ -162,9 +160,7 @@ public class CrawlerRetreiver implements AutoCloseable {
continue;
try {
if (fetchContentWithReference(top, delayTimer, DocumentWithReference.empty()).isOk()) {
fetchedCount++;
}
fetchContentWithReference(top, delayTimer, DocumentWithReference.empty());
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@ -172,7 +168,7 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
return fetchedCount;
return crawlFrontier.visitedSize();
}
public void syncAbortedRun(Path warcFile) {

View File

@ -165,7 +165,7 @@ public class DomainCrawlFrontier {
public int queueSize() {
return queue.size();
}
public int visitedSize() { return visited.size(); }
public void enqueueLinksFromDocument(EdgeUrl baseUrl, Document parsed) {
baseUrl = linkParser.getBaseLink(parsed, baseUrl);

View File

@ -42,7 +42,7 @@ public record DocumentWithReference(
return false;
}
return reference.isContentBodySame(doc.documentBody, bodyOk.body());
return CrawlDataReference.isContentBodySame(doc.documentBody, bodyOk.body());
}
public ContentTags getContentTags() {

View File

@ -1,137 +0,0 @@
package nu.marginalia.crawl.spec;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Builder;
import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.model.EdgeDomain;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
// FIXME: This design is a vestige from when there were multiple sources of crawl data. It should be simplified and probably merged with CrawlerMain.
public class CrawlSpecProvider {
private final HikariDataSource dataSource;
private final ProcessConfiguration processConfiguration;
private final DomainBlacklist blacklist;
private List<CrawlSpecRecord> domains;
private static final Logger logger = LoggerFactory.getLogger(CrawlSpecProvider.class);
private static final double URL_GROWTH_FACTOR = Double.parseDouble(System.getProperty("crawler.crawlSetGrowthFactor", "1.25"));
private static final int MIN_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 100);
private static final int MID_URLS_PER_DOMAIN = Integer.getInteger("crawler.minUrlsPerDomain", 2_000);
private static final int MAX_URLS_PER_DOMAIN = Integer.getInteger("crawler.maxUrlsPerDomain", 10_000);
@Inject
public CrawlSpecProvider(HikariDataSource dataSource,
ProcessConfiguration processConfiguration,
DomainBlacklist blacklist
) {
this.dataSource = dataSource;
this.processConfiguration = processConfiguration;
this.blacklist = blacklist;
}
// Load the domains into memory to ensure the crawler is resilient to database blips
private List<CrawlSpecRecord> loadData() throws Exception {
var domains = new ArrayList<CrawlSpecRecord>();
logger.info("Loading domains to be crawled");
blacklist.waitUntilLoaded();
try (var conn = dataSource.getConnection();
var assignFreeDomains = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE NODE_AFFINITY=0");
var query = conn.prepareStatement("""
SELECT DOMAIN_NAME, COALESCE(VISITED_URLS, 0), EC_DOMAIN.ID
FROM EC_DOMAIN
LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
WHERE NODE_AFFINITY=?
""")
)
{
// Assign any domains with node_affinity=0 to this node. We must do this now, before we start crawling
// to avoid race conditions with other crawl runs. We don't want multiple crawlers to crawl the same domain.
assignFreeDomains.setInt(1, processConfiguration.node());
assignFreeDomains.executeUpdate();
// Fetch the domains to be crawled
query.setInt(1, processConfiguration.node());
query.setFetchSize(10_000);
var rs = query.executeQuery();
while (rs.next()) {
// Skip blacklisted domains
int id = rs.getInt(3);
if (blacklist.isBlacklisted(id))
continue;
int urls = rs.getInt(2);
double growthFactor = urls < MID_URLS_PER_DOMAIN
? Math.max(2.5, URL_GROWTH_FACTOR)
: URL_GROWTH_FACTOR;
int urlsToFetch = Math.clamp((int) (growthFactor * rs.getInt(2)), MIN_URLS_PER_DOMAIN, MAX_URLS_PER_DOMAIN);
var record = new CrawlSpecRecord(
rs.getString(1),
urlsToFetch,
List.of()
);
domains.add(record);
}
}
logger.info("Loaded {} domains", domains.size());
// Shuffle the domains to ensure we get a good mix of domains in each crawl,
// so that e.g. the big domains don't get all crawled at once, or we end up
// crawling the same server in parallel from different subdomains...
Collections.shuffle(domains);
return domains;
}
public List<EdgeDomain> getDomains() {
return stream().map(CrawlSpecRecord::domain).map(EdgeDomain::new).toList();
}
public int totalCount() throws Exception {
if (domains == null) {
domains = loadData();
}
return domains.size();
}
@SneakyThrows
public Stream<CrawlSpecRecord> stream() {
if (domains == null) {
domains = loadData();
}
return domains.stream();
}
@Builder
public record CrawlSpecRecord(@NotNull String domain,
int crawlDepth,
@NotNull List<String> urls) {
public CrawlSpecRecord(String domain, int crawlDepth) {
this(domain, crawlDepth, List.of());
}
}
}

View File

@ -2,6 +2,7 @@ package nu.marginalia.crawling.retreival;
import crawlercommons.robots.SimpleRobotRules;
import lombok.SneakyThrows;
import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
@ -9,7 +10,6 @@ import nu.marginalia.crawl.fetcher.SitemapRetriever;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.body.HttpFetchResult;
@ -68,7 +68,7 @@ public class CrawlerMockFetcherTest {
}
void crawl(CrawlSpecProvider.CrawlSpecRecord spec) throws IOException {
void crawl(CrawlerMain.CrawlSpecRecord spec) throws IOException {
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder)
.crawlDomain();
@ -83,7 +83,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
crawl(new CrawlSpecProvider.CrawlSpecRecord("startrek.website", 10, new ArrayList<>()));
crawl(new CrawlerMain.CrawlSpecRecord("startrek.website", 10, new ArrayList<>()));
}
@Test
@ -92,7 +92,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
crawl(new CrawlSpecProvider.CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()));
crawl(new CrawlerMain.CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()));
}
@Test
@ -103,7 +103,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html");
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
crawl(new CrawlSpecProvider.CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>()));
crawl(new CrawlerMain.CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>()));
}
class MockFetcher implements HttpFetcher {

View File

@ -4,11 +4,11 @@ import lombok.SneakyThrows;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.*;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeDomain;
@ -76,7 +76,7 @@ class CrawlerRetreiverTest {
@Test
public void testWarcOutput() throws IOException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(5)
.domain("www.marginalia.nu")
@ -118,7 +118,7 @@ class CrawlerRetreiverTest {
@Test
public void testWarcOutputNoKnownUrls() throws IOException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(5)
.domain("www.marginalia.nu")
@ -161,7 +161,7 @@ class CrawlerRetreiverTest {
@SneakyThrows
@Test
public void testResync() throws IOException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(5)
.domain("www.marginalia.nu")
@ -210,7 +210,7 @@ class CrawlerRetreiverTest {
@Test
public void testWithKnownDomains() throws IOException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(5)
.domain("www.marginalia.nu")
@ -254,7 +254,7 @@ class CrawlerRetreiverTest {
@Test
public void testRedirect() throws IOException, URISyntaxException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(3)
.domain("www.marginalia.nu")
@ -312,7 +312,7 @@ class CrawlerRetreiverTest {
@Test
public void testEmptySet() throws IOException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(5)
.domain("www.marginalia.nu")
@ -360,7 +360,7 @@ class CrawlerRetreiverTest {
@Test
public void testRecrawl() throws IOException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(12)
.domain("www.marginalia.nu")
@ -420,7 +420,7 @@ class CrawlerRetreiverTest {
@Test
public void testRecrawlWithResync() throws IOException {
var specs = CrawlSpecProvider.CrawlSpecRecord
var specs = CrawlerMain.CrawlSpecRecord
.builder()
.crawlDepth(12)
.domain("www.marginalia.nu")
@ -508,7 +508,7 @@ class CrawlerRetreiverTest {
}
}
private void doCrawlWithReferenceStream(CrawlSpecProvider.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
try (var recorder = new WarcRecorder(tempFileWarc2)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).crawlDomain(new DomainLinks(),
new CrawlDataReference(stream));
@ -519,7 +519,7 @@ class CrawlerRetreiverTest {
}
@NotNull
private DomainCrawlFrontier doCrawl(Path tempFileWarc1, CrawlSpecProvider.CrawlSpecRecord specs) {
private DomainCrawlFrontier doCrawl(Path tempFileWarc1, CrawlerMain.CrawlSpecRecord specs) {
try (var recorder = new WarcRecorder(tempFileWarc1)) {
var crawler = new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder);
crawler.crawlDomain();