diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
index f1f79790..54ebe800 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
@@ -20,7 +20,6 @@ import nu.marginalia.crawl.warc.WarcArchiverFactory;
 import nu.marginalia.crawl.warc.WarcArchiverIf;
 import nu.marginalia.db.DomainBlacklist;
 import nu.marginalia.io.CrawlerOutputFile;
-import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.process.ProcessConfiguration;
@@ -417,13 +416,13 @@ public class CrawlerMain extends ProcessMainClass {
             try {
                 Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
                 if (Files.exists(slopPath)) {
-                    return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
+                    return new CrawlDataReference(slopPath);
                 }
 
                 Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
                 if (Files.exists(parquetPath)) {
                     slopPath = migrateParquetData(parquetPath, domain, outputDir);
-                    return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
+                    return new CrawlDataReference(slopPath);
                 }
             }
             catch (IOException e) {
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
index 8e1838a9..c16d39d0 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
@@ -4,6 +4,7 @@ import nu.marginalia.ContentTypes;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.lsh.EasyLSH;
 import nu.marginalia.model.crawldata.CrawledDocument;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -11,51 +12,73 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Optional;
 
 /** A reference to a domain that has been crawled before.
  */
-public class CrawlDataReference implements AutoCloseable {
+public class CrawlDataReference implements AutoCloseable, Iterable<CrawledDocument> {
+
+    private boolean closed = false;
+
+    @Nullable
+    private final Path path;
+
+    @Nullable
+    private SerializableCrawlDataStream data = null;
 
-    private final SerializableCrawlDataStream data;
     private static final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);
 
-    public CrawlDataReference(SerializableCrawlDataStream data) {
-        this.data = data;
+    public CrawlDataReference(@Nullable Path path) {
+        this.path = path;
     }
 
     public CrawlDataReference() {
-        this(SerializableCrawlDataStream.empty());
+        this(null);
     }
 
     /** Delete the associated data from disk, if it exists */
     public void delete() throws IOException {
-        Path filePath = data.path();
-
-        if (filePath != null) {
-            Files.deleteIfExists(filePath);
+        if (path != null) {
+            Files.deleteIfExists(path);
         }
     }
 
-    /** Get the next document from the crawl data,
-     * returning null when there are no more documents
-     * available
-     */
-    @Nullable
-    public CrawledDocument nextDocument() {
-        try {
-            while (data.hasNext()) {
-                if (data.next() instanceof CrawledDocument doc) {
-                    if (!ContentTypes.isAccepted(doc.contentType))
-                        continue;
+    public @NotNull Iterator<CrawledDocument> iterator() {
 
-                    return doc;
+        requireStream();
+        // Guaranteed by requireStream, but helps java
+        Objects.requireNonNull(data);
+
+        return data.map(next -> {
+            if (next instanceof CrawledDocument doc && ContentTypes.isAccepted(doc.contentType)) {
+                return Optional.of(doc);
+            }
+            else {
+                return Optional.empty();
+            }
+        });
+    }
+
+    /** After calling this method, data is guaranteed to be non-null */
+    private void requireStream() {
+        if (closed) {
+            throw new IllegalStateException("Use after close()");
+        }
+
+        if (data == null) {
+            try {
+                if (path != null) {
+                    data = SerializableCrawlDataStream.openDataStream(path);
+                    return;
                 }
             }
-        }
-        catch (IOException ex) {
-            logger.error("Failed to read next document", ex);
-        }
+            catch (Exception ex) {
+                logger.error("Failed to open stream", ex);
+            }
 
-        return null;
+            data = SerializableCrawlDataStream.empty();
+        }
     }
 
     public static boolean isContentBodySame(byte[] one, byte[] other) {
@@ -98,7 +121,12 @@ public class CrawlDataReference implements AutoCloseable {
     }
 
     @Override
-    public void close() throws Exception {
-        data.close();
+    public void close() throws IOException {
+        if (!closed) {
+            if (data != null) {
+                data.close();
+            }
+            closed = true;
+        }
     }
 }
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
index 76dd53d7..56aae5d0 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
@@ -89,30 +89,45 @@ public class CrawlerRetreiver implements AutoCloseable {
     }
 
     public int crawlDomain(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
-        try {
+        try (oldCrawlData) {
             // Do an initial domain probe to determine the root URL
-            EdgeUrl rootUrl;
-
             var probeResult = probeRootUrl();
-            switch (probeResult) {
+
+            return switch (probeResult) {
                 case HttpFetcher.DomainProbeResult.Ok(EdgeUrl probedUrl) -> {
-                    rootUrl = probedUrl; // Good track
+
+                    // Sleep after the initial probe, we don't have access to the robots.txt yet
+                    // so we don't know the crawl delay
+                    TimeUnit.SECONDS.sleep(1);
+
+                    final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(probedUrl.domain, warcRecorder);
+                    final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
+
+                    delayTimer.waitFetchDelay(0); // initial delay after robots.txt
+
+                    DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(probedUrl, delayTimer);
+                    domainStateDb.save(summaryRecord);
+
+                    // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
+                    if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
+                        // If we have reference data, we will always grow the crawl depth a bit
+                        crawlFrontier.increaseDepth(1.5, 2500);
+                    }
+
+                    oldCrawlData.close(); // proactively close the crawl data reference here to not hold onto expensive resources
+
+                    yield crawlDomain(probedUrl, robotsRules, delayTimer, domainLinks);
                 }
                 case HttpFetcher.DomainProbeResult.Redirect(EdgeDomain domain1) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, "Redirect", domain1.toString()));
-                    return 1;
+                    yield 1;
                 }
                 case HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus status, String desc) -> {
                     domainStateDb.save(DomainStateDb.SummaryRecord.forError(domain, status.toString(), desc));
-                    return 1;
+                    yield 1;
                 }
-            }
+            };
 
-            // Sleep after the initial probe, we don't have access to the robots.txt yet
-            // so we don't know the crawl delay
-            TimeUnit.SECONDS.sleep(1);
-
-            return crawlDomain(oldCrawlData, rootUrl, domainLinks);
         }
         catch (Exception ex) {
             logger.error("Error crawling domain {}", domain, ex);
@@ -120,28 +135,15 @@
         }
     }
 
-    private int crawlDomain(CrawlDataReference oldCrawlData,
-                            EdgeUrl rootUrl,
-                            DomainLinks domainLinks) throws InterruptedException {
+    private int crawlDomain(EdgeUrl rootUrl,
+                            SimpleRobotRules robotsRules,
+                            CrawlDelayTimer delayTimer,
+                            DomainLinks domainLinks) {
 
-        final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(rootUrl.domain, warcRecorder);
-        final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
-
-        delayTimer.waitFetchDelay(0); // initial delay after robots.txt
-
-        DomainStateDb.SummaryRecord summaryRecord = sniffRootDocument(rootUrl, delayTimer);
-        domainStateDb.save(summaryRecord);
-
-        // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
-        if (crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer) > 0) {
-            // If we have reference data, we will always grow the crawl depth a bit
-            crawlFrontier.increaseDepth(1.5, 2500);
-        }
 
         // Add external links to the crawl frontier
         crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
 
-        // Fetch sitemaps
         for (var sitemap : robotsRules.getSitemaps()) {
             crawlFrontier.addAllToQueue(fetcher.fetchSitemapUrls(sitemap, delayTimer));
         }
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java
index 67835eeb..db9b2337 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java
@@ -40,18 +40,12 @@ public class CrawlerRevisitor {
         int errors = 0;
         int skipped = 0;
 
-        for (;;) {
+        for (CrawledDocument doc : oldCrawlData) {
             if (errors > 20) {
                 // If we've had too many errors, we'll stop trying to recrawl
                 break;
             }
 
-            CrawledDocument doc = oldCrawlData.nextDocument();
-
-            if (doc == null)
-                break;
-
-            // This Shouldn't Happen (TM)
             var urlMaybe = EdgeUrl.parse(doc.url);
             if (urlMaybe.isEmpty())
                 continue;
diff --git a/code/processes/crawling-process/model/java/nu/marginalia/slop/SlopCrawlDataRecord.java b/code/processes/crawling-process/model/java/nu/marginalia/slop/SlopCrawlDataRecord.java
index 8842d7e8..21114ca0 100644
--- a/code/processes/crawling-process/model/java/nu/marginalia/slop/SlopCrawlDataRecord.java
+++ b/code/processes/crawling-process/model/java/nu/marginalia/slop/SlopCrawlDataRecord.java
@@ -108,15 +108,17 @@ public record SlopCrawlDataRecord(String domain,
     public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException {
         Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion");
 
-        try (var writer = new Writer(tempDir)) {
-            CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach(
-                    parquetRecord -> {
-                        try {
-                            writer.write(new SlopCrawlDataRecord(parquetRecord));
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+        try (var writer = new Writer(tempDir);
+             var stream = CrawledDocumentParquetRecordFileReader.stream(parquetInput))
+        {
+            stream.forEach(
+                    parquetRecord -> {
+                        try {
+                            writer.write(new SlopCrawlDataRecord(parquetRecord));
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
         }
         catch (IOException ex) {
             FileUtils.deleteDirectory(tempDir.toFile());
diff --git a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
index 98d6bce9..8c397034 100644
--- a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
+++ b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
@@ -375,7 +375,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
         doCrawlWithReferenceStream(specs,
-                SerializableCrawlDataStream.openDataStream(tempFileParquet1)
+                new CrawlDataReference(tempFileParquet1)
         );
         convertToParquet(tempFileWarc2, tempFileParquet2);
 
@@ -447,11 +447,9 @@ class CrawlerRetreiverTest {
             throw new RuntimeException(e);
         }
 
-        var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1);
-
         System.out.println("---");
 
-        doCrawlWithReferenceStream(specs, stream);
+        doCrawlWithReferenceStream(specs, new CrawlDataReference(tempFileParquet1));
 
         var revisitCrawlFrontier = new DomainCrawlFrontier(
                 new EdgeDomain("www.marginalia.nu"),
@@ -508,12 +506,11 @@ class CrawlerRetreiverTest {
         }
     }
 
-    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, SerializableCrawlDataStream stream) {
+    private void doCrawlWithReferenceStream(CrawlerMain.CrawlSpecRecord specs, CrawlDataReference reference) {
         try (var recorder = new WarcRecorder(tempFileWarc2, new Cookies());
             var db = new DomainStateDb(tempFileDb)
         ) {
-            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(),
-                    new CrawlDataReference(stream));
+            new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, db, recorder).crawlDomain(new DomainLinks(), reference);
         }
         catch (IOException | SQLException ex) {
             Assertions.fail(ex);
diff --git a/settings.gradle b/settings.gradle
index 75504cd7..6930fd05 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -234,7 +234,7 @@ dependencyResolutionManagement {
         library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
         library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')
 
-        library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT')
+        library('slop', 'nu.marginalia', 'slop').version('0.0.10-SNAPSHOT')
         library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion)
         library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion)
         library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion)
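
Taken together, these hunks make CrawlDataReference hold a Path instead of an open stream: the underlying SerializableCrawlDataStream is opened lazily on first iteration, filtered to accepted content types, and released via close(). A minimal usage sketch of the new API, mirroring what CrawlerRetreiver and CrawlerRevisitor do in the patch above; the class and method names below are illustrative and not part of the patch:

    import nu.marginalia.crawl.retreival.CrawlDataReference;
    import nu.marginalia.model.crawldata.CrawledDocument;

    import java.nio.file.Path;

    class CrawlDataReferenceUsageSketch {
        static void replay(Path slopPath) {
            // The reference only remembers the path; the slop (or migrated parquet) data
            // is opened lazily inside iterator(), so constructing it is cheap.
            try (CrawlDataReference oldCrawlData = new CrawlDataReference(slopPath)) {
                // Iteration yields only CrawledDocument records with accepted content types,
                // which is exactly what CrawlerRevisitor's for-each loop consumes.
                for (CrawledDocument doc : oldCrawlData) {
                    System.out.println(doc.url);
                }
            }
            catch (Exception e) {
                // close() now declares IOException; a broad catch keeps the sketch short
                e.printStackTrace();
            }
        }
    }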