package nu.marginalia.livecrawler;

import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.WmsaHome;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.domains.DbDomainIdRegistry;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Security;
import java.sql.SQLException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static nu.marginalia.mqapi.ProcessInboxNames.LIVE_CRAWLER_INBOX;

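/**
 * Process entry point for the live crawler.  Fetches recently updated URLs reported by
 * the feeds service, crawls and processes them, and loads the resulting documents and
 * keywords into the index.
 */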
public class LiveCrawlerMain extends ProcessMainClass {
    private static final Logger logger =
            LoggerFactory.getLogger(LiveCrawlerMain.class);

    private final FeedsClient feedsClient;
    private final Gson gson;
    private final ProcessHeartbeat heartbeat;
    private final DbDomainQueries domainQueries;
    private final DomainBlacklist domainBlacklist;
    private final MessageQueueFactory messageQueueFactory;
    private final DomainProcessor domainProcessor;
    private final FileStorageService fileStorageService;
    private final KeywordLoaderService keywordLoaderService;
    private final DocumentLoaderService documentLoaderService;
    private final int node;

    @Inject
    public LiveCrawlerMain(FeedsClient feedsClient,
                           Gson gson,
                           ProcessConfiguration config,
                           ProcessHeartbeat heartbeat,
                           DbDomainQueries domainQueries,
                           DomainBlacklist domainBlacklist,
                           MessageQueueFactory messageQueueFactory,
                           DomainProcessor domainProcessor,
                           FileStorageService fileStorageService,
                           KeywordLoaderService keywordLoaderService,
                           DocumentLoaderService documentLoaderService)
            throws Exception
    {
        this.feedsClient = feedsClient;
        this.gson = gson;
        this.heartbeat = heartbeat;
        this.domainQueries = domainQueries;
        this.domainBlacklist = domainBlacklist;
        this.messageQueueFactory = messageQueueFactory;
        this.node = config.node();
        this.domainProcessor = domainProcessor;
        this.fileStorageService = fileStorageService;
        this.keywordLoaderService = keywordLoaderService;
        this.documentLoaderService = documentLoaderService;

        domainBlacklist.waitUntilLoaded();
    }

    public static void main(String... args) throws Exception {

        // Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
        Security.setProperty("networkaddress.cache.ttl", "3600");

        // This must run *early*
        System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());

        // If these aren't set properly, the JVM will hang forever on some requests
        System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
        System.setProperty("sun.net.client.defaultReadTimeout", "30000");

        // We don't want to use too much memory caching sessions for https
        System.setProperty("javax.net.ssl.sessionCacheSize", "2048");

        try {
            Injector injector = Guice.createInjector(
                    new LiveCrawlerModule(),
                    new ProcessConfigurationModule("crawler"),
                    new ConverterModule(),
                    new ServiceDiscoveryModule(),
                    new DatabaseModule(false)
            );

            var crawler = injector.getInstance(LiveCrawlerMain.class);
            LiveCrawlInstructions instructions = crawler.fetchInstructions();

            try {
                crawler.run();
                instructions.ok();
            } catch (Exception e) {
                instructions.err();
                throw e;
            }
        } catch (Exception e) {
            logger.error("LiveCrawler failed", e);

            System.exit(1);
        }
        System.exit(0);
    }

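    // States reported to the process heartbeat as the crawl advances through its stages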
    enum LiveCrawlState {
        PRUNE_DB,
        FETCH_LINKS,
        CRAWLING,
        PROCESSING,
        LOADING,
        CONSTRUCTING,
        DONE
    }

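    /** Resolves the live crawl data directory under the STORAGE base, creating it if necessary. */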
    private void run() throws Exception {
        Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");

        if (!Files.isDirectory(basePath)) {
            Files.createDirectories(basePath);
        }

        run(basePath);
    }

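    /**
     * Runs one live crawl cycle: fetches recently updated URLs from the feeds service,
     * prunes stale entries from the local data set, crawls the new URLs, processes the
     * results into a converter batch, and loads the documents and keywords into the index.
     */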
    private void run(Path basePath) throws Exception {
        try (var processHeartbeat = heartbeat.createProcessTaskHeartbeat(LiveCrawlState.class, "LiveCrawler");
             LiveCrawlDataSet dataSet = new LiveCrawlDataSet(basePath))
        {
            final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);

            processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);

            Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
            feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);

            logger.info("Fetched data for {} domains", urlsPerDomain.size());

            processHeartbeat.progress(LiveCrawlState.PRUNE_DB);

            // Remove data that is too old
            dataSet.prune(cutoff);

            processHeartbeat.progress(LiveCrawlState.CRAWLING);

            try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist);
                 var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
            {
                for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
                    EdgeDomain domain = new EdgeDomain(entry.getKey());
                    List<String> urls = entry.getValue();

                    fetcher.scheduleRetrieval(domain, urls);
                }
            }

            Path tempPath = dataSet.createWorkDir();

            try {
                processHeartbeat.progress(LiveCrawlState.PROCESSING);

                try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
                     var writer = new ConverterBatchWriter(tempPath, 0)
                ) {
                    // Offset the documents' ordinals toward the upper range, to avoid ID collisions with the
                    // main indexes (the maximum permissible doc ordinal value is 67_108_863, so this
                    // still leaves us with a lot of headroom)
                    writer.setOrdinalOffset(67_000_000);

                    for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
                        writer.write(domainProcessor.sideloadProcessing(stream, 0));
                    }
                }

                processHeartbeat.progress(LiveCrawlState.LOADING);

                LoaderInputData lid = new LoaderInputData(tempPath, 1);

                DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(domainQueries);

                keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
                documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, lid);

                keywordLoaderService.close();

            } finally {
                FileUtils.deleteDirectory(tempPath.toFile());
            }

            // Construct the index

            processHeartbeat.progress(LiveCrawlState.DONE);
        }
    }

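    /**
     * Waits for a LiveCrawlRequest on the live crawler inbox and wraps the message
     * together with its inbox, so the outcome can be acknowledged when the crawl finishes.
     */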
    private LiveCrawlInstructions fetchInstructions() throws Exception {

        var inbox = messageQueueFactory.createSingleShotInbox(LIVE_CRAWLER_INBOX, node, UUID.randomUUID());

        logger.info("Waiting for instructions");

        var msgOpt = getMessage(inbox, LiveCrawlRequest.class.getSimpleName());
        var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));

        // for live crawl, request is empty for now
        LiveCrawlRequest request = gson.fromJson(msg.payload(), LiveCrawlRequest.class);

        return new LiveCrawlInstructions(msg, inbox);
    }

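    /**
     * Waits up to 30 seconds for a message with the expected function; failing that,
     * tries to steal a matching message from the inbox instead.
     */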
    private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
        var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
        if (opt.isPresent()) {
            if (!opt.get().function().equals(expectedFunction)) {
                throw new RuntimeException("Unexpected function: " + opt.get().function());
            }
            return opt;
        }
        else {
            var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
            stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
            return stolenMessage;
        }
    }

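    /** Pairs the received MQ message with its inbox, so the crawl outcome can be reported back. */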
    private static class LiveCrawlInstructions {
        private final MqMessage message;
        private final MqSingleShotInbox inbox;

        LiveCrawlInstructions(MqMessage message,
                              MqSingleShotInbox inbox)
        {
            this.message = message;
            this.inbox = inbox;
        }

        public void ok() {
            inbox.sendResponse(message, MqInboxResponse.ok());
        }

        public void err() {
            inbox.sendResponse(message, MqInboxResponse.err());
        }
    }
}