MarginaliaSearch/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java

package nu.marginalia.crawl;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
import nu.marginalia.crawl.warc.WarcArchiverFactory;
import nu.marginalia.crawl.warc.WarcArchiverIf;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawlerOutputFile;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX;

public class CrawlerMain extends ProcessMainClass {
    private final static Logger logger = LoggerFactory.getLogger(CrawlerMain.class);

    private final UserAgent userAgent;
    private final ProcessHeartbeatImpl heartbeat;
    private final MessageQueueFactory messageQueueFactory;
    private final DomainProber domainProber;
    private final FileStorageService fileStorageService;
    private final DbCrawlSpecProvider dbCrawlSpecProvider;
    private final AnchorTagsSourceFactory anchorTagsSourceFactory;
    private final WarcArchiverFactory warcArchiverFactory;
    private final Gson gson;
    private final int node;
    private final SimpleBlockingThreadPool pool;
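
    // Domains currently queued or being crawled; used so each domain is only
    // submitted to the pool once per run (tasks remove themselves on completion)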
    private final Map<String, String> processingIds = new ConcurrentHashMap<>();

    final AbortMonitor abortMonitor = AbortMonitor.getInstance();

    volatile int totalTasks;
    final AtomicInteger tasksDone = new AtomicInteger(0);
    private HttpFetcherImpl fetcher;

    @Inject
    public CrawlerMain(UserAgent userAgent,
                       ProcessHeartbeatImpl heartbeat,
                       MessageQueueFactory messageQueueFactory, DomainProber domainProber,
                       FileStorageService fileStorageService,
                       ProcessConfiguration processConfiguration,
                       DbCrawlSpecProvider dbCrawlSpecProvider,
                       AnchorTagsSourceFactory anchorTagsSourceFactory,
                       WarcArchiverFactory warcArchiverFactory,
                       Gson gson) {
        this.userAgent = userAgent;
        this.heartbeat = heartbeat;
        this.messageQueueFactory = messageQueueFactory;
        this.domainProber = domainProber;
        this.fileStorageService = fileStorageService;
        this.dbCrawlSpecProvider = dbCrawlSpecProvider;
        this.anchorTagsSourceFactory = anchorTagsSourceFactory;
        this.warcArchiverFactory = warcArchiverFactory;
        this.gson = gson;
        this.node = processConfiguration.node();

        pool = new SimpleBlockingThreadPool("CrawlerPool",
                Integer.getInteger("crawler.poolSize", 256),
                1);

        fetcher = new HttpFetcherImpl(userAgent,
                new Dispatcher(),
                new ConnectionPool(5, 10, TimeUnit.SECONDS)
        );
    }
    public static void main(String... args) throws Exception {
        if (!AbortMonitor.getInstance().isAlive()) {
            System.err.println("Remove abort file first");
            return;
        }

        // Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
        Security.setProperty("networkaddress.cache.ttl", "3600");

        // This must run *early*
        System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());

        // If these aren't set properly, the JVM will hang forever on some requests
        System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
        System.setProperty("sun.net.client.defaultReadTimeout", "30000");

        // We don't want to use too much memory caching sessions for https
        System.setProperty("javax.net.ssl.sessionCacheSize", "2048");

        try {
            Injector injector = Guice.createInjector(
                    new CrawlerModule(),
                    new ProcessConfigurationModule("crawler"),
                    new DatabaseModule(false)
            );
            var crawler = injector.getInstance(CrawlerMain.class);

            var instructions = crawler.fetchInstructions();
            try {
                crawler.run(instructions.specProvider, instructions.outputDir);
                instructions.ok();
            } catch (Exception ex) {
                logger.error("Crawler failed", ex);
                instructions.err();
            }

            TimeUnit.SECONDS.sleep(5);
        }
        catch (Exception ex) {
            logger.error("Uncaught exception", ex);
        }

        System.exit(0);
    }
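
    /** Crawl every domain provided by the spec provider, writing the
     *  results to outputDir. */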
    public void run(CrawlSpecProvider specProvider, Path outputDir) throws Exception {

        heartbeat.start();

        // First a validation run to ensure the file is all good to parse
        totalTasks = specProvider.totalCount();
        if (totalTasks == 0) {
            // This is an error state, and we should make noise about it
            throw new IllegalStateException("No crawl tasks found, refusing to continue");
        }
        logger.info("Queued {} crawl tasks, let's go", totalTasks);

        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
             WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
        ) {
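            // Stop taking new domains if an abort has been requested, skip domains the
            // work log already lists as finished, and skip domains that are already queued
            // (processingIds.put returns non-null when the domain was already present)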
            try (var specStream = specProvider.stream()) {
                specStream
                        .takeWhile((e) -> abortMonitor.isAlive())
                        .filter(e -> !workLog.isJobFinished(e.domain))
                        .filter(e -> processingIds.put(e.domain, "") == null)
                        .map(e -> new CrawlTask(e, anchorTagsSource, outputDir, warcArchiver, workLog))
                        .forEach(pool::submitQuietly);
            }

            logger.info("Shutting down the pool, waiting for tasks to complete...");

            pool.shutDown();
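
            // Watchdog: as long as the number of active jobs keeps shrinking between
            // checks, the crawl is still making progress; if the count stalls, the
            // remaining jobs are assumed stuck and are forcibly aborted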
            int activePoolCount = pool.getActiveCount();

            while (!pool.awaitTermination(5, TimeUnit.HOURS)) {
                int newActivePoolCount = pool.getActiveCount();
                if (activePoolCount == newActivePoolCount) {
                    logger.warn("Aborting the last {} jobs of the crawl, taking too long", newActivePoolCount);
                    pool.shutDownNow();
                } else {
                    activePoolCount = newActivePoolCount;
                }
            }
        }
        catch (Exception ex) {
            logger.warn("Exception in crawler", ex);
        }
        finally {
            heartbeat.shutDown();
        }
    }
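
    /** A unit of work for the thread pool: crawl a single domain, convert the
     *  resulting warc data to parquet, and record the outcome in the work log. */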
    class CrawlTask implements SimpleBlockingThreadPool.Task {

        private final CrawlSpecRecord specification;

        private final String domain;
        private final String id;

        private final AnchorTagsSource anchorTagsSource;
        private final Path outputDir;
        private final WarcArchiverIf warcArchiver;
        private final WorkLog workLog;

        CrawlTask(CrawlSpecRecord specification,
                  AnchorTagsSource anchorTagsSource,
                  Path outputDir,
                  WarcArchiverIf warcArchiver,
                  WorkLog workLog) {
            this.specification = specification;
            this.anchorTagsSource = anchorTagsSource;
            this.outputDir = outputDir;
            this.warcArchiver = warcArchiver;
            this.workLog = workLog;

            this.domain = specification.domain;
            this.id = Integer.toHexString(domain.hashCode());
        }

        @Override
        public void run() throws Exception {

            Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
            Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
            Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
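
            // A pre-existing LIVE warc file means a previous run was aborted mid-crawl;
            // stash it as a TEMP file so the retriever can sync up with what was already
            // fetched, otherwise clear out any stale TEMP file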
            if (Files.exists(newWarcFile)) {
                Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
            }
            else {
                Files.deleteIfExists(tempFile);
            }

            try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
                 var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder);
                 CrawlDataReference reference = getReference())
            {
                Thread.currentThread().setName("crawling:" + domain);

                var domainLinks = anchorTagsSource.getAnchorTags(domain);

                if (Files.exists(tempFile)) {
                    retriever.syncAbortedRun(tempFile);
                    Files.delete(tempFile);
                }

                int size = retriever.fetch(domainLinks, reference);

                // Delete the reference crawl data if it's not the same as the new one
                // (mostly a case when migrating from legacy->warc)
                reference.delete();

                CrawledDocumentParquetRecordFileWriter
                        .convertWarc(domain, userAgent, newWarcFile, parquetFile);

                warcArchiver.consumeWarc(newWarcFile, domain);

                workLog.setJobToFinished(domain, parquetFile.toString(), size);
                heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);

                logger.info("Fetched {}", domain);
            } catch (Exception e) {
                logger.error("Error fetching domain " + domain, e);
            }
            finally {
                // We don't need to double-count these; it's also kept in the workLog
                processingIds.remove(domain);

                Thread.currentThread().setName("[idle]");

                Files.deleteIfExists(newWarcFile);
                Files.deleteIfExists(tempFile);
            }
        }
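
        /** Load the previous crawl's data for this domain, if any, so the retriever
         *  can use it as a reference when re-crawling; an empty reference is used
         *  when no previous data can be read. */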
        private CrawlDataReference getReference() {
            try {
                return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id));
            } catch (IOException e) {
                logger.debug("Failed to read previous crawl data for {}", specification.domain);
                return new CrawlDataReference();
            }
        }

    }
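
    /** A crawl instruction received from the message queue, bundled with the inbox
     *  and message handles needed to acknowledge it once the crawl succeeds or fails. */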
    private static class CrawlRequest {
        private final CrawlSpecProvider specProvider;
        private final Path outputDir;
        private final MqMessage message;
        private final MqSingleShotInbox inbox;

        CrawlRequest(CrawlSpecProvider specProvider, Path outputDir, MqMessage message, MqSingleShotInbox inbox) {
            this.message = message;
            this.inbox = inbox;
            this.specProvider = specProvider;
            this.outputDir = outputDir;
        }

        public void ok() {
            inbox.sendResponse(message, MqInboxResponse.ok());
        }
        public void err() {
            inbox.sendResponse(message, MqInboxResponse.err());
        }

    }
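
    /** Block until a crawl request arrives on this node's inbox, then resolve it
     *  into a crawl spec provider and an output directory. */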
    private CrawlRequest fetchInstructions() throws Exception {

        var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID());

        logger.info("Waiting for instructions");

        var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName());
        var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));

        var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);

        CrawlSpecProvider specProvider;

        if (request.specStorage != null) {
            var specData = fileStorageService.getStorage(request.specStorage);
            var parquetProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));

            // Ensure the parquet domains are loaded into the database to avoid
            // rare data-loss scenarios
            dbCrawlSpecProvider.ensureParquetDomainsLoaded(parquetProvider);

            specProvider = parquetProvider;
        }
        else {
            specProvider = dbCrawlSpecProvider;
        }

        var crawlData = fileStorageService.getStorage(request.crawlStorage);

        return new CrawlRequest(
                specProvider,
                crawlData.asPath(),
                msg,
                inbox);
    }
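
    /** Wait briefly for a message with the expected function; if none arrives,
     *  attempt to "steal" a matching message that is already marked as in-progress,
     *  so that a crawler restarted mid-job can resume the request it was handling. */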
    private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
        var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
        if (opt.isPresent()) {
            if (!opt.get().function().equals(expectedFunction)) {
                throw new RuntimeException("Unexpected function: " + opt.get().function());
            }
            return opt;
        }
        else {
            var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
            stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
            return stolenMessage;
        }
    }

}