// MarginaliaSearch/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java

package nu.marginalia.crawl;

import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
import nu.marginalia.crawl.warc.WarcArchiverFactory;
import nu.marginalia.crawl.warc.WarcArchiverIf;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.io.crawldata.CrawledDomainReader;
import nu.marginalia.io.crawldata.CrawlerOutputFile;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX;
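
/**
 * Main class of the crawler process.  The process waits for crawl instructions
 * on its message queue inbox, then crawls one domain per CrawlTask on a bounded
 * worker pool.  Each domain is recorded as a WARC file while it is fetched, and
 * the WARC data is then converted to a parquet file for further processing.
 */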
public class CrawlerMain extends ProcessMainClass {
    private final static Logger logger = LoggerFactory.getLogger(CrawlerMain.class);

    private final UserAgent userAgent;
    private final ProcessHeartbeatImpl heartbeat;
    private final MessageQueueFactory messageQueueFactory;
    private final DomainProber domainProber;
    private final FileStorageService fileStorageService;
    private final DbCrawlSpecProvider dbCrawlSpecProvider;
    private final AnchorTagsSourceFactory anchorTagsSourceFactory;
    private final WarcArchiverFactory warcArchiverFactory;
    private final Gson gson;
    private final int node;
    private final SimpleBlockingThreadPool pool;

    private final Map<String, String> processingIds = new ConcurrentHashMap<>();
    private final AbortMonitor abortMonitor = AbortMonitor.getInstance();

    private final AtomicInteger tasksDone = new AtomicInteger(0);
    private final HttpFetcherImpl fetcher;

    private volatile int totalTasks;
    @Inject
    public CrawlerMain(UserAgent userAgent,
                       ProcessHeartbeatImpl heartbeat,
                       MessageQueueFactory messageQueueFactory, DomainProber domainProber,
                       FileStorageService fileStorageService,
                       ProcessConfiguration processConfiguration,
                       DbCrawlSpecProvider dbCrawlSpecProvider,
                       AnchorTagsSourceFactory anchorTagsSourceFactory,
                       WarcArchiverFactory warcArchiverFactory,
                       Gson gson) {
        this.userAgent = userAgent;
        this.heartbeat = heartbeat;
        this.messageQueueFactory = messageQueueFactory;
        this.domainProber = domainProber;
        this.fileStorageService = fileStorageService;
        this.dbCrawlSpecProvider = dbCrawlSpecProvider;
        this.anchorTagsSourceFactory = anchorTagsSourceFactory;
        this.warcArchiverFactory = warcArchiverFactory;
        this.gson = gson;
        this.node = processConfiguration.node();

        pool = new SimpleBlockingThreadPool("CrawlerPool",
                Integer.getInteger("crawler.poolSize", 256),
                1);

        fetcher = new HttpFetcherImpl(userAgent,
                new Dispatcher(),
                new ConnectionPool(5, 10, TimeUnit.SECONDS)
        );
    }
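
    /**
     * Process entry point.  The system properties set here affect DNS caching
     * and HTTP timeouts, and need to be in place before the first network
     * request is made, so they are configured before the injector is created.
     */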
    public static void main(String... args) throws Exception {
        if (!AbortMonitor.getInstance().isAlive()) {
            System.err.println("Remove abort file first");
            return;
        }

        // Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
        Security.setProperty("networkaddress.cache.ttl", "3600");

        // This must run *early*
        System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());

        // If these aren't set properly, the JVM will hang forever on some requests
        System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
        System.setProperty("sun.net.client.defaultReadTimeout", "30000");

        // We don't want to use too much memory caching sessions for https
        System.setProperty("javax.net.ssl.sessionCacheSize", "2048");

        try {
            Injector injector = Guice.createInjector(
                    new CrawlerModule(),
                    new ProcessConfigurationModule("crawler"),
                    new DatabaseModule(false)
            );
            var crawler = injector.getInstance(CrawlerMain.class);

            var instructions = crawler.fetchInstructions();
            try {
                if (instructions.targetDomainName != null) {
                    crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir);
                }
                else {
                    crawler.run(instructions.specProvider, instructions.outputDir);
                }
                instructions.ok();
            } catch (Exception ex) {
                logger.error("Crawler failed", ex);
                instructions.err();
            }

            TimeUnit.SECONDS.sleep(5);
        }
        catch (Exception ex) {
            logger.error("Uncaught exception", ex);
        }
        System.exit(0);
    }
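
    /**
     * Run a full crawl over all domains offered by the given spec provider,
     * skipping domains that are already marked as finished in the work log.
     * Progress is reported via the process heartbeat.
     */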
    public void run(CrawlSpecProvider specProvider, Path outputDir) throws Exception {
        heartbeat.start();

        // Fetch the total task count up front; this doubles as a sanity check
        // that the specs are readable, and drives the progress reporting later on
        totalTasks = specProvider.totalCount();
        if (totalTasks == 0) {
            // This is an error state, and we should make noise about it
            throw new IllegalStateException("No crawl tasks found, refusing to continue");
        }
        logger.info("Queued {} crawl tasks, let's go", totalTasks);

        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
             WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
        ) {
            try (var specStream = specProvider.stream()) {
                specStream
                        .takeWhile((e) -> abortMonitor.isAlive())
                        .filter(e -> !workLog.isJobFinished(e.domain))
                        .filter(e -> processingIds.put(e.domain, "") == null)
                        .map(e -> new CrawlTask(e, anchorTagsSource, outputDir, warcArchiver, workLog))
                        .forEach(pool::submitQuietly);
            }

            logger.info("Shutting down the pool, waiting for tasks to complete...");

            pool.shutDown();
            int activePoolCount = pool.getActiveCount();

            // If the active task count is unchanged over a full five hour window,
            // assume the remaining jobs are stuck and abort them
            while (!pool.awaitTermination(5, TimeUnit.HOURS)) {
                int newActivePoolCount = pool.getActiveCount();
                if (activePoolCount == newActivePoolCount) {
                    logger.warn("Aborting the last {} jobs of the crawl, taking too long", newActivePoolCount);
                    pool.shutDownNow();
                } else {
                    activePoolCount = newActivePoolCount;
                }
            }
        }
        catch (Exception ex) {
            logger.warn("Exception in crawler", ex);
        }
        finally {
            heartbeat.shutDown();
        }
    }
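
    /**
     * Crawl a single domain, synchronously on the calling thread, using a
     * one-off CrawlSpecRecord and a per-domain work log file.
     */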
    public void runForSingleDomain(String targetDomainName, Path outputDir) throws Exception {
        heartbeat.start();

        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler-" + targetDomainName.replace('/', '-') + ".log"));
             WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(List.of(new EdgeDomain(targetDomainName)))
        ) {
            var spec = new CrawlSpecRecord(targetDomainName, 1000, null);
            var task = new CrawlTask(spec, anchorTagsSource, outputDir, warcArchiver, workLog);
            task.run();
        }
        catch (Exception ex) {
            logger.warn("Exception in crawler", ex);
        }
        finally {
            heartbeat.shutDown();
        }
    }
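
    /**
     * A unit of crawl work: fetches a single domain into a WARC file, then
     * converts the result to a parquet record file and logs the job as finished.
     */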
    class CrawlTask implements SimpleBlockingThreadPool.Task {

        private final CrawlSpecRecord specification;

        private final String domain;
        private final String id;

        private final AnchorTagsSource anchorTagsSource;
        private final Path outputDir;
        private final WarcArchiverIf warcArchiver;
        private final WorkLog workLog;

        CrawlTask(CrawlSpecRecord specification,
                  AnchorTagsSource anchorTagsSource,
                  Path outputDir,
                  WarcArchiverIf warcArchiver,
                  WorkLog workLog)
        {
            this.specification = specification;
            this.anchorTagsSource = anchorTagsSource;
            this.outputDir = outputDir;
            this.warcArchiver = warcArchiver;
            this.workLog = workLog;

            this.domain = specification.domain;
            this.id = Integer.toHexString(domain.hashCode());
        }
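
        /**
         * Crawls the domain.  A LIVE WARC file left over from a previous aborted run
         * is first moved aside to a TEMP file so the retriever can resume from it.
         * After fetching, the WARC data is converted to parquet, and the WARC file
         * is either archived or deleted depending on the retention configuration.
         */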
        @Override
        public void run() throws Exception {

            Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
            Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
            Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);

            if (Files.exists(newWarcFile)) {
                Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
            }
            else {
                Files.deleteIfExists(tempFile);
            }

            try (var warcRecorder = new WarcRecorder(newWarcFile); // records the live crawl as it happens
                 var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder);
                 CrawlDataReference reference = getReference())
            {
                Thread.currentThread().setName("crawling:" + domain);

                var domainLinks = anchorTagsSource.getAnchorTags(domain);

                if (Files.exists(tempFile)) {
                    retriever.syncAbortedRun(tempFile);
                    Files.delete(tempFile);
                }

                int size = retriever.fetch(domainLinks, reference);

                // Delete the reference crawl data if it's not the same as the new one
                // (mostly a case when migrating from legacy->warc)
                reference.delete();

                CrawledDocumentParquetRecordFileWriter
                        .convertWarc(domain, userAgent, newWarcFile, parquetFile);

                // Optionally archive the WARC file if full retention is enabled,
                // otherwise delete it:
                warcArchiver.consumeWarc(newWarcFile, domain);

                workLog.setJobToFinished(domain, parquetFile.toString(), size);
                heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);

                logger.info("Fetched {}", domain);
            } catch (Exception e) {
                logger.error("Error fetching domain " + domain, e);
            }
            finally {
                // We don't need to double-count these; it's also kept in the workLog
                processingIds.remove(domain);

                Thread.currentThread().setName("[idle]");

                Files.deleteIfExists(newWarcFile);
                Files.deleteIfExists(tempFile);
            }
        }
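
        /**
         * Attempts to open the crawl data from a previous run of this domain, to be
         * used as a reference during re-crawling.  Returns an empty reference if no
         * previous data could be read.
         */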
        private CrawlDataReference getReference() {
            try {
                return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id));
            } catch (IOException e) {
                logger.debug("Failed to read previous crawl data for {}", specification.domain);
                return new CrawlDataReference();
            }
        }
    }
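
    /**
     * A parsed set of crawl instructions: the spec provider and output directory
     * to crawl with, plus the originating queue message so the outcome can be
     * acknowledged via ok() or err().
     */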
    private static class CrawlRequest {
        private final CrawlSpecProvider specProvider;
        private final Path outputDir;
        private final MqMessage message;
        private final MqSingleShotInbox inbox;
        private final String targetDomainName;

        CrawlRequest(CrawlSpecProvider specProvider,
                     String targetDomainName,
                     Path outputDir,
                     MqMessage message,
                     MqSingleShotInbox inbox)
        {
            this.message = message;
            this.inbox = inbox;
            this.specProvider = specProvider;
            this.outputDir = outputDir;
            this.targetDomainName = targetDomainName;
        }

        public void ok() {
            inbox.sendResponse(message, MqInboxResponse.ok());
        }
        public void err() {
            inbox.sendResponse(message, MqInboxResponse.err());
        }
    }
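
    /**
     * Blocks until a CrawlRequest arrives on the process inbox.  If the request
     * points at parquet spec storage, that takes precedence over the database
     * spec provider.
     */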
    private CrawlRequest fetchInstructions() throws Exception {

        var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID());

        logger.info("Waiting for instructions");

        var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName());
        var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));

        var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);

        CrawlSpecProvider specProvider;

        if (request.specStorage != null) {
            var specData = fileStorageService.getStorage(request.specStorage);
            var parquetProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));

            // Ensure the parquet domains are loaded into the database to avoid
            // rare data-loss scenarios
            dbCrawlSpecProvider.ensureParquetDomainsLoaded(parquetProvider);

            specProvider = parquetProvider;
        }
        else {
            specProvider = dbCrawlSpecProvider;
        }

        var crawlData = fileStorageService.getStorage(request.crawlStorage);

        return new CrawlRequest(
                specProvider,
                request.targetDomainName,
                crawlData.asPath(),
                msg,
                inbox);
    }
    private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
        var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
        if (opt.isPresent()) {
            if (!opt.get().function().equals(expectedFunction)) {
                throw new RuntimeException("Unexpected function: " + opt.get().function());
            }
            return opt;
        }
        else {
            var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
            stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
            return stolenMessage;
        }
    }

}