diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java
index 5aaecc5d..16cdc6f3 100644
--- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java
+++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java
@@ -6,6 +6,6 @@ import nu.marginalia.db.storage.model.FileStorageId;
 /** A request to start a crawl */
 @AllArgsConstructor
 public class CrawlRequest {
-    FileStorageId specStorage;
-    FileStorageId crawlStorage;
+    public FileStorageId specStorage;
+    public FileStorageId crawlStorage;
 }
diff --git a/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java b/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java
index 7ed94a46..334643b1 100644
--- a/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java
+++ b/code/common/db/src/main/java/nu/marginalia/db/storage/FileStorageService.java
@@ -11,6 +11,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
 /** Manages file storage for processes and services
@@ -63,6 +65,49 @@ public class FileStorageService {
         return null;
     }
 
+    public void relateFileStorages(FileStorageId source, FileStorageId target) {
+        try (var conn = dataSource.getConnection();
+             var stmt = conn.prepareStatement("""
+                     INSERT INTO FILE_STORAGE_RELATION(SOURCE_ID, TARGET_ID) VALUES (?, ?)
+                     """)) {
+            stmt.setLong(1, source.id());
+            stmt.setLong(2, target.id());
+            stmt.executeUpdate();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<FileStorage> getSourceFromStorage(FileStorage storage) throws SQLException {
+        try (var conn = dataSource.getConnection();
+             var stmt = conn.prepareStatement("""
+                     SELECT SOURCE_ID FROM FILE_STORAGE_RELATION WHERE TARGET_ID = ?
+                     """)) {
+            stmt.setLong(1, storage.id().id());
+            var rs = stmt.executeQuery();
+            List<FileStorage> ret = new ArrayList<>();
+            while (rs.next()) {
+                ret.add(getStorage(new FileStorageId(rs.getLong(1))));
+            }
+            return ret;
+        }
+    }
+
+    public List<FileStorage> getTargetFromStorage(FileStorage storage) throws SQLException {
+        try (var conn = dataSource.getConnection();
+             var stmt = conn.prepareStatement("""
+                     SELECT TARGET_ID FROM FILE_STORAGE_RELATION WHERE SOURCE_ID = ?
+ """)) { + stmt.setLong(1, storage.id().id()); + var rs = stmt.executeQuery(); + List ret = new ArrayList<>(); + while (rs.next()) { + ret.add(getStorage(new FileStorageId(rs.getLong(1)))); + } + return ret; + } + } + /** @return the storage base with the given type, or null if it does not exist */ public FileStorageBase getStorageBase(FileStorageBaseType type) throws SQLException { try (var conn = dataSource.getConnection(); @@ -153,13 +198,7 @@ public class FileStorageService { var rs = query.executeQuery(); if (rs.next()) { - return new FileStorage( - new FileStorageId(rs.getLong("ID")), - base, - type, - tempDir.toString(), - description - ); + return getStorage(new FileStorageId(rs.getLong("ID"))); } } diff --git a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java index 3d6331e3..a89ad9f8 100644 --- a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java +++ b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java @@ -1,6 +1,9 @@ package nu.marginalia.db.storage.model; public record FileStorageId(long id) { + public static FileStorageId parse(String str) { + return new FileStorageId(Long.parseLong(str)); + } public static FileStorageId of(int storageId) { return new FileStorageId(storageId); } diff --git a/code/common/db/src/main/resources/sql/current/13-file-storage.sql b/code/common/db/src/main/resources/sql/current/13-file-storage.sql index 763f39a0..b2063fc8 100644 --- a/code/common/db/src/main/resources/sql/current/13-file-storage.sql +++ b/code/common/db/src/main/resources/sql/current/13-file-storage.sql @@ -23,6 +23,14 @@ CREATE TABLE IF NOT EXISTS FILE_STORAGE ( CHARACTER SET utf8mb4 COLLATE utf8mb4_bin; +CREATE TABLE IF NOT EXISTS FILE_STORAGE_RELATION ( + SOURCE_ID BIGINT NOT NULL, + TARGET_ID BIGINT NOT NULL, + CONSTRAINT CONS UNIQUE (SOURCE_ID, TARGET_ID), + FOREIGN KEY (SOURCE_ID) REFERENCES FILE_STORAGE(ID) ON DELETE CASCADE, + FOREIGN KEY (TARGET_ID) REFERENCES FILE_STORAGE(ID) ON DELETE CASCADE +); + CREATE VIEW FILE_STORAGE_VIEW AS SELECT CONCAT(BASE.PATH, '/', STORAGE.PATH) AS PATH, diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java index 9c293af7..67b95484 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java @@ -64,7 +64,6 @@ public class CrawledDomainReader { return Optional.of(read(path)); } catch (Exception ex) { - logger.warn("Failed to read domain " + path, ex); return Optional.empty(); } } diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java index 51ffab18..83582212 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java @@ -14,12 +14,15 @@ import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; public class 
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java
index 51ffab18..83582212 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java
@@ -14,12 +14,15 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
 
 public class CrawledDomainWriter implements AutoCloseable {
     private final Path outputDir;
     private final Gson gson = GsonFactory.get();
     private static final Logger logger = LoggerFactory.getLogger(CrawledDomainWriter.class);
     private final Writer writer;
+    private final Path tmpFile;
     private final Path outputFile;
 
     public CrawledDomainWriter(Path outputDir, String name, String id) throws IOException {
@@ -29,8 +32,10 @@ public class CrawledDomainWriter implements AutoCloseable {
             throw new IllegalArgumentException("Output dir " + outputDir + " does not exist");
         }
 
+        tmpFile = getOutputFile(id, name + "_tmp");
         outputFile = getOutputFile(id, name);
-        writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(outputFile))));
+        writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(tmpFile,
+                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING))));
     }
 
     public Path getOutputFile() {
@@ -46,32 +51,12 @@ public class CrawledDomainWriter implements AutoCloseable {
     }
 
     private Path getOutputFile(String id, String name) throws IOException {
-        String first = id.substring(0, 2);
-        String second = id.substring(2, 4);
-
-        Path destDir = outputDir.resolve(first).resolve(second);
-        if (!Files.exists(destDir)) {
-            Files.createDirectories(destDir);
-        }
-        return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
-    }
-
-    private String filesystemSafeName(String name) {
-        StringBuilder nameSaneBuilder = new StringBuilder();
-
-        name.chars()
-                .map(Character::toLowerCase)
-                .map(c -> (c & ~0x7F) == 0 ? c : 'X')
-                .map(c -> (Character.isDigit(c) || Character.isAlphabetic(c) || c == '.') ? c : 'X')
-                .limit(128)
-                .forEach(c -> nameSaneBuilder.append((char) c));
-
-        return nameSaneBuilder.toString();
-
+        return CrawlerOutputFile.createOutputPath(outputDir, id, name);
     }
 
     @Override
     public void close() throws IOException {
         writer.close();
+        Files.move(tmpFile, outputFile, StandardCopyOption.REPLACE_EXISTING);
     }
 }
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
new file mode 100644
index 00000000..6cf5857f
--- /dev/null
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
@@ -0,0 +1,53 @@
+package nu.marginalia.crawling.io;
+
+import nu.marginalia.crawling.model.spec.CrawlingSpecification;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class CrawlerOutputFile {
+
+    public static Path getOutputFile(Path base, CrawlingSpecification spec) {
+        return getOutputFile(base, spec.id, spec.domain);
+    }
+
+
+    /** Return the Path to a file for the given id and name */
+    public static Path getOutputFile(Path base, String id, String name) {
+        String first = id.substring(0, 2);
+        String second = id.substring(2, 4);
+
+        Path destDir = base.resolve(first).resolve(second);
+        return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
+    }
+
+    /** Return the Path to a file for the given id and name, creating the prerequisite
+     *  directory structure as necessary.
+     */
+    public static Path createOutputPath(Path base, String id, String name) throws IOException {
+        String first = id.substring(0, 2);
+        String second = id.substring(2, 4);
+
+        Path destDir = base.resolve(first).resolve(second);
+        if (!Files.exists(destDir)) {
+            Files.createDirectories(destDir);
+        }
+        return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
+    }
+
+
+    private static String filesystemSafeName(String name) {
+        StringBuilder nameSaneBuilder = new StringBuilder();
+
+        name.chars()
+                .map(Character::toLowerCase)
+                .map(c -> (c & ~0x7F) == 0 ? c : 'X')
+                .map(c -> (Character.isDigit(c) || Character.isAlphabetic(c) || c == '.') ? c : 'X')
+                .limit(128)
+                .forEach(c -> nameSaneBuilder.append((char) c));
+
+        return nameSaneBuilder.toString();
+
+    }
+
+}
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDocument.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDocument.java
index 004408eb..0066ddf2 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDocument.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/CrawledDocument.java
@@ -27,6 +27,8 @@ public class CrawledDocument implements SerializableCrawlData {
     public String canonicalUrl;
     public String redirectUrl;
 
+    public String recrawlState;
+
     public static final String SERIAL_IDENTIFIER = "// DOCUMENT";
     @Override
     public String getSerialIdentifier() {
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlingSpecification.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlingSpecification.java
index 47ecf921..f6001166 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlingSpecification.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlingSpecification.java
@@ -3,10 +3,12 @@ package nu.marginalia.crawling.model.spec;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.NoArgsConstructor;
+import lombok.With;
+import nu.marginalia.crawling.model.CrawledDomain;
 
 import java.util.List;
 
-@AllArgsConstructor @NoArgsConstructor @Builder
+@AllArgsConstructor @NoArgsConstructor @Builder @With
 public class CrawlingSpecification {
     public String id;
 
@@ -16,6 +18,8 @@ public class CrawlingSpecification {
     public String domain;
     public List<String> urls;
 
+    public CrawledDomain oldData;
+
     @Override
     public String toString() {
         return String.format(getClass().getSimpleName() + "[" + id + "/" + domain + ": " + crawlDepth + "[ " + urls.size() + "]");
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
index 5488a6c2..55c022ba 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
@@ -138,7 +138,7 @@ public class ConverterMain {
 
             // Advance the progress bar to the current position if this is a resumption
             processedDomains.set(processLog.countFinishedJobs());
-            heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+            heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
 
             for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id))) {
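The ConverterMain change above fixes a resumption off-by-one: `incrementAndGet()` bumped the counter merely to read it, so a resumed run reported one more finished domain than actually existed. A tiny sketch of the difference (illustrative values):

```java
import java.util.concurrent.atomic.AtomicInteger;

class ProgressSketch {
    public static void main(String[] args) {
        AtomicInteger processedDomains = new AtomicInteger();
        int totalDomains = 10;

        // Resuming a run where 4 domains are already done:
        processedDomains.set(4);

        // incrementAndGet() here would report 0.5 before any new work has
        // happened; get() reports the true 0.4.
        System.out.println(processedDomains.get() / (double) totalDomains);
    }
}
```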
diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java
index 5b78ac9e..67aa5299 100644
--- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java
+++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java
@@ -113,7 +113,8 @@ public class ConvertingIntegrationTest {
                     BigString.encode(readClassPathFile(p.toString())),
                     Double.toString(Math.random()),
                     "https://memex.marginalia.nu/" + file,
-                    null
+                    null,
+                    ""
             );
             docs.add(doc);
         }
diff --git a/code/processes/crawling-process/build.gradle b/code/processes/crawling-process/build.gradle
index b62b3a68..48068620 100644
--- a/code/processes/crawling-process/build.gradle
+++ b/code/processes/crawling-process/build.gradle
@@ -27,9 +27,12 @@ dependencies {
     implementation project(':code:common:service')
     implementation project(':code:libraries:big-string')
     implementation project(':code:api:index-api')
+    implementation project(':code:api:process-mqapi')
     implementation project(':code:common:service-discovery')
     implementation project(':code:common:service-client')
+    implementation project(':code:common:message-queue')
     implementation project(':code:libraries:language-processing')
+    implementation project(':code:libraries:easy-lsh')
     implementation project(':code:process-models:crawling-model')
     implementation project(':code:process-models:converting-model')
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlLimiter.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlLimiter.java
new file mode 100644
index 00000000..29f02e4f
--- /dev/null
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlLimiter.java
@@ -0,0 +1,72 @@
+package nu.marginalia.crawl;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.Semaphore;
+
+public class CrawlLimiter {
+    public static final int maxPoolSize = Integer.getInteger("crawler.pool-size", 512);
+
+    // We'll round up to this size when we're crawling a new domain to prevent
+    // too many concurrent connections
+    public static final int minCrawlDataSizeKb = 128; // 128 Kb
+
+    // The largest size on disk where we'll permit a refresh crawl
+    // (these files easily grow into the gigabytes, we don't want that in RAM)
+    public static final int maxRefreshableCrawlDataSizeKBytes = 1024*128; // 128 Mb
+
+    // This limits how many concurrent crawl tasks we can have running at once
+    // based on their size on disk.  The on-disk size is compressed, and the
+    // in-ram size is partially compressed (i.e. only the document body); so
+    // maybe a fair estimate is something like 2-4x this figure for RAM usage
+    //
+    public static final int maxConcurrentCrawlTaskSizeKb = 512*1024; // 512 Mb
+
+    static {
+        // Sanity check; if this is false we'll get a deadlock on taskSemRAM
+        assert maxConcurrentCrawlTaskSizeKb >= maxRefreshableCrawlDataSizeKBytes
+                : "maxConcurrentCrawlTaskSizeKb must be at least as large as maxRefreshableCrawlDataSizeKBytes";
+    }
+
+    public record CrawlTaskLimits(Path refreshPath, boolean isRefreshable, int taskSize) {}
+
+    // We use two semaphores to keep track of the number of concurrent crawls;
+    // first a RAM semaphore to limit the amount of RAM used by refresh crawls,
+    // then a count semaphore to limit the number of concurrent threads (this keeps the connection count manageable)
+    private final Semaphore taskSemRAM = new Semaphore(maxConcurrentCrawlTaskSizeKb);
+    private final Semaphore taskSemCount = new Semaphore(maxPoolSize);
+
+
+    public CrawlTaskLimits getTaskLimits(Path fileName) {
+        long size;
+
+        try {
+            size = Math.max(minCrawlDataSizeKb, Files.size(fileName) / 1024);
+        } catch (IOException ex) {
+            // If we can't read the file, we'll assume it's small since we won't be able to read it later for the refresh either
+            return new CrawlTaskLimits(null, false, minCrawlDataSizeKb);
+        }
+
+        // We'll only permit refresh crawls if the file is small enough
+        boolean isRefreshable = size < maxRefreshableCrawlDataSizeKBytes;
+
+        // We'll truncate this down to maxRefreshableCrawlDataSizeKBytes to ensure
+        // it's possible to acquire the RAM semaphore
+        int effectiveSize = (int) Math.min(maxRefreshableCrawlDataSizeKBytes, size);
+
+        return new CrawlTaskLimits(fileName, isRefreshable, effectiveSize);
+    }
+
+
+    public void acquire(CrawlTaskLimits properties) throws InterruptedException {
+        // It's very important that we acquire the RAM semaphore first to avoid a deadlock
+        taskSemRAM.acquire(properties.taskSize);
+        taskSemCount.acquire(1);
+    }
+
+    public void release(CrawlTaskLimits properties) {
+        taskSemCount.release(1);
+        taskSemRAM.release(properties.taskSize);
+    }
+}
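CrawlLimiter above throttles on two axes at once: estimated memory footprint (a multi-permit semaphore whose permits are kilobytes) and plain task count. Taking the RAM semaphore before the count semaphore, and releasing in reverse order, presumably keeps pool slots from being held by tasks that are merely waiting for memory. A hedged sketch of the same idea, with made-up limits:

```java
import java.util.concurrent.Semaphore;

class TwoAxisLimiterSketch {
    // Permits on 'ram' are kilobytes of estimated working-set size;
    // 'tasks' caps plain concurrency (and thus the connection count).
    private final Semaphore ram = new Semaphore(512 * 1024);
    private final Semaphore tasks = new Semaphore(512);

    void run(int sizeKb, Runnable job) throws InterruptedException {
        ram.acquire(sizeKb);      // grab the contended resource first
        tasks.acquire();
        try {
            job.run();
        } finally {
            tasks.release();      // release in reverse order of acquisition
            ram.release(sizeKb);
        }
    }
}
```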
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
index a0a3f8b7..3dd096cb 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
@@ -1,13 +1,23 @@
 package nu.marginalia.crawl;
 
-import nu.marginalia.ProcessConfiguration;
+import com.google.gson.Gson;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
 import nu.marginalia.UserAgent;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
+import nu.marginalia.crawling.io.CrawledDomainReader;
+import nu.marginalia.crawling.io.CrawlerOutputFile;
+import nu.marginalia.crawling.model.CrawledDomain;
+import nu.marginalia.db.storage.FileStorageService;
+import nu.marginalia.mq.MessageQueueFactory;
+import nu.marginalia.mq.MqMessage;
+import nu.marginalia.mq.inbox.MqInboxResponse;
+import nu.marginalia.mq.inbox.MqSingleShotInbox;
 import nu.marginalia.process.control.ProcessHeartbeat;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.service.module.DatabaseModule;
-import plan.CrawlPlanLoader;
 import plan.CrawlPlan;
 import nu.marginalia.crawling.io.CrawledDomainWriter;
 import nu.marginalia.crawling.model.spec.CrawlingSpecification;
@@ -19,49 +29,63 @@ import okhttp3.internal.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.file.Path;
+import java.sql.SQLException;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX;
+
 public class CrawlerMain implements AutoCloseable {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private final CrawlPlan plan;
-    private final Path crawlDataDir;
-
-    private final WorkLog workLog;
+    private Path crawlDataDir;
+    private WorkLog workLog;
+    private final ProcessHeartbeat heartbeat;
 
     private final ConnectionPool connectionPool = new ConnectionPool(5, 10, TimeUnit.SECONDS);
 
     private final Dispatcher dispatcher = new Dispatcher(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
             new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", true)));
 
     private final UserAgent userAgent;
+    private final MessageQueueFactory messageQueueFactory;
+    private final FileStorageService fileStorageService;
+    private final Gson gson;
     private final ThreadPoolExecutor pool;
-    final int poolSize = Integer.getInteger("crawler.pool-size", 512);
-    final int poolQueueSize = 32;
+    public final CrawlLimiter crawlLimiter = new CrawlLimiter();
 
     private final Set<String> processedIds = new HashSet<>();
 
-    AbortMonitor abortMonitor = AbortMonitor.getInstance();
-    Semaphore taskSem = new Semaphore(poolSize);
+    final AbortMonitor abortMonitor = AbortMonitor.getInstance();
 
-    private static ProcessHeartbeat heartbeat;
+    volatile int totalTasks;
+    final AtomicInteger tasksDone = new AtomicInteger(0);
 
-    public CrawlerMain(CrawlPlan plan) throws Exception {
-        this.plan = plan;
-        this.userAgent = WmsaHome.getUserAgent();
+    @Inject
+    public CrawlerMain(UserAgent userAgent,
+                       ProcessHeartbeat heartbeat,
+                       MessageQueueFactory messageQueueFactory,
+                       FileStorageService fileStorageService,
+                       Gson gson) {
+        this.heartbeat = heartbeat;
+        this.userAgent = userAgent;
+        this.messageQueueFactory = messageQueueFactory;
+        this.fileStorageService = fileStorageService;
+        this.gson = gson;
 
-        // Ensure that the user agent is set for Java's HTTP requests
-
-        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(poolQueueSize);
-        pool = new ThreadPoolExecutor(poolSize/128, poolSize, 5, TimeUnit.MINUTES, queue); // maybe need to set -Xss for JVM to deal with this?
-
-        workLog = plan.createCrawlWorkLog();
-        crawlDataDir = plan.crawl.getDir();
+        // maybe need to set -Xss for JVM to deal with this?
+        pool = new ThreadPoolExecutor(
+                CrawlLimiter.maxPoolSize /128,
+                CrawlLimiter.maxPoolSize,
+                5, TimeUnit.MINUTES,
+                new LinkedBlockingQueue<>(32)
+        );
     }
 
     public static void main(String... args) throws Exception {
@@ -77,46 +101,65 @@ public class CrawlerMain implements AutoCloseable {
         System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
         System.setProperty("sun.net.client.defaultReadTimeout", "30000");
 
-        if (args.length != 1) {
-            System.err.println("Arguments: crawl-plan.yaml");
-            System.exit(0);
-        }
-        var plan = new CrawlPlanLoader().load(Path.of(args[0]));
+        Injector injector = Guice.createInjector(
+                new CrawlerModule(),
+                new DatabaseModule()
+        );
+        var crawler = injector.getInstance(CrawlerMain.class);
 
-        heartbeat = new ProcessHeartbeat(new ProcessConfiguration("crawler", 0, UUID.randomUUID()),
-                new DatabaseModule().provideConnection());
+        var instructions = crawler.fetchInstructions();
+        try {
+            crawler.run(instructions.getPlan());
+            instructions.ok();
+        }
+        catch (Exception ex) {
+            System.err.println("Crawler failed");
+            ex.printStackTrace();
+            instructions.err();
+        }
 
-        try (var crawler = new CrawlerMain(plan)) {
-            heartbeat.start();
-            crawler.run();
-        }
-        finally {
-            heartbeat.shutDown();
-        }
+        TimeUnit.SECONDS.sleep(5);
 
         System.exit(0);
     }
 
-    public void run() throws InterruptedException {
-        // First a validation run to ensure the file is all good to parse
-        logger.info("Validating JSON");
-        int countTotal = 0;
-        int countProcessed = 0;
+    public void run(CrawlPlan plan) throws InterruptedException, IOException {
 
-        for (var unused : plan.crawlingSpecificationIterable()) {
-            countTotal++;
+        heartbeat.start();
+        try {
+            // First a validation run to ensure the file is all good to parse
+            logger.info("Validating JSON");
+
+
+            workLog = plan.createCrawlWorkLog();
+            crawlDataDir = plan.crawl.getDir();
+
+            int countTotal = 0;
+            for (var unused : plan.crawlingSpecificationIterable()) {
+                countTotal++;
+            }
+            totalTasks = countTotal;
+
+            logger.info("Let's go");
+
+            for (var spec : plan.crawlingSpecificationIterable()) {
+                startCrawlTask(plan, spec);
+            }
+
+            pool.shutdown();
+            do {
+                System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
" + pool.getActiveCount() + " remaining"); + } while (!pool.awaitTermination(60, TimeUnit.SECONDS)); } - - logger.info("Let's go"); - - for (var spec : plan.crawlingSpecificationIterable()) { - heartbeat.setProgress(countProcessed / (double) countTotal); - startCrawlTask(spec); + finally { + heartbeat.shutDown(); } } + CrawledDomainReader reader = new CrawledDomainReader(); - private void startCrawlTask(CrawlingSpecification crawlingSpecification) { + + private void startCrawlTask(CrawlPlan plan, CrawlingSpecification crawlingSpecification) { if (!processedIds.add(crawlingSpecification.id)) { @@ -132,28 +175,41 @@ public class CrawlerMain implements AutoCloseable { return; } + var limits = crawlLimiter.getTaskLimits(CrawlerOutputFile.getOutputFile(crawlDataDir, crawlingSpecification)); + try { - taskSem.acquire(); + crawlLimiter.acquire(limits); } catch (InterruptedException e) { throw new RuntimeException(e); } pool.execute(() -> { try { - fetchDomain(crawlingSpecification); + fetchDomain(crawlingSpecification, limits); + heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks); } finally { - taskSem.release(); + crawlLimiter.release(limits); } }); } - private void fetchDomain(CrawlingSpecification specification) { + + private void fetchDomain(CrawlingSpecification specification, CrawlLimiter.CrawlTaskLimits limits) { if (workLog.isJobFinished(specification.id)) return; HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool); + // Read the previous crawl's data for this domain, if it exists and has a reasonable size + Optional domain; + if (limits.isRefreshable()) { + domain = reader.readOptionally(limits.refreshPath()); + if (domain.isPresent()) { + specification = specification.withOldData(domain.get()); + } + } + try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification.domain, specification.id)) { var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept); @@ -167,6 +223,65 @@ public class CrawlerMain implements AutoCloseable { } } + private static class CrawlRequest { + private final CrawlPlan plan; + private final MqMessage message; + private final MqSingleShotInbox inbox; + + CrawlRequest(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) { + this.plan = plan; + this.message = message; + this.inbox = inbox; + } + + public CrawlPlan getPlan() { + return plan; + } + + public void ok() { + inbox.sendResponse(message, MqInboxResponse.ok()); + } + public void err() { + inbox.sendResponse(message, MqInboxResponse.err()); + } + + } + + private CrawlRequest fetchInstructions() throws Exception { + + var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, UUID.randomUUID()); + + var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName()); + var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); + + var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class); + + var specData = fileStorageService.getStorage(request.specStorage); + var crawlData = fileStorageService.getStorage(request.crawlStorage); + + var plan = new CrawlPlan(specData.asPath().resolve("crawler.spec").toString(), + new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"), + null); + + return new CrawlRequest(plan, msg, inbox); + } + + private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { + var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); + if 
+        if (opt.isPresent()) {
+            if (!opt.get().function().equals(expectedFunction)) {
+                throw new RuntimeException("Unexpected function: " + opt.get().function());
+            }
+            return opt;
+        }
+        else {
+            var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
+            stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
+            return stolenMessage;
+        }
+    }
+
+
     public void close() throws Exception {
         logger.info("Awaiting termination");
         pool.shutdown();
@@ -176,8 +291,6 @@ public class CrawlerMain implements AutoCloseable {
         workLog.close();
 
         dispatcher.executorService().shutdownNow();
-
-
     }
 }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerModule.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerModule.java
new file mode 100644
index 00000000..ebf6d33f
--- /dev/null
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerModule.java
@@ -0,0 +1,24 @@
+package nu.marginalia.crawl;
+
+import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
+import lombok.SneakyThrows;
+import nu.marginalia.ProcessConfiguration;
+import nu.marginalia.UserAgent;
+import nu.marginalia.WmsaHome;
+import nu.marginalia.model.gson.GsonFactory;
+
+import java.util.UUID;
+
+public class CrawlerModule extends AbstractModule {
+    @SneakyThrows
+    public void configure() {
+        bind(Gson.class).toInstance(createGson());
+        bind(UserAgent.class).toInstance(WmsaHome.getUserAgent());
+        bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("crawler", 0, UUID.randomUUID()));
+    }
+
+    private Gson createGson() {
+        return GsonFactory.get();
+    }
+}
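The CrawlDataReference class below indexes the previous crawl by URL and keeps the ETag and Last-Modified validators it finds in the stored headers; HttpFetcherImpl later replays them as If-None-Match / If-Modified-Since, so unchanged pages come back as a bodyless 304. A standalone sketch of that revalidation handshake using the JDK's HttpClient (hypothetical URL and header values):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class ConditionalGetSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Validators remembered from a previous crawl (made-up values)
        String etag = "\"abc123\"";
        String lastModified = "Wed, 21 Jun 2023 07:28:00 GMT";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://www.example.com/"))
                .header("If-None-Match", etag)
                .header("If-Modified-Since", lastModified)
                .build();

        HttpResponse<String> rsp = client.send(request, HttpResponse.BodyHandlers.ofString());

        // 304 means the stored document is still current and need not be re-fetched
        if (rsp.statusCode() == 304) {
            System.out.println("Not modified; reuse the old crawl data");
        }
    }
}
```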
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
new file mode 100644
index 00000000..cc827084
--- /dev/null
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
@@ -0,0 +1,123 @@
+package nu.marginalia.crawl.retreival;
+
+import nu.marginalia.crawling.model.CrawledDocument;
+import nu.marginalia.crawling.model.CrawledDomain;
+import nu.marginalia.model.EdgeUrl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/** A reference to a domain that has been crawled before. */
+public class CrawlDataReference {
+    private final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
+    final Map<EdgeUrl, CrawledDocument> documents;
+    final Map<EdgeUrl, String> etags;
+    final Map<EdgeUrl, String> lastModified;
+    final Set<EdgeUrl> previouslyDeadUrls = new HashSet<>();
+
+    CrawlDataReference(CrawledDomain referenceDomain) {
+
+        if (referenceDomain == null || referenceDomain.doc == null) {
+            documents = Collections.emptyMap();
+            etags = Collections.emptyMap();
+            lastModified = Collections.emptyMap();
+            return;
+        }
+
+        documents = new HashMap<>(referenceDomain.doc.size());
+        etags = new HashMap<>(referenceDomain.doc.size());
+        lastModified = new HashMap<>(referenceDomain.doc.size());
+
+        for (var doc : referenceDomain.doc) {
+            try {
+                addReference(doc);
+            } catch (URISyntaxException ex) {
+                logger.warn("Failed to add reference document {}", doc.url);
+            }
+        }
+    }
+
+    private void addReference(CrawledDocument doc) throws URISyntaxException {
+        var url = new EdgeUrl(doc.url);
+
+        if (doc.httpStatus == 404) {
+            previouslyDeadUrls.add(url);
+            return;
+        }
+
+        if (doc.httpStatus != 200) {
+            return;
+        }
+
+
+        documents.put(url, doc);
+
+        String headers = doc.headers;
+        if (headers != null) {
+            String[] headersLines = headers.split("\n");
+
+            String lastmod = null;
+            String etag = null;
+
+            for (String line : headersLines) {
+                if (line.toLowerCase().startsWith("etag:")) {
+                    etag = line.substring(5).trim();
+                }
+                if (line.toLowerCase().startsWith("last-modified:")) {
+                    lastmod = line.substring(14).trim();
+                }
+            }
+
+            if (lastmod != null) {
+                lastModified.put(url, lastmod);
+            }
+            if (etag != null) {
+                etags.put(url, etag);
+            }
+        }
+    }
+
+    public boolean isPreviouslyDead(EdgeUrl url) {
+        return previouslyDeadUrls.contains(url);
+    }
+    public int size() {
+        return documents.size();
+    }
+
+    public String getEtag(EdgeUrl url) {
+        return etags.get(url);
+    }
+
+    public String getLastModified(EdgeUrl url) {
+        return lastModified.get(url);
+    }
+
+    public Map<EdgeUrl, CrawledDocument> allDocuments() {
+        return documents;
+    }
+
+
+    public Map<EdgeUrl, CrawledDocument> sample(int sampleSize) {
+        return documents.entrySet().stream().limit(sampleSize).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    public void evict() {
+        documents.clear();
+        etags.clear();
+        lastModified.clear();
+    }
+
+    public CrawledDocument getDoc(EdgeUrl top) {
+        return documents.get(top);
+    }
+
+    // This bit of manual housekeeping is needed to keep the memory footprint low
+    public void dispose(EdgeUrl url) {
+        documents.remove(url);
+        etags.remove(url);
+        lastModified.remove(url);
+    }
+}
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
index 3af0110a..52927f38 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
@@ -10,6 +10,7 @@ import nu.marginalia.crawling.model.spec.CrawlingSpecification;
 import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.crawling.model.*;
 import nu.marginalia.ip_blocklist.UrlBlocklist;
+import nu.marginalia.lsh.EasyLSH;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import org.jsoup.Jsoup;
@@ -57,6 +58,7 @@ public class CrawlerRetreiver {
     private final SitemapRetriever sitemapRetriever;
     private final DomainCrawlFrontier crawlFrontier;
+    private final CrawlDataReference oldCrawlData;
 
     int errorCount = 0;
@@ -64,6 +66,7 @@ public class CrawlerRetreiver {
                             CrawlingSpecification specs,
                             Consumer<SerializableCrawlData> writer) {
         this.fetcher = fetcher;
+        this.oldCrawlData = new CrawlDataReference(specs.oldData);
 
         id = specs.id;
         domain = specs.domain;
@@ -73,9 +76,9 @@ public class CrawlerRetreiver {
 
         this.crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), specs.urls, specs.crawlDepth);
         sitemapRetriever = fetcher.createSitemapRetriever();
 
+        // We must always crawl the index page first, this is assumed when fingerprinting the server
         var fst = crawlFrontier.peek();
         if (fst != null) {
-            // Ensure the index page is always crawled
             var root = fst.withPathAndParam("/", null);
 
@@ -141,6 +144,29 @@ public class CrawlerRetreiver {
 
         var robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
 
+        CrawlDataComparison comparison = compareWithOldData(robotsRules);
+        logger.info("Comparison result for {} : {}", domain, comparison);
+
+        // If we have reference data, we will always grow the crawl depth a bit
+        if (oldCrawlData.size() > 0) {
+            crawlFrontier.increaseDepth(1.5);
+        }
+
+        // When the reference data doesn't appear to have changed, we'll forego
+        // re-fetching it and just use the old data
+        if (comparison == CrawlDataComparison.NO_CHANGES) {
+            oldCrawlData.allDocuments().forEach((url, doc) -> {
+                if (crawlFrontier.addVisited(url)) {
+                    doc.recrawlState = "RETAINED";
+                    crawledDomainWriter.accept(doc);
+                }
+            });
+
+            // We don't need to hold onto this in RAM anymore
+            oldCrawlData.evict();
+        }
+
+
         downloadSitemaps(robotsRules);
         sniffRootDocument();
 
@@ -161,18 +187,31 @@ public class CrawlerRetreiver {
                 continue;
             }
 
+            // Don't re-fetch links that were previously found dead as it's very unlikely that a
+            // 404:ing link will suddenly start working at a later point
+            if (oldCrawlData.isPreviouslyDead(top))
+                continue;
+
+            // Check the link filter if the endpoint should be fetched based on site-type
             if (!crawlFrontier.filterLink(top))
                 continue;
+
+            // Check vs blocklist
             if (urlBlocklist.isUrlBlocked(top))
                 continue;
+
             if (!isAllowedProtocol(top.proto))
                 continue;
+
+            // Check if the URL is too long to insert into the DB
             if (top.toString().length() > 255)
                 continue;
+
             if (!crawlFrontier.addVisited(top))
                 continue;
 
-            if (fetchDocument(top, crawlDelay)) {
+
+            if (fetchDocument(top, crawlDelay).isPresent()) {
                 fetchedCount++;
             }
         }
@@ -184,6 +223,76 @@ public class CrawlerRetreiver {
         return fetchedCount;
     }
 
+    private CrawlDataComparison compareWithOldData(SimpleRobotRules robotsRules) {
+
+        int numGoodDocuments = oldCrawlData.size();
+
+        if (numGoodDocuments == 0)
+            return CrawlDataComparison.NO_OLD_DATA;
+
+        if (numGoodDocuments < 10)
+            return CrawlDataComparison.SMALL_SAMPLE;
+
+        // We fetch a sample of the data to assess how much it has changed
+        int sampleSize = (int) Math.min(20, 0.25 * numGoodDocuments);
+        Map<EdgeUrl, CrawledDocument> referenceUrls = oldCrawlData.sample(sampleSize);
+
+        int differences = 0;
+
+        long crawlDelay = robotsRules.getCrawlDelay();
+        for (var url : referenceUrls.keySet()) {
+
+            var docMaybe = fetchDocument(url, crawlDelay);
+            if (docMaybe.isEmpty()) {
+                differences++;
+                continue;
+            }
+
+            var newDoc = docMaybe.get();
+            var referenceDoc = referenceUrls.get(url);
+
+            // This looks like a bug but it is not, we want to compare references
+            // to detect if the page has bounced off etag or last-modified headers
+            // to avoid having to do a full content comparison
+            if (newDoc == referenceDoc)
+                continue;
+
+            if (newDoc.httpStatus != referenceDoc.httpStatus) {
+                differences++;
+                continue;
+            }
+
+            if (newDoc.documentBody == null) {
+                differences++;
+                continue;
+            }
+
+            long referenceLsh = hashDoc(referenceDoc);
+            long newLsh = hashDoc(newDoc);
+
+            if (EasyLSH.hammingDistance(referenceLsh, newLsh) > 5) {
+                differences++;
+            }
+        }
+        if (differences > sampleSize/4) {
+            return CrawlDataComparison.CHANGES_FOUND;
+        }
+        else {
+            return CrawlDataComparison.NO_CHANGES;
+        }
+    }
+
+    private static final HashFunction hasher = Hashing.murmur3_128(0);
+    private long hashDoc(CrawledDocument doc) {
+        var hash = new EasyLSH();
+        long val = 0;
+        for (var b : doc.documentBody.decode().getBytes()) {
+            val = val << 8 | (b & 0xFF);
+            hash.addUnordered(hasher.hashLong(val).asLong());
+        }
+        return hash.get();
+    }
+
     private void downloadSitemaps(SimpleRobotRules robotsRules) {
         List<String> sitemaps = robotsRules.getSitemaps();
 
@@ -235,7 +344,7 @@ public class CrawlerRetreiver {
         try {
             logger.debug("Configuring link filter");
 
-            var url = crawlFrontier.peek();
+            var url = crawlFrontier.peek().withPathAndParam("/", null);
 
             var maybeSample = fetchUrl(url).filter(sample -> sample.httpStatus == 200);
             if (maybeSample.isEmpty())
@@ -273,7 +382,7 @@ public class CrawlerRetreiver {
         }
     }
 
-    private boolean fetchDocument(EdgeUrl top, long crawlDelay) {
+    private Optional<CrawledDocument> fetchDocument(EdgeUrl top, long crawlDelay) {
         logger.debug("Fetching {}", top);
 
         long startTime = System.currentTimeMillis();
@@ -282,9 +391,14 @@ public class CrawlerRetreiver {
         if (doc.isPresent()) {
             var d = doc.get();
             crawledDomainWriter.accept(d);
+            oldCrawlData.dispose(top);
 
             if (d.url != null) {
-                EdgeUrl.parse(d.url).ifPresent(crawlFrontier::addVisited);
+                // We may have redirected to a different path
+                EdgeUrl.parse(d.url).ifPresent(url -> {
+                    crawlFrontier.addVisited(url);
+                    oldCrawlData.dispose(url);
+                });
             }
 
             if ("ERROR".equals(d.crawlerStatus) && d.httpStatus != 404) {
@@ -296,7 +410,7 @@ public class CrawlerRetreiver {
         long crawledTime = System.currentTimeMillis() - startTime;
         delay(crawlDelay, crawledTime);
 
-        return doc.isPresent();
+        return doc;
     }
 
     private boolean isAllowedProtocol(String proto) {
@@ -333,7 +447,20 @@ public class CrawlerRetreiver {
     private CrawledDocument fetchContent(EdgeUrl top) {
         for (int i = 0; i < 2; i++) {
             try {
-                return fetcher.fetchContent(top);
+                var doc = fetcher.fetchContent(top, oldCrawlData.getEtag(top), oldCrawlData.getLastModified(top));
+
+                doc.recrawlState = "NEW";
+
+                if (doc.httpStatus == 304) {
+                    var referenceData = oldCrawlData.getDoc(top);
+                    if (referenceData != null) {
+                        referenceData.recrawlState = "304/UNCHANGED";
+                        return referenceData;
+                    }
+                }
+
+
+                return doc;
             }
             catch (RateLimitException ex) {
                 slowDown = true;
@@ -443,4 +570,12 @@ public class CrawlerRetreiver {
                 .build();
     }
 
+
+    enum CrawlDataComparison {
+        NO_OLD_DATA,
+        SMALL_SAMPLE,
+        CHANGES_FOUND,
+        NO_CHANGES
+    };
+
 }
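compareWithOldData above treats two documents as changed when their locality-sensitive hashes differ in more than 5 bits. Assuming EasyLSH produces 64-bit fingerprints, the distance check presumably reduces to a popcount of the XOR, along these lines (a sketch; EasyLSH's real internals may differ):

```java
class HammingSketch {
    // Number of differing bits between two 64-bit fingerprints;
    // near-duplicate documents yield small distances.
    static int hammingDistance(long a, long b) {
        return Long.bitCount(a ^ b);
    }

    public static void main(String[] args) {
        long oldFingerprint = 0b1011_0110L;
        long newFingerprint = 0b1011_0010L;
        System.out.println(hammingDistance(oldFingerprint, newFingerprint)); // 1
    }
}
```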
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java
index b6e23f0c..7d5fc214 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/DomainCrawlFrontier.java
@@ -17,7 +17,7 @@ public class DomainCrawlFrontier {
 
     private Predicate<EdgeUrl> linkFilter = url -> true;
 
-    final int depth;
+    private int depth;
 
     public DomainCrawlFrontier(EdgeDomain thisDomain, Collection<String> urls, int depth) {
         this.thisDomain = thisDomain;
@@ -32,6 +32,9 @@ public class DomainCrawlFrontier {
         }
     }
 
+    public void increaseDepth(double depthIncreaseFactor) {
+        depth = (int)(depth * depthIncreaseFactor);
+    }
     public void setLinkFilter(Predicate<EdgeUrl> linkFilter) {
         this.linkFilter = linkFilter;
     }
@@ -80,6 +83,9 @@ public class DomainCrawlFrontier {
         if (queue.size() + visited.size() >= depth + 100)
             return;
 
+        if (visited.contains(url.toString()))
+            return;
+
         if (known.add(url.toString())) {
             queue.addLast(url);
         }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java
index 1f630ac5..7f588783 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java
@@ -18,7 +18,7 @@ public interface HttpFetcher {
 
     FetchResult probeDomain(EdgeUrl url);
 
-    CrawledDocument fetchContent(EdgeUrl url) throws RateLimitException;
+    CrawledDocument fetchContent(EdgeUrl url, String etag, String lastMod) throws RateLimitException;
 
     SimpleRobotRules fetchRobotRules(EdgeDomain domain);
 
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
index 55a6d296..36c8bd34 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
@@ -125,29 +125,20 @@ public class HttpFetcherImpl implements HttpFetcher {
         }
     }
 
-    private Request createHeadRequest(EdgeUrl url) {
-        return new Request.Builder().head().addHeader("User-agent", userAgent)
-                .url(url.toString())
-                .addHeader("Accept-Encoding", "gzip")
-                .build();
-    }
-
-    private Request createGetRequest(EdgeUrl url) {
-        return new Request.Builder().get().addHeader("User-agent", userAgent)
-                .url(url.toString())
-                .addHeader("Accept-Encoding", "gzip")
-                .build();
-
-    }
 
     @Override
     @SneakyThrows
-    public CrawledDocument fetchContent(EdgeUrl url) throws RateLimitException {
+    public CrawledDocument fetchContent(EdgeUrl url, String etag, String lastMod) throws RateLimitException {
 
         if (contentTypeLogic.isUrlLikeBinary(url)) {
             logger.debug("Probing suspected binary {}", url);
 
-            var head = createHeadRequest(url);
+            var headBuilder = new Request.Builder().head()
+                    .addHeader("User-agent", userAgent)
+                    .url(url.toString())
+                    .addHeader("Accept-Encoding", "gzip");
+
+            var head = headBuilder.build();
             var call = client.newCall(head);
 
             try (var rsp = call.execute()) {
@@ -165,7 +156,15 @@ public class HttpFetcherImpl implements HttpFetcher {
             }
         }
 
-        var get = createGetRequest(url);
+        var getBuilder = new Request.Builder().get();
+        getBuilder.addHeader("User-agent", userAgent)
+                .url(url.toString())
+                .addHeader("Accept-Encoding", "gzip");
+
+        if (etag != null) getBuilder.addHeader("If-None-Match", etag);
+        if (lastMod != null) getBuilder.addHeader("If-Modified-Since", lastMod);
+
+        var get = getBuilder.build();
         var call = client.newCall(get);
 
         try (var rsp = call.execute()) {
@@ -315,7 +314,7 @@ public class HttpFetcherImpl implements HttpFetcher {
     private Optional<SimpleRobotRules> fetchRobotsForProto(String proto, EdgeDomain domain) {
         try {
             var url = new EdgeUrl(proto, domain, null, "/robots.txt", null);
-            return Optional.of(parseRobotsTxt(fetchContent(url)));
+            return Optional.of(parseRobotsTxt(fetchContent(url, null, null)));
        }
         catch (Exception ex) {
             return Optional.empty();
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java
index f6c2f3a4..2ea9c763 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java
@@ -29,14 +29,14 @@ class HttpFetcherTest {
     @Test
     void fetchUTF8() throws URISyntaxException, RateLimitException {
         var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
-        var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"));
+        var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), null, null);
         System.out.println(str.contentType);
     }
 
     @Test
     void fetchText() throws URISyntaxException, RateLimitException {
         var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
-        var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"));
+        var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), null, null);
         System.out.println(str);
     }
 }
\ No newline at end of file
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
index 7462b62c..f580a123 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
@@ -33,7 +33,6 @@ public class CrawlerMockFetcherTest {
 
     Map<EdgeUrl, CrawledDocument> mockData = new HashMap<>();
     HttpFetcher fetcherMock = new MockFetcher();
-    SitemapRetriever sitemapRetriever = new SitemapRetriever();
 
     @AfterEach
     public void tearDown() {
@@ -74,7 +73,7 @@ public class CrawlerMockFetcherTest {
         registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
         registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
 
-        new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>()), out::add)
+        new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>(), null), out::add)
                 .withNoDelay()
                 .fetch();
 
@@ -87,7 +86,7 @@ public class CrawlerMockFetcherTest {
 
         registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
 
-        new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>()), out::add)
+        new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>(), null), out::add)
                 .withNoDelay()
                 .fetch();
 
@@ -102,7 +101,7 @@ public class CrawlerMockFetcherTest {
         registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html");
         registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
 
-        new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>()), out::add)
+        new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>(), null), out::add)
                 .withNoDelay()
                 .fetch();
 
@@ -127,7 +126,7 @@ public class CrawlerMockFetcherTest {
         }
 
         @Override
-        public CrawledDocument fetchContent(EdgeUrl url) {
+        public CrawledDocument fetchContent(EdgeUrl url, String etag, String lastModified) {
             logger.info("Fetching {}", url);
             if (mockData.containsKey(url)) {
                 return mockData.get(url);
"community.tt-rss.org", new ArrayList<>(), null), out::add) .withNoDelay() .fetch(); @@ -127,7 +126,7 @@ public class CrawlerMockFetcherTest { } @Override - public CrawledDocument fetchContent(EdgeUrl url) { + public CrawledDocument fetchContent(EdgeUrl url, String etag, String lastModified) { logger.info("Fetching {}", url); if (mockData.containsKey(url)) { return mockData.get(url); diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java index 64c7e890..741c8704 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java @@ -6,12 +6,15 @@ import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.fetcher.HttpFetcher; import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl; import nu.marginalia.crawling.model.CrawledDocument; +import nu.marginalia.crawling.model.CrawledDomain; import nu.marginalia.crawling.model.spec.CrawlingSpecification; import nu.marginalia.crawling.model.SerializableCrawlData; import org.junit.jupiter.api.*; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,4 +98,36 @@ class CrawlerRetreiverTest { ); } + @Test + public void testRecrawl() { + + var specs = CrawlingSpecification + .builder() + .id("whatever") + .crawlDepth(12) + .domain("www.marginalia.nu") + .urls(List.of("https://www.marginalia.nu/some-dead-link")) + .build(); + + + Map, List> data = new HashMap<>(); + + new CrawlerRetreiver(httpFetcher, specs, d -> { + data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d); + if (d instanceof CrawledDocument doc) { + System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus); + } + }).fetch(); + + CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0); + domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList()); + + var newSpec = specs.withOldData(domain); + + new CrawlerRetreiver(httpFetcher, newSpec, d -> { + if (d instanceof CrawledDocument doc) { + System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus); + } + }).fetch(); + } } \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index 49ed3dff..82869816 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -82,6 +82,8 @@ public class ControlService extends Service { Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses); Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses); + Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses); + Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses); Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, 
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java
index 49ed3dff..82869816 100644
--- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java
@@ -82,6 +82,8 @@ public class ControlService extends Service {
         Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses);
         Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses);
 
+        Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses);
+        Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses);
         Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses);
         Spark.post("/public/storage/:fid/load", controlActorService::loadProcessedData, redirectToProcesses);
 
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java
index c470341c..bfa90be1 100644
--- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java
@@ -4,6 +4,8 @@ import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import lombok.SneakyThrows;
+import nu.marginalia.control.actor.task.CrawlActor;
+import nu.marginalia.control.actor.task.RecrawlActor;
 import nu.marginalia.control.model.Actor;
 import nu.marginalia.control.actor.monitor.*;
 import nu.marginalia.control.actor.monitor.ConverterMonitorActor;
@@ -22,6 +24,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+/** This class is responsible for starting and stopping the various actors in the controller service */
 @Singleton
 public class ControlActors {
     private final ServiceEventLog eventLog;
@@ -35,7 +38,10 @@ public class ControlActors {
                          GsonFactory gsonFactory,
                          BaseServiceParams baseServiceParams,
                          ReconvertAndLoadActor reconvertAndLoadActor,
+                         CrawlActor crawlActor,
+                         RecrawlActor recrawlActor,
                          ConverterMonitorActor converterMonitorFSM,
+                         CrawlerMonitorActor crawlerMonitorActor,
                          LoaderMonitorActor loaderMonitor,
                          MessageQueueMonitorActor messageQueueMonitor,
                          ProcessLivenessMonitorActor processMonitorFSM,
@@ -45,9 +51,12 @@ public class ControlActors {
         this.eventLog = baseServiceParams.eventLog;
         this.gson = gsonFactory.get();
 
+        register(Actor.CRAWL, crawlActor);
+        register(Actor.RECRAWL, recrawlActor);
         register(Actor.RECONVERT_LOAD, reconvertAndLoadActor);
         register(Actor.CONVERTER_MONITOR, converterMonitorFSM);
         register(Actor.LOADER_MONITOR, loaderMonitor);
+        register(Actor.CRAWLER_MONITOR, crawlerMonitorActor);
         register(Actor.MESSAGE_QUEUE_MONITOR, messageQueueMonitor);
         register(Actor.PROCESS_LIVENESS_MONITOR, processMonitorFSM);
         register(Actor.FILE_STORAGE_MONITOR, fileStorageMonitorActor);
@@ -100,9 +109,6 @@ public class ControlActors {
                 Map.Entry::getKey, e -> e.getValue().getState())
         );
     }
-    public MachineState getActorStates(Actor actor) {
-        return stateMachines.get(actor).getState();
-    }
 
     public AbstractStateGraph getActorDefinition(Actor actor) {
         return actorDefinitions.get(actor);
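AbstractProcessSpawnerActor below threads an errorAttempts counter through its MONITOR and RUN states: failures carry the count forward, and a quiet inbox resets it to zero so that failures long ago don't eventually exhaust the retry budget. The same idea outside the state-machine framework, as an illustrative sketch only:

```java
class RetryBudgetSketch {
    static final int MAX_ATTEMPTS = 5;

    public static void main(String[] args) throws InterruptedException {
        int errorAttempts = 0;

        while (errorAttempts < MAX_ATTEMPTS) {
            if (runProcess()) {
                errorAttempts = 0;   // success/silence clears the budget
            } else {
                errorAttempts++;     // each failure consumes one attempt
            }
            Thread.sleep(1_000);
        }
        System.err.println("Giving up after " + MAX_ATTEMPTS + " consecutive failures");
    }

    static boolean runProcess() {
        return Math.random() > 0.3;  // stand-in for launching the real process
    }
}
```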
""" ) - public void monitor() throws SQLException, InterruptedException { + public void monitor(Integer errorAttempts) throws SQLException, InterruptedException { + if (errorAttempts == null) { + errorAttempts = 0; + } for (;;) { var messages = persistence.eavesdrop(inboxName, 1); if (messages.isEmpty() && !processService.isRunning(processId)) { TimeUnit.SECONDS.sleep(5); + + if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox + transition(MONITOR, 0); + } + // else continue } else { - transition(RUN); + transition(RUN, errorAttempts); } } } @@ -87,7 +98,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph { If the process fails, retransition to RUN up to MAX_ATTEMPTS times. After MAX_ATTEMPTS at restarting the process, transition to ERROR. If the process is cancelled, transition to ABORTED. - If the process is successful, transition to MONITOR. + If the process is successful, transition to MONITOR(errorAttempts). """ ) public void run(Integer attempts) throws Exception { @@ -108,7 +119,7 @@ public class AbstractProcessSpawnerActor extends AbstractStateGraph { transition(ABORTED); } - transition(MONITOR); + transition(MONITOR, attempts); } @TerminalState(name = ABORTED, description = "The process was manually aborted") diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/monitor/CrawlerMonitorActor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/monitor/CrawlerMonitorActor.java new file mode 100644 index 00000000..f50f7b73 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/monitor/CrawlerMonitorActor.java @@ -0,0 +1,25 @@ +package nu.marginalia.control.actor.monitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqapi.ProcessInboxNames; +import nu.marginalia.mqsm.StateFactory; + +@Singleton +public class CrawlerMonitorActor extends AbstractProcessSpawnerActor { + + @Inject + public CrawlerMonitorActor(StateFactory stateFactory, + MqPersistence persistence, + ProcessService processService) { + super(stateFactory, + persistence, + processService, + ProcessInboxNames.CRAWLER_INBOX, + ProcessService.ProcessId.CRAWLER); + } + + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlActor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlActor.java new file mode 100644 index 00000000..4db5b3e1 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlActor.java @@ -0,0 +1,171 @@ +package nu.marginalia.control.actor.task; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.With; +import nu.marginalia.control.svc.ProcessOutboxFactory; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.db.storage.FileStorageService; +import nu.marginalia.db.storage.model.FileStorageBaseType; +import nu.marginalia.db.storage.model.FileStorageId; +import nu.marginalia.db.storage.model.FileStorageType; +import nu.marginalia.index.client.IndexClient; +import nu.marginalia.index.client.IndexMqEndpoints; +import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.MqMessageState; 
+import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.converting.ConvertRequest; +import nu.marginalia.mqapi.crawling.CrawlRequest; +import nu.marginalia.mqapi.loading.LoadRequest; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Singleton +public class CrawlActor extends AbstractStateGraph { + + // STATES + + public static final String INITIAL = "INITIAL"; + public static final String CRAWL = "CRAWL"; + public static final String CRAWL_WAIT = "CRAWL-WAIT"; + public static final String END = "END"; + private final ProcessService processService; + private final MqOutbox mqCrawlerOutbox; + private final FileStorageService storageService; + private final Gson gson; + private final Logger logger = LoggerFactory.getLogger(getClass()); + + + @AllArgsConstructor @With @NoArgsConstructor + public static class Message { + public FileStorageId crawlSpecId = null; + public FileStorageId crawlStorageId = null; + public long crawlerMsgId = 0L; + }; + + @Inject + public CrawlActor(StateFactory stateFactory, + ProcessService processService, + ProcessOutboxFactory processOutboxFactory, + FileStorageService storageService, + Gson gson + ) + { + super(stateFactory); + this.processService = processService; + this.mqCrawlerOutbox = processOutboxFactory.createCrawlerOutbox(); + this.storageService = storageService; + this.gson = gson; + } + + @GraphState(name = INITIAL, + next = CRAWL, + description = """ + Validate the input and transition to CRAWL + """) + public Message init(FileStorageId crawlSpecId) throws Exception { + if (null == crawlSpecId) { + error("This Actor requires a FileStorageId to be passed in as a parameter to INITIAL"); + } + + var storage = storageService.getStorage(crawlSpecId); + + if (storage == null) error("Bad storage id"); + if (storage.type() != FileStorageType.CRAWL_SPEC) error("Bad storage type " + storage.type()); + + return new Message().withCrawlSpecId(crawlSpecId); + } + + @GraphState(name = CRAWL, + next = CRAWL_WAIT, + resume = ResumeBehavior.ERROR, + description = """ + Allocate a storage area for the crawled data, + then send a crawl request to the crawler and transition to CRAWL_WAIT. + """ + ) + public Message crawl(Message message) throws Exception { + // Allocate a storage area for the crawl data + + var toCrawl = storageService.getStorage(message.crawlSpecId); + + var base = storageService.getStorageBase(FileStorageBaseType.SLOW); + var dataArea = storageService.allocateTemporaryStorage( + base, + FileStorageType.CRAWL_DATA, + "crawl-data", + toCrawl.description()); + + storageService.relateFileStorages(toCrawl.id(), dataArea.id()); + + // Pre-send crawl request + var request = new CrawlRequest(message.crawlSpecId, dataArea.id()); + long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request)); + + return message + .withCrawlStorageId(dataArea.id()) + .withCrawlerMsgId(id); + } + + @GraphState( + name = CRAWL_WAIT, + next = END, + resume = ResumeBehavior.RETRY, + description = """ + Wait for the crawler to finish retrieving the data.
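+ If the crawler process dies while this state is waiting, the + actor waits up to 30 seconds for it to re-launch before erroring + out; see waitResponse() below.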
+ """ + ) + public Message crawlerWait(Message message) throws Exception { + var rsp = waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, message.crawlerMsgId); + + if (rsp.state() != MqMessageState.OK) + error("Crawler failed"); + + return message; + } + + + public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception { + if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { + error("Process " + processId + " did not launch"); + } + for (;;) { + try { + return outbox.waitResponse(id, 1, TimeUnit.SECONDS); + } + catch (TimeoutException ex) { + // Maybe the process died, wait a moment for it to restart + if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { + error("Process " + processId + " died and did not re-launch"); + } + } + } + } + + public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException { + + // Wait for process to start + long deadline = System.currentTimeMillis() + unit.toMillis(duration); + while (System.currentTimeMillis() < deadline) { + if (processService.isRunning(processId)) + return true; + + TimeUnit.SECONDS.sleep(1); + } + + return false; + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java index 2ffde9b2..96730aa2 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java @@ -118,6 +118,8 @@ public class ReconvertAndLoadActor extends AbstractStateGraph { var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data", "Processed Data; " + toProcess.description()); + storageService.relateFileStorages(toProcess.id(), processedArea.id()); + // Pre-send convert request var request = new ConvertRequest(message.crawlStorageId, processedArea.id()); long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/RecrawlActor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/RecrawlActor.java new file mode 100644 index 00000000..bfa847f2 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/RecrawlActor.java @@ -0,0 +1,185 @@ +package nu.marginalia.control.actor.task; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.With; +import nu.marginalia.control.svc.ProcessOutboxFactory; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.db.storage.FileStorageService; +import nu.marginalia.db.storage.model.FileStorage; +import nu.marginalia.db.storage.model.FileStorageId; +import nu.marginalia.db.storage.model.FileStorageType; +import nu.marginalia.index.client.IndexClient; +import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.crawling.CrawlRequest; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import 
nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.sql.SQLException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Singleton +public class RecrawlActor extends AbstractStateGraph { + + // STATES + + public static final String INITIAL = "INITIAL"; + public static final String CRAWL = "CRAWL"; + public static final String CRAWL_WAIT = "CRAWL-WAIT"; + public static final String END = "END"; + private final ProcessService processService; + private final MqOutbox mqCrawlerOutbox; + private final FileStorageService storageService; + private final Gson gson; + private final Logger logger = LoggerFactory.getLogger(getClass()); + + + @AllArgsConstructor @With @NoArgsConstructor + public static class RecrawlMessage { + public FileStorageId crawlSpecId = null; + public FileStorageId crawlStorageId = null; + public long crawlerMsgId = 0L; + }; + + public static RecrawlMessage recrawlFromCrawlData(FileStorageId crawlData) { + return new RecrawlMessage(null, crawlData, 0L); + } + public static RecrawlMessage recrawlFromCrawlDataAndCrawlSpec(FileStorageId crawlData, FileStorageId crawlSpec) { + return new RecrawlMessage(crawlSpec, crawlData, 0L); + } + + @Inject + public RecrawlActor(StateFactory stateFactory, + ProcessService processService, + ProcessOutboxFactory processOutboxFactory, + FileStorageService storageService, + Gson gson + ) + { + super(stateFactory); + this.processService = processService; + this.mqCrawlerOutbox = processOutboxFactory.createCrawlerOutbox(); + this.storageService = storageService; + this.gson = gson; + } + + @GraphState(name = INITIAL, + next = CRAWL, + description = """ + Validate the input and transition to CRAWL + """) + public RecrawlMessage init(RecrawlMessage recrawlMessage) throws Exception { + if (null == recrawlMessage) { + error("This Actor requires a message as an argument"); + } + + var crawlStorage = storageService.getStorage(recrawlMessage.crawlStorageId); + + if (crawlStorage == null) error("Bad crawl storage id"); + if (crawlStorage.type() != FileStorageType.CRAWL_DATA) error("Bad storage type " + crawlStorage.type()); + + FileStorage specStorage; + + if (recrawlMessage.crawlSpecId != null) { + specStorage = storageService.getStorage(recrawlMessage.crawlSpecId); + } + else { + specStorage = getSpec(crawlStorage).orElse(null); + } + + if (specStorage == null) error("Bad spec storage id"); + if (specStorage.type() != FileStorageType.CRAWL_SPEC) error("Bad storage type " + specStorage.type()); + + Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log")); + + return recrawlMessage + .withCrawlSpecId(specStorage.id()); + } + + private Optional<FileStorage> getSpec(FileStorage crawlStorage) throws SQLException { + return storageService.getSourceFromStorage(crawlStorage) + .stream() + .filter(storage -> storage.type().equals(FileStorageType.CRAWL_SPEC)) + .findFirst(); + } + + @GraphState(name = CRAWL, + next = CRAWL_WAIT, + resume = ResumeBehavior.ERROR, + description = """ + Send a crawl request to the crawler and transition to CRAWL_WAIT.
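+ Unlike the CRAWL state in CrawlActor, no new storage area is + allocated; the crawler writes the refreshed data into the + existing crawl data area.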
+ """ + ) + public RecrawlMessage crawl(RecrawlMessage recrawlMessage) throws Exception { + // Create processed data area + + var toCrawl = storageService.getStorage(recrawlMessage.crawlSpecId); + + // Pre-send crawl request + var request = new CrawlRequest(recrawlMessage.crawlSpecId, recrawlMessage.crawlStorageId); + long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request)); + + return recrawlMessage.withCrawlerMsgId(id); + } + + @GraphState( + name = CRAWL_WAIT, + next = END, + resume = ResumeBehavior.RETRY, + description = """ + Wait for the crawler to finish retreiving the data. + """ + ) + public RecrawlMessage crawlerWait(RecrawlMessage recrawlMessage) throws Exception { + var rsp = waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, recrawlMessage.crawlerMsgId); + + if (rsp.state() != MqMessageState.OK) + error("Crawler failed"); + + return recrawlMessage; + } + + + public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception { + if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { + error("Process " + processId + " did not launch"); + } + for (;;) { + try { + return outbox.waitResponse(id, 1, TimeUnit.SECONDS); + } + catch (TimeoutException ex) { + // Maybe the process died, wait a moment for it to restart + if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { + error("Process " + processId + " died and did not re-launch"); + } + } + } + } + + public boolean waitForProcess(ProcessService.ProcessId processId, TimeUnit unit, int duration) throws InterruptedException { + + // Wait for process to start + long deadline = System.currentTimeMillis() + unit.toMillis(duration); + while (System.currentTimeMillis() < deadline) { + if (processService.isRunning(processId)) + return true; + + TimeUnit.SECONDS.sleep(1); + } + + return false; + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java index dcced17e..83d0b810 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java @@ -1,9 +1,12 @@ package nu.marginalia.control.model; public enum Actor { + CRAWL, + RECRAWL, RECONVERT_LOAD, CONVERTER_MONITOR, LOADER_MONITOR, + CRAWLER_MONITOR, MESSAGE_QUEUE_MONITOR, PROCESS_LIVENESS_MONITOR, FILE_STORAGE_MONITOR diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java index 674e92bc..4ef9a394 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java @@ -4,6 +4,13 @@ import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageType; public record FileStorageWithActions(FileStorage storage) { + public boolean isCrawlable() { + return storage.type() == FileStorageType.CRAWL_SPEC; + } + public boolean isRecrawlable() { + return storage.type() == FileStorageType.CRAWL_DATA; + } + public boolean isLoadable() { return storage.type() == FileStorageType.PROCESSED_DATA; } diff --git 
a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java index d39f9d4f..c7bab07f 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java @@ -4,6 +4,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.control.actor.ControlActors; import nu.marginalia.control.actor.task.ReconvertAndLoadActor; +import nu.marginalia.control.actor.task.RecrawlActor; import nu.marginalia.control.model.Actor; import nu.marginalia.control.model.ActorRunState; import nu.marginalia.control.model.ActorStateGraph; @@ -43,16 +44,33 @@ public class ControlActorService { return ""; } + public Object triggerCrawling(Request request, Response response) throws Exception { + controlActors.start( + Actor.CRAWL, + FileStorageId.parse(request.params("fid")) + ); + return ""; + } + + public Object triggerRecrawling(Request request, Response response) throws Exception { + controlActors.start( + Actor.RECRAWL, + RecrawlActor.recrawlFromCrawlData( + FileStorageId.parse(request.params("fid")) + ) + ); + return ""; + } public Object triggerProcessing(Request request, Response response) throws Exception { controlActors.start( Actor.RECONVERT_LOAD, - FileStorageId.of(Integer.parseInt(request.params("fid"))) + FileStorageId.parse(request.params("fid")) ); return ""; } public Object loadProcessedData(Request request, Response response) throws Exception { - var fid = FileStorageId.of(Integer.parseInt(request.params("fid"))); + var fid = FileStorageId.parse(request.params("fid")); // Start the FSM from the intermediate state that triggers the load controlActors.startFrom( diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java index 4c296069..52808aef 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java @@ -24,4 +24,8 @@ public class ProcessOutboxFactory { public MqOutbox createLoaderOutbox() { return new MqOutbox(persistence, ProcessInboxNames.LOADER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid()); } + + public MqOutbox createCrawlerOutbox() { + return new MqOutbox(persistence, ProcessInboxNames.CRAWLER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid()); + } } diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb index 1674d6f5..7f748489 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb @@ -34,6 +34,11 @@ {{#each storage}} + {{#if isCrawlable}} +
+ +
+ {{/if}} {{#if isLoadable}}
@@ -44,6 +49,11 @@
{{/if}} + {{#if isRecrawlable}} +
+ +
+ {{/if}} {{#if isDeletable}}
diff --git a/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java b/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java index 38cfc4fb..7fd5922f 100644 --- a/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java +++ b/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java @@ -31,9 +31,9 @@ public class CrawlJobSpecWriterTest { @Test public void testReadWrite() throws IOException { try (CrawlJobSpecWriter writer = new CrawlJobSpecWriter(tempFile)) { - writer.accept(new CrawlingSpecification("first",1, "test1", List.of("a", "b", "c"))); - writer.accept(new CrawlingSpecification("second",1, "test2", List.of("a", "b", "c", "d"))); - writer.accept(new CrawlingSpecification("third",1, "test3", List.of("a", "b"))); + writer.accept(new CrawlingSpecification("first",1, "test1", List.of("a", "b", "c"), null)); + writer.accept(new CrawlingSpecification("second",1, "test2", List.of("a", "b", "c", "d"), null)); + writer.accept(new CrawlingSpecification("third",1, "test3", List.of("a", "b"), null)); } List<CrawlingSpecification> outputs = new ArrayList<>(); diff --git a/run/env/service.env b/run/env/service.env index dfa012b3..ac745577 100644 --- a/run/env/service.env +++ b/run/env/service.env @@ -1,3 +1,4 @@ WMSA_HOME=run/ CONTROL_SERVICE_OPTS="-DdistPath=/dist" -CONVERTER_OPTS="-ea -Xmx16G -XX:-CompactStrings -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15" \ No newline at end of file +CONVERTER_OPTS="-ea -Xmx16G -XX:-CompactStrings -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15" +CRAWLER_OPTS="-Xmx16G -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15"