(crawler) WIP

This commit is contained in:
Viktor Lofgren 2023-07-20 21:05:16 +02:00
parent 08ca6399ec
commit f91d92cccb
37 changed files with 1186 additions and 138 deletions

View File

@ -6,6 +6,6 @@ import nu.marginalia.db.storage.model.FileStorageId;
/** A request to start a crawl */
@AllArgsConstructor
public class CrawlRequest {
FileStorageId specStorage;
FileStorageId crawlStorage;
public FileStorageId specStorage;
public FileStorageId crawlStorage;
}

View File

@ -11,6 +11,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/** Manages file storage for processes and services
@ -63,6 +65,49 @@ public class FileStorageService {
return null;
}
public void relateFileStorages(FileStorageId source, FileStorageId target) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO FILE_STORAGE_RELATION(SOURCE_ID, TARGET_ID) VALUES (?, ?)
""")) {
stmt.setLong(1, source.id());
stmt.setLong(2, target.id());
stmt.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public List<FileStorage> getSourceFromStorage(FileStorage storage) throws SQLException {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT SOURCE_ID FROM FILE_STORAGE_RELATION WHERE TARGET_ID = ?
""")) {
stmt.setLong(1, storage.id().id());
var rs = stmt.executeQuery();
List<FileStorage> ret = new ArrayList<>();
while (rs.next()) {
ret.add(getStorage(new FileStorageId(rs.getLong(1))));
}
return ret;
}
}
public List<FileStorage> getTargetFromStorage(FileStorage storage) throws SQLException {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT TARGET_ID FROM FILE_STORAGE_RELATION WHERE SOURCE_ID = ?
""")) {
stmt.setLong(1, storage.id().id());
var rs = stmt.executeQuery();
List<FileStorage> ret = new ArrayList<>();
while (rs.next()) {
ret.add(getStorage(new FileStorageId(rs.getLong(1))));
}
return ret;
}
}
/** @return the storage base with the given type, or null if it does not exist */
public FileStorageBase getStorageBase(FileStorageBaseType type) throws SQLException {
try (var conn = dataSource.getConnection();
@ -153,13 +198,7 @@ public class FileStorageService {
var rs = query.executeQuery();
if (rs.next()) {
return new FileStorage(
new FileStorageId(rs.getLong("ID")),
base,
type,
tempDir.toString(),
description
);
return getStorage(new FileStorageId(rs.getLong("ID")));
}
}
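A minimal usage sketch of the new relation methods (the storage handles here are hypothetical), tying a crawl-spec storage to the crawl data produced from it:

// record that crawlData was produced from crawlSpec
fileStorageService.relateFileStorages(crawlSpec.id(), crawlData.id());

// later, the spec can be recovered from the data alone
List<FileStorage> sources = fileStorageService.getSourceFromStorage(crawlData);
// sources now contains the crawl-spec storage; RecrawlActor uses this to locate the spec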

View File

@ -1,6 +1,9 @@
package nu.marginalia.db.storage.model;
public record FileStorageId(long id) {
public static FileStorageId parse(String str) {
return new FileStorageId(Long.parseLong(str));
}
public static FileStorageId of(int storageId) {
return new FileStorageId(storageId);
}

View File

@ -23,6 +23,14 @@ CREATE TABLE IF NOT EXISTS FILE_STORAGE (
CHARACTER SET utf8mb4
COLLATE utf8mb4_bin;
CREATE TABLE IF NOT EXISTS FILE_STORAGE_RELATION (
SOURCE_ID BIGINT NOT NULL,
TARGET_ID BIGINT NOT NULL,
CONSTRAINT CONS UNIQUE (SOURCE_ID, TARGET_ID),
FOREIGN KEY (SOURCE_ID) REFERENCES FILE_STORAGE(ID) ON DELETE CASCADE,
FOREIGN KEY (TARGET_ID) REFERENCES FILE_STORAGE(ID) ON DELETE CASCADE
);
CREATE VIEW FILE_STORAGE_VIEW
AS SELECT
CONCAT(BASE.PATH, '/', STORAGE.PATH) AS PATH,

View File

@ -64,7 +64,6 @@ public class CrawledDomainReader {
return Optional.of(read(path));
}
catch (Exception ex) {
logger.warn("Failed to read domain " + path, ex);
return Optional.empty();
}
}

View File

@ -14,12 +14,15 @@ import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
public class CrawledDomainWriter implements AutoCloseable {
private final Path outputDir;
private final Gson gson = GsonFactory.get();
private static final Logger logger = LoggerFactory.getLogger(CrawledDomainWriter.class);
private final Writer writer;
private final Path tmpFile;
private final Path outputFile;
public CrawledDomainWriter(Path outputDir, String name, String id) throws IOException {
@ -29,8 +32,10 @@ public class CrawledDomainWriter implements AutoCloseable {
throw new IllegalArgumentException("Output dir " + outputDir + " does not exist");
}
tmpFile = getOutputFile(id, name + "_tmp");
outputFile = getOutputFile(id, name);
writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(outputFile))));
writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(tmpFile,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING))));
}
public Path getOutputFile() {
@ -46,32 +51,12 @@ public class CrawledDomainWriter implements AutoCloseable {
}
private Path getOutputFile(String id, String name) throws IOException {
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = outputDir.resolve(first).resolve(second);
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
}
return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
}
private String filesystemSafeName(String name) {
StringBuilder nameSaneBuilder = new StringBuilder();
name.chars()
.map(Character::toLowerCase)
.map(c -> (c & ~0x7F) == 0 ? c : 'X')
.map(c -> (Character.isDigit(c) || Character.isAlphabetic(c) || c == '.') ? c : 'X')
.limit(128)
.forEach(c -> nameSaneBuilder.append((char) c));
return nameSaneBuilder.toString();
return CrawlerOutputFile.createOutputPath(outputDir, id, name);
}
@Override
public void close() throws IOException {
writer.close();
Files.move(tmpFile, outputFile, StandardCopyOption.REPLACE_EXISTING);
}
}
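The writer now streams to a temporary "_tmp" file and only moves it over the final output path once the stream has been closed, so a crash mid-write leaves no partial file under the real name, and anything sizing or reading the output file never sees a half-written crawl.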

View File

@ -0,0 +1,53 @@
package nu.marginalia.crawling.io;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class CrawlerOutputFile {
public static Path getOutputFile(Path base, CrawlingSpecification spec) {
return getOutputFile(base, spec.id, spec.domain);
}
/** Return the Path to a file for the given id and name */
public static Path getOutputFile(Path base, String id, String name) {
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = base.resolve(first).resolve(second);
return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
}
/** Return the Path to a file for the given id and name, creating the prerequisite
* directory structure as necessary. */
public static Path createOutputPath(Path base, String id, String name) throws IOException {
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = base.resolve(first).resolve(second);
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
}
return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
}
private static String filesystemSafeName(String name) {
StringBuilder nameSaneBuilder = new StringBuilder();
name.chars()
.map(Character::toLowerCase)
.map(c -> (c & ~0x7F) == 0 ? c : 'X')
.map(c -> (Character.isDigit(c) || Character.isAlphabetic(c) || c == '.') ? c : 'X')
.limit(128)
.forEach(c -> nameSaneBuilder.append((char) c));
return nameSaneBuilder.toString();
}
}
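As a worked example (the id and domain are hypothetical): for id "1a2b3c" and name "www.example.com", getOutputFile resolves to <base>/1a/2b/1a2b3c-www.example.com.zstd. filesystemSafeName lowercases the name, keeps only ASCII letters, digits and dots (anything else becomes 'X'), and caps the result at 128 characters.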

View File

@ -27,6 +27,8 @@ public class CrawledDocument implements SerializableCrawlData {
public String canonicalUrl;
public String redirectUrl;
public String recrawlState;
public static final String SERIAL_IDENTIFIER = "// DOCUMENT";
@Override
public String getSerialIdentifier() {

View File

@ -3,10 +3,12 @@ package nu.marginalia.crawling.model.spec;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.crawling.model.CrawledDomain;
import java.util.List;
@AllArgsConstructor @NoArgsConstructor @Builder
@AllArgsConstructor @NoArgsConstructor @Builder @With
public class CrawlingSpecification {
public String id;
@ -16,6 +18,8 @@ public class CrawlingSpecification {
public String domain;
public List<String> urls;
public CrawledDomain oldData;
@Override
public String toString() {
return String.format(getClass().getSimpleName() + "[" + id + "/" + domain + ": " + crawlDepth + "[ " + urls.size() + "]");

View File

@ -138,7 +138,7 @@ public class ConverterMain {
// Advance the progress bar to the current position if this is a resumption
processedDomains.set(processLog.countFinishedJobs());
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id)))
{

View File

@ -113,7 +113,8 @@ public class ConvertingIntegrationTest {
BigString.encode(readClassPathFile(p.toString())),
Double.toString(Math.random()),
"https://memex.marginalia.nu/" + file,
null
null,
""
);
docs.add(doc);
}

View File

@ -27,9 +27,12 @@ dependencies {
implementation project(':code:common:service')
implementation project(':code:libraries:big-string')
implementation project(':code:api:index-api')
implementation project(':code:api:process-mqapi')
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')
implementation project(':code:common:message-queue')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:process-models:crawling-model')
implementation project(':code:process-models:converting-model')

View File

@ -0,0 +1,72 @@
package nu.marginalia.crawl;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Semaphore;
public class CrawlLimiter {
public static final int maxPoolSize = Integer.getInteger("crawler.pool-size", 512);
// We'll round each task's size up to at least this value, so that new domains
// (with no prior crawl data) still count against the semaphores and we don't
// open too many concurrent connections
public static final int minCrawlDataSizeKb = 128; // 128 Kb
// The largest size on disk where we'll permit a refresh crawl
// (these files easily grow into the gigabytes, we don't want that in RAM)
public static final int maxRefreshableCrawlDataSizeKBytes = 1024*128; // 128 Mb
// This limits how many concurrent crawl tasks we can have running at once
// based on their size on disk. The on-disk size is compressed, and the
// in-ram size is partially compressed (i.e. only the document body); so
// maybe a fair estimate is something like 2-4x this figure for RAM usage
//
public static final int maxConcurrentCrawlTaskSizeKb = 512*1024; // 512 Mb
static {
// Sanity check; if this is false we'll get a deadlock on taskSemRAM
assert maxConcurrentCrawlTaskSizeKb >= maxRefreshableCrawlDataSizeKBytes
: "maxConcurrentCrawlTaskSizeKb must be at least as large as maxRefreshableCrawlDataSizeKBytes";
}
public record CrawlTaskLimits(Path refreshPath, boolean isRefreshable, int taskSize) {}
// We use two semaphores to keep track of the number of concurrent crawls:
// first a RAM semaphore to limit the amount of RAM used by refresh crawls,
// then a count semaphore to limit the number of concurrent threads (this keeps the connection count manageable)
private final Semaphore taskSemRAM = new Semaphore(maxConcurrentCrawlTaskSizeKb);
private final Semaphore taskSemCount = new Semaphore(maxPoolSize);
public CrawlTaskLimits getTaskLimits(Path fileName) {
long size;
try {
size = Math.max(minCrawlDataSizeKb, Files.size(fileName) / 1024);
} catch (IOException ex) {
// If we can't read the file, we'll assume it's small since we won't be able to read it later for the refresh either
return new CrawlTaskLimits(null,false, minCrawlDataSizeKb);
}
// We'll only permit refresh crawls if the file is small enough
boolean isRefreshable = size < maxRefreshableCrawlDataSizeKBytes;
// We'll truncate this down to maxRefreshableCrawlDataSizeKBytes to ensure
// it's possible to acquire the RAM semaphore
int effectiveSize = (int) Math.min(maxRefreshableCrawlDataSizeKBytes, size);
return new CrawlTaskLimits(fileName, isRefreshable, effectiveSize);
}
public void acquire(CrawlTaskLimits properties) throws InterruptedException {
// It's very important that we acquire the RAM semaphore first to avoid a deadlock
taskSemRAM.acquire(properties.taskSize);
taskSemCount.acquire(1);
}
public void release(CrawlTaskLimits properties) {
taskSemCount.release(1);
taskSemRAM.release(properties.taskSize);
}
}
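A minimal sketch of the intended call pattern (the path is hypothetical; CrawlerMain drives this from startCrawlTask):

var limits = crawlLimiter.getTaskLimits(Path.of("/crawl-data/1a/2b/1a2b3c-example.zstd"));
crawlLimiter.acquire(limits); // blocks on RAM permits first, then a thread-count permit
try {
    // ... fetch the domain, optionally refreshing from limits.refreshPath() ...
}
finally {
    crawlLimiter.release(limits); // permits are returned in the reverse order
}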

View File

@ -1,13 +1,23 @@
package nu.marginalia.crawl;
import nu.marginalia.ProcessConfiguration;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawlerOutputFile;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import plan.CrawlPlanLoader;
import plan.CrawlPlan;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
@ -19,49 +29,63 @@ import okhttp3.internal.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX;
public class CrawlerMain implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final CrawlPlan plan;
private final Path crawlDataDir;
private final WorkLog workLog;
private Path crawlDataDir;
private WorkLog workLog;
private final ProcessHeartbeat heartbeat;
private final ConnectionPool connectionPool = new ConnectionPool(5, 10, TimeUnit.SECONDS);
private final Dispatcher dispatcher = new Dispatcher(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", true)));
private final UserAgent userAgent;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final Gson gson;
private final ThreadPoolExecutor pool;
final int poolSize = Integer.getInteger("crawler.pool-size", 512);
final int poolQueueSize = 32;
public final CrawlLimiter crawlLimiter = new CrawlLimiter();
private final Set<String> processedIds = new HashSet<>();
AbortMonitor abortMonitor = AbortMonitor.getInstance();
Semaphore taskSem = new Semaphore(poolSize);
final AbortMonitor abortMonitor = AbortMonitor.getInstance();
private static ProcessHeartbeat heartbeat;
volatile int totalTasks;
final AtomicInteger tasksDone = new AtomicInteger(0);
public CrawlerMain(CrawlPlan plan) throws Exception {
this.plan = plan;
this.userAgent = WmsaHome.getUserAgent();
@Inject
public CrawlerMain(UserAgent userAgent,
ProcessHeartbeat heartbeat,
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService,
Gson gson) {
this.heartbeat = heartbeat;
this.userAgent = userAgent;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.gson = gson;
// Ensure that the user agent is set for Java's HTTP requests
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(poolQueueSize);
pool = new ThreadPoolExecutor(poolSize/128, poolSize, 5, TimeUnit.MINUTES, queue); // maybe need to set -Xss for JVM to deal with this?
workLog = plan.createCrawlWorkLog();
crawlDataDir = plan.crawl.getDir();
// maybe need to set -Xss for JVM to deal with this?
pool = new ThreadPoolExecutor(
CrawlLimiter.maxPoolSize /128,
CrawlLimiter.maxPoolSize,
5, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(32)
);
}
public static void main(String... args) throws Exception {
@ -77,46 +101,65 @@ public class CrawlerMain implements AutoCloseable {
System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
System.setProperty("sun.net.client.defaultReadTimeout", "30000");
if (args.length != 1) {
System.err.println("Arguments: crawl-plan.yaml");
System.exit(0);
}
var plan = new CrawlPlanLoader().load(Path.of(args[0]));
Injector injector = Guice.createInjector(
new CrawlerModule(),
new DatabaseModule()
);
var crawler = injector.getInstance(CrawlerMain.class);
heartbeat = new ProcessHeartbeat(new ProcessConfiguration("crawler", 0, UUID.randomUUID()),
new DatabaseModule().provideConnection());
var instructions = crawler.fetchInstructions();
try {
crawler.run(instructions.getPlan());
instructions.ok();
}
catch (Exception ex) {
System.err.println("Crawler failed");
ex.printStackTrace();
instructions.err();
}
try (var crawler = new CrawlerMain(plan)) {
heartbeat.start();
crawler.run();
}
finally {
heartbeat.shutDown();
}
TimeUnit.SECONDS.sleep(5);
System.exit(0);
}
public void run() throws InterruptedException {
// First a validation run to ensure the file is all good to parse
logger.info("Validating JSON");
int countTotal = 0;
int countProcessed = 0;
public void run(CrawlPlan plan) throws InterruptedException, IOException {
for (var unused : plan.crawlingSpecificationIterable()) {
countTotal++;
heartbeat.start();
try {
// First a validation run to ensure the file is all good to parse
logger.info("Validating JSON");
workLog = plan.createCrawlWorkLog();
crawlDataDir = plan.crawl.getDir();
int countTotal = 0;
for (var unused : plan.crawlingSpecificationIterable()) {
countTotal++;
}
totalTasks = countTotal;
logger.info("Let's go");
for (var spec : plan.crawlingSpecificationIterable()) {
startCrawlTask(plan, spec);
}
pool.shutdown();
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));
}
logger.info("Let's go");
for (var spec : plan.crawlingSpecificationIterable()) {
heartbeat.setProgress(countProcessed / (double) countTotal);
startCrawlTask(spec);
finally {
heartbeat.shutDown();
}
}
CrawledDomainReader reader = new CrawledDomainReader();
private void startCrawlTask(CrawlingSpecification crawlingSpecification) {
private void startCrawlTask(CrawlPlan plan, CrawlingSpecification crawlingSpecification) {
if (!processedIds.add(crawlingSpecification.id)) {
@ -132,28 +175,41 @@ public class CrawlerMain implements AutoCloseable {
return;
}
var limits = crawlLimiter.getTaskLimits(CrawlerOutputFile.getOutputFile(crawlDataDir, crawlingSpecification));
try {
taskSem.acquire();
crawlLimiter.acquire(limits);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
pool.execute(() -> {
try {
fetchDomain(crawlingSpecification);
fetchDomain(crawlingSpecification, limits);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
}
finally {
taskSem.release();
crawlLimiter.release(limits);
}
});
}
private void fetchDomain(CrawlingSpecification specification) {
private void fetchDomain(CrawlingSpecification specification, CrawlLimiter.CrawlTaskLimits limits) {
if (workLog.isJobFinished(specification.id))
return;
HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);
// Read the previous crawl's data for this domain, if it exists and has a reasonable size
Optional<CrawledDomain> domain;
if (limits.isRefreshable()) {
domain = reader.readOptionally(limits.refreshPath());
if (domain.isPresent()) {
specification = specification.withOldData(domain.get());
}
}
try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification.domain, specification.id)) {
var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
@ -167,6 +223,65 @@ public class CrawlerMain implements AutoCloseable {
}
}
private static class CrawlRequest {
private final CrawlPlan plan;
private final MqMessage message;
private final MqSingleShotInbox inbox;
CrawlRequest(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
this.plan = plan;
this.message = message;
this.inbox = inbox;
}
public CrawlPlan getPlan() {
return plan;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private CrawlRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, UUID.randomUUID());
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);
var specData = fileStorageService.getStorage(request.specStorage);
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var plan = new CrawlPlan(specData.asPath().resolve("crawler.spec").toString(),
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
null);
return new CrawlRequest(plan, msg, inbox);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
public void close() throws Exception {
logger.info("Awaiting termination");
pool.shutdown();
@ -176,8 +291,6 @@ public class CrawlerMain implements AutoCloseable {
workLog.close();
dispatcher.executorService().shutdownNow();
}
}

View File

@ -0,0 +1,24 @@
package nu.marginalia.crawl;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.model.gson.GsonFactory;
import java.util.UUID;
public class CrawlerModule extends AbstractModule {
@SneakyThrows
public void configure() {
bind(Gson.class).toInstance(createGson());
bind(UserAgent.class).toInstance(WmsaHome.getUserAgent());
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("crawler", 0, UUID.randomUUID()));
}
private Gson createGson() {
return GsonFactory.get();
}
}

View File

@ -0,0 +1,123 @@
package nu.marginalia.crawl.retreival;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.*;
import java.util.stream.Collectors;
/** A reference to a domain that has been crawled before. */
public class CrawlDataReference {
private final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
final Map<EdgeUrl, CrawledDocument> documents;
final Map<EdgeUrl, String> etags;
final Map<EdgeUrl, String> lastModified;
final Set<EdgeUrl> previouslyDeadUrls = new HashSet<>();
CrawlDataReference(CrawledDomain referenceDomain) {
if (referenceDomain == null || referenceDomain.doc == null) {
documents = Collections.emptyMap();
etags = Collections.emptyMap();
lastModified = Collections.emptyMap();
return;
}
documents = new HashMap<>(referenceDomain.doc.size());
etags = new HashMap<>(referenceDomain.doc.size());
lastModified = new HashMap<>(referenceDomain.doc.size());
for (var doc : referenceDomain.doc) {
try {
addReference(doc);
} catch (URISyntaxException ex) {
logger.warn("Failed to add reference document {}", doc.url);
}
}
}
private void addReference(CrawledDocument doc) throws URISyntaxException {
var url = new EdgeUrl(doc.url);
if (doc.httpStatus == 404) {
previouslyDeadUrls.add(url);
return;
}
if (doc.httpStatus != 200) {
return;
}
documents.put(url, doc);
String headers = doc.headers;
if (headers != null) {
String[] headersLines = headers.split("\n");
String lastmod = null;
String etag = null;
for (String line : headersLines) {
if (line.toLowerCase().startsWith("etag:")) {
etag = line.substring(5).trim();
}
if (line.toLowerCase().startsWith("last-modified:")) {
lastmod = line.substring(14).trim();
}
}
if (lastmod != null) {
lastModified.put(url, lastmod);
}
if (etag != null) {
etags.put(url, etag);
}
}
}
public boolean isPreviouslyDead(EdgeUrl url) {
return previouslyDeadUrls.contains(url);
}
public int size() {
return documents.size();
}
public String getEtag(EdgeUrl url) {
return etags.get(url);
}
public String getLastModified(EdgeUrl url) {
return lastModified.get(url);
}
public Map<EdgeUrl, CrawledDocument> allDocuments() {
return documents;
}
public Map<EdgeUrl, CrawledDocument> sample(int sampleSize) {
return documents.entrySet().stream().limit(sampleSize).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public void evict() {
documents.clear();
etags.clear();
lastModified.clear();
}
public CrawledDocument getDoc(EdgeUrl top) {
return documents.get(top);
}
// This bit of manual housekeeping is needed to keep the memory footprint low
public void dispose(EdgeUrl url) {
documents.remove(url);
etags.remove(url);
lastModified.remove(url);
}
}
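A sketch of how the reference data is meant to be used during a re-crawl, assuming oldDomain holds the previous crawl of the same domain:

var reference = new CrawlDataReference(oldDomain);

// conditional fetch using the cached validators (see HttpFetcherImpl below)
var doc = fetcher.fetchContent(url, reference.getEtag(url), reference.getLastModified(url));
if (doc.httpStatus == 304) {
    doc = reference.getDoc(url); // server says unchanged; reuse the stored document
}

reference.dispose(url); // drop the entry from RAM once the URL has been handled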

View File

@ -10,6 +10,7 @@ import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.crawling.model.*;
import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.lsh.EasyLSH;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.Jsoup;
@ -57,6 +58,7 @@ public class CrawlerRetreiver {
private final SitemapRetriever sitemapRetriever;
private final DomainCrawlFrontier crawlFrontier;
private final CrawlDataReference oldCrawlData;
int errorCount = 0;
@ -64,6 +66,7 @@ public class CrawlerRetreiver {
CrawlingSpecification specs,
Consumer<SerializableCrawlData> writer) {
this.fetcher = fetcher;
this.oldCrawlData = new CrawlDataReference(specs.oldData);
id = specs.id;
domain = specs.domain;
@ -73,9 +76,9 @@ public class CrawlerRetreiver {
this.crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), specs.urls, specs.crawlDepth);
sitemapRetriever = fetcher.createSitemapRetriever();
// We must always crawl the index page first, this is assumed when fingerprinting the server
var fst = crawlFrontier.peek();
if (fst != null) {
// Ensure the index page is always crawled
var root = fst.withPathAndParam("/", null);
@ -141,6 +144,29 @@ public class CrawlerRetreiver {
var robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
CrawlDataComparison comparison = compareWithOldData(robotsRules);
logger.info("Comparison result for {} : {}", domain, comparison);
// If we have reference data, we will always grow the crawl depth a bit
if (oldCrawlData.size() > 0) {
crawlFrontier.increaseDepth(1.5);
}
// When the reference data doesn't appear to have changed, we'll forego
// re-fetching it and just use the old data
if (comparison == CrawlDataComparison.NO_CHANGES) {
oldCrawlData.allDocuments().forEach((url, doc) -> {
if (crawlFrontier.addVisited(url)) {
doc.recrawlState = "RETAINED";
crawledDomainWriter.accept(doc);
}
});
// We don't need to hold onto this in RAM anymore
oldCrawlData.evict();
}
downloadSitemaps(robotsRules);
sniffRootDocument();
@ -161,18 +187,31 @@ public class CrawlerRetreiver {
continue;
}
// Don't re-fetch links that were previously found dead, as it's very unlikely
// that a link that returned 404 will suddenly start working at a later point
if (oldCrawlData.isPreviouslyDead(top))
continue;
// Check the link filter if the endpoint should be fetched based on site-type
if (!crawlFrontier.filterLink(top))
continue;
// Check vs blocklist
if (urlBlocklist.isUrlBlocked(top))
continue;
if (!isAllowedProtocol(top.proto))
continue;
// Check if the URL is too long to insert into the DB
if (top.toString().length() > 255)
continue;
if (!crawlFrontier.addVisited(top))
continue;
if (fetchDocument(top, crawlDelay)) {
if (fetchDocument(top, crawlDelay).isPresent()) {
fetchedCount++;
}
}
@ -184,6 +223,76 @@ public class CrawlerRetreiver {
return fetchedCount;
}
private CrawlDataComparison compareWithOldData(SimpleRobotRules robotsRules) {
int numGoodDocuments = oldCrawlData.size();
if (numGoodDocuments == 0)
return CrawlDataComparison.NO_OLD_DATA;
if (numGoodDocuments < 10)
return CrawlDataComparison.SMALL_SAMPLE;
// We fetch a sample of the data to assess how much it has changed
int sampleSize = (int) Math.min(20, 0.25 * numGoodDocuments);
Map<EdgeUrl, CrawledDocument> referenceUrls = oldCrawlData.sample(sampleSize);
int differences = 0;
long crawlDelay = robotsRules.getCrawlDelay();
for (var url : referenceUrls.keySet()) {
var docMaybe = fetchDocument(url, crawlDelay);
if (docMaybe.isEmpty()) {
differences++;
continue;
}
var newDoc = docMaybe.get();
var referenceDoc = referenceUrls.get(url);
// This looks like a bug but it is not: fetchContent() returns the reference
// document itself when the server replies 304, so an identity comparison here
// tells us the page was unchanged without doing a full content comparison
if (newDoc == referenceDoc)
continue;
if (newDoc.httpStatus != referenceDoc.httpStatus) {
differences++;
continue;
}
if (newDoc.documentBody == null) {
differences++;
continue;
}
long referenceLsh = hashDoc(referenceDoc);
long newLsh = hashDoc(newDoc);
if (EasyLSH.hammingDistance(referenceLsh, newLsh) > 5) {
differences++;
}
}
if (differences > sampleSize/4) {
return CrawlDataComparison.CHANGES_FOUND;
}
else {
return CrawlDataComparison.NO_CHANGES;
}
}
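As a concrete example: a domain with 100 previously good documents yields a sample of min(20, 25) = 20 URLs; if more than 20/4 = 5 of them differ (by status, a missing body, or an LSH hamming distance above 5), the domain is re-fetched in full, otherwise the old data is retained.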
private static final HashFunction hasher = Hashing.murmur3_128(0);
private long hashDoc(CrawledDocument doc) {
var hash = new EasyLSH();
long val = 0;
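// Slide a 64-bit window over the document body one byte at a time, feeding each
// murmur3-hashed 8-byte shingle into the locality-sensitive hash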
for (var b : doc.documentBody.decode().getBytes()) {
val = val << 8 | (b & 0xFF);
hash.addUnordered(hasher.hashLong(val).asLong());
}
return hash.get();
}
private void downloadSitemaps(SimpleRobotRules robotsRules) {
List<String> sitemaps = robotsRules.getSitemaps();
@ -235,7 +344,7 @@ public class CrawlerRetreiver {
try {
logger.debug("Configuring link filter");
var url = crawlFrontier.peek();
var url = crawlFrontier.peek().withPathAndParam("/", null);
var maybeSample = fetchUrl(url).filter(sample -> sample.httpStatus == 200);
if (maybeSample.isEmpty())
@ -273,7 +382,7 @@ public class CrawlerRetreiver {
}
}
private boolean fetchDocument(EdgeUrl top, long crawlDelay) {
private Optional<CrawledDocument> fetchDocument(EdgeUrl top, long crawlDelay) {
logger.debug("Fetching {}", top);
long startTime = System.currentTimeMillis();
@ -282,9 +391,14 @@ public class CrawlerRetreiver {
if (doc.isPresent()) {
var d = doc.get();
crawledDomainWriter.accept(d);
oldCrawlData.dispose(top);
if (d.url != null) {
EdgeUrl.parse(d.url).ifPresent(crawlFrontier::addVisited);
// We may have redirected to a different path
EdgeUrl.parse(d.url).ifPresent(url -> {
crawlFrontier.addVisited(url);
oldCrawlData.dispose(url);
});
}
if ("ERROR".equals(d.crawlerStatus) && d.httpStatus != 404) {
@ -296,7 +410,7 @@ public class CrawlerRetreiver {
long crawledTime = System.currentTimeMillis() - startTime;
delay(crawlDelay, crawledTime);
return doc.isPresent();
return doc;
}
private boolean isAllowedProtocol(String proto) {
@ -333,7 +447,20 @@ public class CrawlerRetreiver {
private CrawledDocument fetchContent(EdgeUrl top) {
for (int i = 0; i < 2; i++) {
try {
return fetcher.fetchContent(top);
var doc = fetcher.fetchContent(top, oldCrawlData.getEtag(top), oldCrawlData.getLastModified(top));
doc.recrawlState = "NEW";
if (doc.httpStatus == 304) {
var referenceData = oldCrawlData.getDoc(top);
if (referenceData != null) {
referenceData.recrawlState = "304/UNCHANGED";
return referenceData;
}
}
return doc;
}
catch (RateLimitException ex) {
slowDown = true;
@ -443,4 +570,12 @@ public class CrawlerRetreiver {
.build();
}
enum CrawlDataComparison {
NO_OLD_DATA,
SMALL_SAMPLE,
CHANGES_FOUND,
NO_CHANGES
};
}

View File

@ -17,7 +17,7 @@ public class DomainCrawlFrontier {
private Predicate<EdgeUrl> linkFilter = url -> true;
final int depth;
private int depth;
public DomainCrawlFrontier(EdgeDomain thisDomain, Collection<String> urls, int depth) {
this.thisDomain = thisDomain;
@ -32,6 +32,9 @@ public class DomainCrawlFrontier {
}
}
public void increaseDepth(double depthIncreaseFactor) {
depth = (int)(depth * depthIncreaseFactor);
}
public void setLinkFilter(Predicate<EdgeUrl> linkFilter) {
this.linkFilter = linkFilter;
}
@ -80,6 +83,9 @@ public class DomainCrawlFrontier {
if (queue.size() + visited.size() >= depth + 100)
return;
if (visited.contains(url.toString()))
return;
if (known.add(url.toString())) {
queue.addLast(url);
}

View File

@ -18,7 +18,7 @@ public interface HttpFetcher {
FetchResult probeDomain(EdgeUrl url);
CrawledDocument fetchContent(EdgeUrl url) throws RateLimitException;
CrawledDocument fetchContent(EdgeUrl url, String etag, String lastMod) throws RateLimitException;
SimpleRobotRules fetchRobotRules(EdgeDomain domain);

View File

@ -125,29 +125,20 @@ public class HttpFetcherImpl implements HttpFetcher {
}
}
private Request createHeadRequest(EdgeUrl url) {
return new Request.Builder().head().addHeader("User-agent", userAgent)
.url(url.toString())
.addHeader("Accept-Encoding", "gzip")
.build();
}
private Request createGetRequest(EdgeUrl url) {
return new Request.Builder().get().addHeader("User-agent", userAgent)
.url(url.toString())
.addHeader("Accept-Encoding", "gzip")
.build();
}
@Override
@SneakyThrows
public CrawledDocument fetchContent(EdgeUrl url) throws RateLimitException {
public CrawledDocument fetchContent(EdgeUrl url, String etag, String lastMod) throws RateLimitException {
if (contentTypeLogic.isUrlLikeBinary(url)) {
logger.debug("Probing suspected binary {}", url);
var head = createHeadRequest(url);
var headBuilder = new Request.Builder().head()
.addHeader("User-agent", userAgent)
.url(url.toString())
.addHeader("Accept-Encoding", "gzip");
var head = headBuilder.build();
var call = client.newCall(head);
try (var rsp = call.execute()) {
@ -165,7 +156,15 @@ public class HttpFetcherImpl implements HttpFetcher {
}
}
var get = createGetRequest(url);
var getBuilder = new Request.Builder().get();
getBuilder.addHeader("User-agent", userAgent)
.url(url.toString())
.addHeader("Accept-Encoding", "gzip");
if (etag != null) getBuilder.addHeader("If-None-Match", etag);
if (lastMod != null) getBuilder.addHeader("If-Modified-Since", lastMod);
var get = getBuilder.build();
var call = client.newCall(get);
try (var rsp = call.execute()) {
@ -315,7 +314,7 @@ public class HttpFetcherImpl implements HttpFetcher {
private Optional<SimpleRobotRules> fetchRobotsForProto(String proto, EdgeDomain domain) {
try {
var url = new EdgeUrl(proto, domain, null, "/robots.txt", null);
return Optional.of(parseRobotsTxt(fetchContent(url)));
return Optional.of(parseRobotsTxt(fetchContent(url, null, null)));
}
catch (Exception ex) {
return Optional.empty();

View File

@ -29,14 +29,14 @@ class HttpFetcherTest {
@Test
void fetchUTF8() throws URISyntaxException, RateLimitException {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"));
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), null, null);
System.out.println(str.contentType);
}
@Test
void fetchText() throws URISyntaxException, RateLimitException {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"));
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), null, null);
System.out.println(str);
}
}

View File

@ -33,7 +33,6 @@ public class CrawlerMockFetcherTest {
Map<EdgeUrl, CrawledDocument> mockData = new HashMap<>();
HttpFetcher fetcherMock = new MockFetcher();
SitemapRetriever sitemapRetriever = new SitemapRetriever();
@AfterEach
public void tearDown() {
@ -74,7 +73,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>()), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>(), null), out::add)
.withNoDelay()
.fetch();
@ -87,7 +86,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>()), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>(), null), out::add)
.withNoDelay()
.fetch();
@ -102,7 +101,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html");
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>()), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>(), null), out::add)
.withNoDelay()
.fetch();
@ -127,7 +126,7 @@ public class CrawlerMockFetcherTest {
}
@Override
public CrawledDocument fetchContent(EdgeUrl url) {
public CrawledDocument fetchContent(EdgeUrl url, String etag, String lastModified) {
logger.info("Fetching {}", url);
if (mockData.containsKey(url)) {
return mockData.get(url);

View File

@ -6,12 +6,15 @@ import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.crawling.model.SerializableCrawlData;
import org.junit.jupiter.api.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -95,4 +98,36 @@ class CrawlerRetreiverTest {
);
}
@Test
public void testRecrawl() {
var specs = CrawlingSpecification
.builder()
.id("whatever")
.crawlDepth(12)
.domain("www.marginalia.nu")
.urls(List.of("https://www.marginalia.nu/some-dead-link"))
.build();
Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
new CrawlerRetreiver(httpFetcher, specs, d -> {
data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d);
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
}
}).fetch();
CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0);
domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList());
var newSpec = specs.withOldData(domain);
new CrawlerRetreiver(httpFetcher, newSpec, d -> {
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
}
}).fetch();
}
}

View File

@ -82,6 +82,8 @@ public class ControlService extends Service {
Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses);
Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses);
Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses);
Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses);
Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses);
Spark.post("/public/storage/:fid/load", controlActorService::loadProcessedData, redirectToProcesses);

View File

@ -4,6 +4,8 @@ import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.control.actor.task.CrawlActor;
import nu.marginalia.control.actor.task.RecrawlActor;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.actor.monitor.*;
import nu.marginalia.control.actor.monitor.ConverterMonitorActor;
@ -22,6 +24,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/** This class is responsible for starting and stopping the various actors in the controller service */
@Singleton
public class ControlActors {
private final ServiceEventLog eventLog;
@ -35,7 +38,10 @@ public class ControlActors {
GsonFactory gsonFactory,
BaseServiceParams baseServiceParams,
ReconvertAndLoadActor reconvertAndLoadActor,
CrawlActor crawlActor,
RecrawlActor recrawlActor,
ConverterMonitorActor converterMonitorFSM,
CrawlerMonitorActor crawlerMonitorActor,
LoaderMonitorActor loaderMonitor,
MessageQueueMonitorActor messageQueueMonitor,
ProcessLivenessMonitorActor processMonitorFSM,
@ -45,9 +51,12 @@ public class ControlActors {
this.eventLog = baseServiceParams.eventLog;
this.gson = gsonFactory.get();
register(Actor.CRAWL, crawlActor);
register(Actor.RECRAWL, recrawlActor);
register(Actor.RECONVERT_LOAD, reconvertAndLoadActor);
register(Actor.CONVERTER_MONITOR, converterMonitorFSM);
register(Actor.LOADER_MONITOR, loaderMonitor);
register(Actor.CRAWLER_MONITOR, crawlerMonitorActor);
register(Actor.MESSAGE_QUEUE_MONITOR, messageQueueMonitor);
register(Actor.PROCESS_LIVENESS_MONITOR, processMonitorFSM);
register(Actor.FILE_STORAGE_MONITOR, fileStorageMonitorActor);
@ -100,9 +109,6 @@ public class ControlActors {
Map.Entry::getKey, e -> e.getValue().getState())
);
}
public MachineState getActorStates(Actor actor) {
return stateMachines.get(actor).getState();
}
public AbstractStateGraph getActorDefinition(Actor actor) {
return actorDefinitions.get(actor);

View File

@ -64,17 +64,28 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph {
description = """
Monitors the inbox of the process for messages.
If a message is found, transition to RUN.
The state takes an optional Integer parameter errorAttempts
that is passed to run. errorAttempts is set to zero after
a few seconds of silence.
"""
)
public void monitor() throws SQLException, InterruptedException {
public void monitor(Integer errorAttempts) throws SQLException, InterruptedException {
if (errorAttempts == null) {
errorAttempts = 0;
}
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
TimeUnit.SECONDS.sleep(5);
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
transition(MONITOR, 0);
}
// else continue
} else {
transition(RUN);
transition(RUN, errorAttempts);
}
}
}
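For example: the actor idles in MONITOR(0); a message arrives and it transitions to RUN(0); if the process dies it retries as RUN(1), RUN(2), and so on up to MAX_ATTEMPTS before landing in ERROR; on success it returns to MONITOR carrying the attempt count, which is reset to zero once the inbox has been quiet for a polling interval.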
@ -87,7 +98,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph {
If the process fails, retransition to RUN up to MAX_ATTEMPTS times.
After MAX_ATTEMPTS at restarting the process, transition to ERROR.
If the process is cancelled, transition to ABORTED.
If the process is successful, transition to MONITOR.
If the process is successful, transition to MONITOR(errorAttempts).
"""
)
public void run(Integer attempts) throws Exception {
@ -108,7 +119,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph {
transition(ABORTED);
}
transition(MONITOR);
transition(MONITOR, attempts);
}
@TerminalState(name = ABORTED, description = "The process was manually aborted")

View File

@ -0,0 +1,25 @@
package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqsm.StateFactory;
@Singleton
public class CrawlerMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public CrawlerMonitorActor(StateFactory stateFactory,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory,
persistence,
processService,
ProcessInboxNames.CRAWLER_INBOX,
ProcessService.ProcessId.CRAWLER);
}
}

View File

@ -0,0 +1,171 @@
package nu.marginalia.control.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.control.svc.ProcessOutboxFactory;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Singleton
public class CrawlActor extends AbstractStateGraph {
// STATES
public static final String INITIAL = "INITIAL";
public static final String CRAWL = "CRAWL";
public static final String CRAWL_WAIT = "CRAWL-WAIT";
public static final String END = "END";
private final ProcessService processService;
private final MqOutbox mqCrawlerOutbox;
private final FileStorageService storageService;
private final Gson gson;
private final Logger logger = LoggerFactory.getLogger(getClass());
@AllArgsConstructor @With @NoArgsConstructor
public static class Message {
public FileStorageId crawlSpecId = null;
public FileStorageId crawlStorageId = null;
public long crawlerMsgId = 0L;
};
@Inject
public CrawlActor(StateFactory stateFactory,
ProcessService processService,
ProcessOutboxFactory processOutboxFactory,
FileStorageService storageService,
Gson gson
)
{
super(stateFactory);
this.processService = processService;
this.mqCrawlerOutbox = processOutboxFactory.createCrawlerOutbox();
this.storageService = storageService;
this.gson = gson;
}
@GraphState(name = INITIAL,
next = CRAWL,
description = """
Validate the input and transition to CRAWL
""")
public Message init(FileStorageId crawlStorageId) throws Exception {
if (null == crawlStorageId) {
error("This Actor requires a FileStorageId to be passed in as a parameter to INITIAL");
}
var storage = storageService.getStorage(crawlStorageId);
if (storage == null) error("Bad storage id");
if (storage.type() != FileStorageType.CRAWL_SPEC) error("Bad storage type " + storage.type());
return new Message().withCrawlSpecId(crawlStorageId);
}
@GraphState(name = CRAWL,
next = CRAWL_WAIT,
resume = ResumeBehavior.ERROR,
description = """
Allocate a storage area for the crawled data,
then send a crawl request to the crawler and transition to CRAWL_WAIT.
"""
)
public Message crawl(Message message) throws Exception {
// Allocate a storage area for the crawled data
var toCrawl = storageService.getStorage(message.crawlSpecId);
var base = storageService.getStorageBase(FileStorageBaseType.SLOW);
var dataArea = storageService.allocateTemporaryStorage(
base,
FileStorageType.CRAWL_DATA,
"crawl-data",
toCrawl.description());
storageService.relateFileStorages(toCrawl.id(), dataArea.id());
// Pre-send crawl request
var request = new CrawlRequest(message.crawlSpecId, dataArea.id());
long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
return message
.withCrawlStorageId(dataArea.id())
.withCrawlerMsgId(id);
}
@GraphState(
name = CRAWL_WAIT,
next = END,
resume = ResumeBehavior.RETRY,
description = """
Wait for the crawler to finish retrieving the data.
"""
)
public Message crawlerWait(Message message) throws Exception {
var rsp = waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, message.crawlerMsgId);
if (rsp.state() != MqMessageState.OK)
error("Crawler failed");
return message;
}
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception {
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
error("Process " + processId + " did not launch");
}
for (;;) {
try {
return outbox.waitResponse(id, 1, TimeUnit.SECONDS);
}
catch (TimeoutException ex) {
// Maybe the process died, wait a moment for it to restart
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
error("Process " + processId + " died and did not re-launch");
}
}
}
}
public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
// Wait for process to start
long deadline = System.currentTimeMillis() + unit.toMillis(duration);
while (System.currentTimeMillis() < deadline) {
if (processService.isRunning(processId))
return true;
TimeUnit.SECONDS.sleep(1);
}
return false;
}
}

View File

@ -118,6 +118,8 @@ public class ReconvertAndLoadActor extends AbstractStateGraph {
var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Data; " + toProcess.description());
storageService.relateFileStorages(toProcess.id(), processedArea.id());
// Pre-send convert request
var request = new ConvertRequest(message.crawlStorageId, processedArea.id());
long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));

View File

@ -0,0 +1,185 @@
package nu.marginalia.control.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.control.svc.ProcessOutboxFactory;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.sql.SQLException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Singleton
public class RecrawlActor extends AbstractStateGraph {
// STATES
public static final String INITIAL = "INITIAL";
public static final String CRAWL = "CRAWL";
public static final String CRAWL_WAIT = "CRAWL-WAIT";
public static final String END = "END";
private final ProcessService processService;
private final MqOutbox mqCrawlerOutbox;
private final FileStorageService storageService;
private final Gson gson;
private final Logger logger = LoggerFactory.getLogger(getClass());
@AllArgsConstructor @With @NoArgsConstructor
public static class RecrawlMessage {
public FileStorageId crawlSpecId = null;
public FileStorageId crawlStorageId = null;
public long crawlerMsgId = 0L;
};
public static RecrawlMessage recrawlFromCrawlData(FileStorageId crawlData) {
return new RecrawlMessage(null, crawlData, 0L);
}
public static RecrawlMessage recrawlFromCrawlDataAndCrawlSpec(FileStorageId crawlData, FileStorageId crawlSpec) {
return new RecrawlMessage(crawlSpec, crawlData, 0L);
}
@Inject
public RecrawlActor(StateFactory stateFactory,
ProcessService processService,
ProcessOutboxFactory processOutboxFactory,
FileStorageService storageService,
Gson gson
)
{
super(stateFactory);
this.processService = processService;
this.mqCrawlerOutbox = processOutboxFactory.createCrawlerOutbox();
this.storageService = storageService;
this.gson = gson;
}
@GraphState(name = INITIAL,
next = CRAWL,
description = """
Validate the input and transition to CRAWL
""")
public RecrawlMessage init(RecrawlMessage recrawlMessage) throws Exception {
if (null == recrawlMessage) {
error("This Actor requires a message as an argument");
}
var crawlStorage = storageService.getStorage(recrawlMessage.crawlStorageId);
FileStorage specStorage;
if (recrawlMessage.crawlSpecId != null) {
specStorage = storageService.getStorage(recrawlMessage.crawlSpecId);
}
else {
specStorage = getSpec(crawlStorage).orElse(null);
}
if (specStorage == null) error("Bad storage id");
if (specStorage.type() != FileStorageType.CRAWL_SPEC) error("Bad storage type " + specStorage.type());
if (crawlStorage == null) error("Bad storage id");
if (crawlStorage.type() != FileStorageType.CRAWL_DATA) error("Bad storage type " + crawlStorage.type());
Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log"));
return recrawlMessage
.withCrawlSpecId(specStorage.id());
}
private Optional<FileStorage> getSpec(FileStorage crawlStorage) throws SQLException {
return storageService.getSourceFromStorage(crawlStorage)
.stream()
.filter(storage -> storage.type().equals(FileStorageType.CRAWL_SPEC))
.findFirst();
}
@GraphState(name = CRAWL,
next = CRAWL_WAIT,
resume = ResumeBehavior.ERROR,
description = """
Send a crawl request to the crawler and transition to CRAWL_WAIT.
"""
)
public RecrawlMessage crawl(RecrawlMessage recrawlMessage) throws Exception {
// Look up the crawl spec storage
var toCrawl = storageService.getStorage(recrawlMessage.crawlSpecId);
// Pre-send crawl request
var request = new CrawlRequest(recrawlMessage.crawlSpecId, recrawlMessage.crawlStorageId);
long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
return recrawlMessage.withCrawlerMsgId(id);
}
@GraphState(
name = CRAWL_WAIT,
next = END,
resume = ResumeBehavior.RETRY,
description = """
Wait for the crawler to finish retrieving the data.
"""
)
public RecrawlMessage crawlerWait(RecrawlMessage recrawlMessage) throws Exception {
var rsp = waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, recrawlMessage.crawlerMsgId);
if (rsp.state() != MqMessageState.OK)
error("Crawler failed");
return recrawlMessage;
}
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception {
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
error("Process " + processId + " did not launch");
}
for (;;) {
try {
return outbox.waitResponse(id, 1, TimeUnit.SECONDS);
}
catch (TimeoutException ex) {
// Maybe the process died, wait a moment for it to restart
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
error("Process " + processId + " died and did not re-launch");
}
}
}
}
public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException {
// Wait for process to start
long deadline = System.currentTimeMillis() + unit.toMillis(duration);
while (System.currentTimeMillis() < deadline) {
if (processService.isRunning(processId))
return true;
TimeUnit.SECONDS.sleep(1);
}
return false;
}
}

View File

@ -1,9 +1,12 @@
package nu.marginalia.control.model;
public enum Actor {
CRAWL,
RECRAWL,
RECONVERT_LOAD,
CONVERTER_MONITOR,
LOADER_MONITOR,
CRAWLER_MONITOR,
MESSAGE_QUEUE_MONITOR,
PROCESS_LIVENESS_MONITOR,
FILE_STORAGE_MONITOR

View File

@ -4,6 +4,13 @@ import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageType;
public record FileStorageWithActions(FileStorage storage) {
public boolean isCrawlable() {
return storage.type() == FileStorageType.CRAWL_SPEC;
}
public boolean isRecrawlable() {
return storage.type() == FileStorageType.CRAWL_DATA;
}
public boolean isLoadable() {
return storage.type() == FileStorageType.PROCESSED_DATA;
}

View File

@ -4,6 +4,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.actor.ControlActors;
import nu.marginalia.control.actor.task.ReconvertAndLoadActor;
import nu.marginalia.control.actor.task.RecrawlActor;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.model.ActorRunState;
import nu.marginalia.control.model.ActorStateGraph;
@ -43,16 +44,33 @@ public class ControlActorService {
return "";
}
public Object triggerCrawling(Request request, Response response) throws Exception {
controlActors.start(
Actor.CRAWL,
FileStorageId.parse(request.params("fid"))
);
return "";
}
public Object triggerRecrawling(Request request, Response response) throws Exception {
controlActors.start(
Actor.RECRAWL,
RecrawlActor.recrawlFromCrawlData(
FileStorageId.parse(request.params("fid"))
)
);
return "";
}
public Object triggerProcessing(Request request, Response response) throws Exception {
controlActors.start(
Actor.RECONVERT_LOAD,
FileStorageId.of(Integer.parseInt(request.params("fid")))
FileStorageId.parse(request.params("fid"))
);
return "";
}
public Object loadProcessedData(Request request, Response response) throws Exception {
var fid = FileStorageId.of(Integer.parseInt(request.params("fid")));
var fid = FileStorageId.parse(request.params("fid"));
// Start the FSM from the intermediate state that triggers the load
controlActors.startFrom(

View File

@ -24,4 +24,8 @@ public class ProcessOutboxFactory {
public MqOutbox createLoaderOutbox() {
return new MqOutbox(persistence, ProcessInboxNames.LOADER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid());
}
public MqOutbox createCrawlerOutbox() {
return new MqOutbox(persistence, ProcessInboxNames.CRAWLER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid());
}
}

View File

@ -34,6 +34,11 @@
{{#each storage}}
<tr>
<td>
{{#if isCrawlable}}
<form method="post" action="/storage/{{storage.id}}/crawl">
<button type="submit">Crawl</button>
</form>
{{/if}}
{{#if isLoadable}}
<form method="post" action="/storage/{{storage.id}}/load">
<button type="submit">Load</button>
@ -44,6 +49,11 @@
<button type="submit">Process</button>
</form>
{{/if}}
{{#if isRecrawlable}}
<form method="post" action="/storage/{{storage.id}}/recrawl">
<button type="submit">Recrawl</button>
</form>
{{/if}}
{{#if isDeletable}}
<form method="post" action="/storage/{{storage.id}}/delete" onsubmit="return confirm('Confirm deletion of {{storage.path}}')">
<button type="submit">Delete</button>

View File

@ -31,9 +31,9 @@ public class CrawlJobSpecWriterTest {
@Test
public void testReadWrite() throws IOException {
try (CrawlJobSpecWriter writer = new CrawlJobSpecWriter(tempFile)) {
writer.accept(new CrawlingSpecification("first",1, "test1", List.of("a", "b", "c")));
writer.accept(new CrawlingSpecification("second",1, "test2", List.of("a", "b", "c", "d")));
writer.accept(new CrawlingSpecification("third",1, "test3", List.of("a", "b")));
writer.accept(new CrawlingSpecification("first",1, "test1", List.of("a", "b", "c"), null));
writer.accept(new CrawlingSpecification("second",1, "test2", List.of("a", "b", "c", "d"), null));
writer.accept(new CrawlingSpecification("third",1, "test3", List.of("a", "b"), null));
}
List<CrawlingSpecification> outputs = new ArrayList<>();

run/env/service.env (vendored, 3 lines changed)
View File

@ -1,3 +1,4 @@
WMSA_HOME=run/
CONTROL_SERVICE_OPTS="-DdistPath=/dist"
CONVERTER_OPTS="-ea -Xmx16G -XX:-CompactStrings -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15"
CONVERTER_OPTS="-ea -Xmx16G -XX:-CompactStrings -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15"
CRAWLER_OPTS="-Xmx16G -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15"