From 74a1f100f437ac210a5d22f5ffaaad2b881b8747 Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Sun, 26 Jan 2025 14:46:50 +0100
Subject: [PATCH] (converter) Refactor to remove CrawledDomainReader and move its functionality into SerializableCrawlDataStream

---
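(Reviewer sketch, not part of the commit.)  Call sites migrate as below,
where `path` is an illustrative java.nio.file.Path to a crawl data file:

    // before
    try (var stream = CrawledDomainReader.createDataStream(path)) {
        while (stream.hasNext()) { /* consume stream.next() */ }
    }
    int hint = CrawledDomainReader.sizeHint(path);

    // after
    try (var stream = SerializableCrawlDataStream.openDataStream(path)) {
        while (stream.hasNext()) { /* consume stream.next() */ }
    }
    int hint = SerializableCrawlDataStream.getSizeHint(path);

The instance method sizeHint() is likewise renamed to getSizeHint().
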
 .../marginalia/converting/ConverterMain.java  |  8 +--
 .../java/nu/marginalia/crawl/CrawlerMain.java |  6 +--
 .../nu/marginalia/io/CrawledDomainReader.java | 53 -------------------
 .../io/SerializableCrawlDataStream.java       | 48 ++++++++++++++++-
 .../retreival/CrawlerRetreiverTest.java       | 17 +++---
 .../nu/marginalia/extractor/AtagExporter.java |  3 +-
 .../nu/marginalia/extractor/FeedExporter.java |  5 +-
 .../extractor/TermFrequencyExporter.java      |  4 +-
 .../tools/ExperimentRunnerMain.java           |  4 +-
 .../test/nu/marginalia/IntegrationTest.java   |  4 +-
 10 files changed, 70 insertions(+), 82 deletions(-)
 delete mode 100644 code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java

diff --git a/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java
index b7732cc2..9ab3ea25 100644
--- a/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java
+++ b/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java
@@ -12,7 +12,7 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
 import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.converting.writer.ConverterWriter;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.mqapi.converting.ConvertRequest;
 import nu.marginalia.process.ProcessConfiguration;
@@ -207,12 +207,12 @@ public class ConverterMain extends ProcessMainClass {
         for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                 new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
         {
-            if (CrawledDomainReader.sizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
+            if (SerializableCrawlDataStream.getSizeHint(dataPath) >= SIDELOAD_THRESHOLD) {
                 continue;
             }
 
             pool.submit(() -> {
-                try (var dataStream = CrawledDomainReader.createDataStream(dataPath)) {
+                try (var dataStream = SerializableCrawlDataStream.openDataStream(dataPath)) {
                     ConverterBatchWritableIf writable = processor.fullProcessing(dataStream) ;
                     converterWriter.accept(writable);
                 }
@@ -239,7 +239,7 @@ public class ConverterMain extends ProcessMainClass {
         for (var dataPath : WorkLog.iterableMap(crawlDir.getLogFile(),
                 new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
         {
-            int sizeHint = CrawledDomainReader.sizeHint(dataPath);
+            int sizeHint = SerializableCrawlDataStream.getSizeHint(dataPath);
             if (sizeHint < SIDELOAD_THRESHOLD) {
                 continue;
             }
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
index 8b0afdba..f1f79790 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java
@@ -19,8 +19,8 @@ import nu.marginalia.crawl.retreival.DomainProber;
 import nu.marginalia.crawl.warc.WarcArchiverFactory;
 import nu.marginalia.crawl.warc.WarcArchiverIf;
 import nu.marginalia.db.DomainBlacklist;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.CrawlerOutputFile;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.mq.MessageQueueFactory;
 import nu.marginalia.process.ProcessConfiguration;
@@ -417,13 +417,13 @@ public class CrawlerMain extends ProcessMainClass {
             try {
                 Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
                 if (Files.exists(slopPath)) {
-                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                    return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
                 }
 
                 Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
                 if (Files.exists(parquetPath)) {
                     slopPath = migrateParquetData(parquetPath, domain, outputDir);
-                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                    return new CrawlDataReference(SerializableCrawlDataStream.openDataStream(slopPath));
                 }
             }
             catch (IOException e) {
diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java b/code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java
deleted file mode 100644
index 4c3dbea4..00000000
--- a/code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package nu.marginalia.io;
-
-import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
-import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-public class CrawledDomainReader {
-    private static final Logger logger = LoggerFactory.getLogger(CrawledDomainReader.class);
-
-    /** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
-    public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException
-    {
-
-        String fileName = fullPath.getFileName().toString();
-        if (fileName.endsWith(".parquet")) {
-            try {
-                return new ParquetSerializableCrawlDataStream(fullPath);
-            } catch (Exception ex) {
-                logger.error("Error reading domain data from " + fullPath, ex);
-                return SerializableCrawlDataStream.empty();
-            }
-        }
-
-        if (fileName.endsWith(".slop.zip")) {
-            try {
-                return new SlopSerializableCrawlDataStream(fullPath);
-            } catch (Exception ex) {
-                logger.error("Error reading domain data from " + fullPath, ex);
-                return SerializableCrawlDataStream.empty();
-            }
-        }
-
-        logger.error("Unknown file type: {}", fullPath);
-        return SerializableCrawlDataStream.empty();
-    }
-
-    public static int sizeHint(Path fullPath) {
-        String fileName = fullPath.getFileName().toString();
-        if (fileName.endsWith(".parquet")) {
-            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
-        }
-        else if (fileName.endsWith(".slop.zip")) {
-            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
-        }
-        else {
-            return 0;
-        }
-    }
-}
diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java b/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java
index 8cdbfb95..f9ef150d 100644
--- a/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java
+++ b/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java
@@ -1,5 +1,7 @@
 package nu.marginalia.io;
 
+import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
+import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
 import nu.marginalia.model.crawldata.CrawledDocument;
 import nu.marginalia.model.crawldata.CrawledDomain;
 import nu.marginalia.model.crawldata.SerializableCrawlData;
@@ -18,7 +20,6 @@ import java.util.function.Function;
 /** Closable iterator exceptional over serialized crawl data
  * The data may appear in any order, and the iterator must be closed.
  *
- * @see CrawledDomainReader
  * */
 public interface SerializableCrawlDataStream extends AutoCloseable {
     Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);
@@ -27,7 +28,7 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
 
     /** Return a size hint for the stream. 0 is returned if the hint is not available,
      * or if the file is seemed too small to bother */
-    default int sizeHint() { return 0; }
+    default int getSizeHint() { return 0; }
 
     boolean hasNext() throws IOException;
 
@@ -36,6 +37,49 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
 
     void close() throws IOException;
 
+    /** An iterator-like access to domain data. This must be closed, otherwise it will leak off-heap memory! */
+    static SerializableCrawlDataStream openDataStream(Path fullPath) throws IOException
+    {
+
+        String fileName = fullPath.getFileName().toString();
+        if (fileName.endsWith(".parquet")) {
+            try {
+                return new ParquetSerializableCrawlDataStream(fullPath);
+            } catch (Exception ex) {
+                logger.error("Error reading domain data from " + fullPath, ex);
+                return SerializableCrawlDataStream.empty();
+            }
+        }
+
+        if (fileName.endsWith(".slop.zip")) {
+            try {
+                return new SlopSerializableCrawlDataStream(fullPath);
+            } catch (Exception ex) {
+                logger.error("Error reading domain data from " + fullPath, ex);
+                return SerializableCrawlDataStream.empty();
+            }
+        }
+
+        logger.error("Unknown file type: {}", fullPath);
+        return SerializableCrawlDataStream.empty();
+    }
+
+    /** Get an indication of the size of the stream. This is used to determine whether to
+     * load the stream into memory or not. 0 is returned if the hint is not available,
+     * or if the file seems too small to bother */
+    static int getSizeHint(Path fullPath) {
+        String fileName = fullPath.getFileName().toString();
+        if (fileName.endsWith(".parquet")) {
+            return ParquetSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else if (fileName.endsWith(".slop.zip")) {
+            return SlopSerializableCrawlDataStream.sizeHint(fullPath);
+        }
+        else {
+            return 0;
+        }
+    }
+
     default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
         return new Iterator<>() {
             T next = null;
diff --git a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
index 3343c6d4..98d6bce9 100644
--- a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
+++ b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
@@ -10,7 +10,6 @@ import nu.marginalia.crawl.fetcher.HttpFetcher;
 import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawl.retreival.*;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
@@ -227,7 +226,7 @@ class CrawlerRetreiverTest {
 
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -280,7 +279,7 @@ class CrawlerRetreiverTest {
 
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -329,7 +328,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -376,7 +375,7 @@ class CrawlerRetreiverTest {
         doCrawl(tempFileWarc1, specs);
         convertToParquet(tempFileWarc1, tempFileParquet1);
         doCrawlWithReferenceStream(specs,
-                CrawledDomainReader.createDataStream(tempFileParquet1)
+                SerializableCrawlDataStream.openDataStream(tempFileParquet1)
         );
 
         convertToParquet(tempFileWarc2, tempFileParquet2);
@@ -397,7 +396,7 @@ class CrawlerRetreiverTest {
             });
         }
 
-        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
+        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
             while (ds.hasNext()) {
                 var doc = ds.next();
                 if (doc instanceof CrawledDomain dr) {
@@ -439,7 +438,7 @@ class CrawlerRetreiverTest {
 
         convertToParquet(tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 var doc = stream.next();
                 data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@@ -448,7 +447,7 @@ class CrawlerRetreiverTest {
             throw new RuntimeException(e);
         }
 
-        var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
+        var stream = SerializableCrawlDataStream.openDataStream(tempFileParquet1);
 
         System.out.println("---");
 
@@ -488,7 +487,7 @@ class CrawlerRetreiverTest {
             });
         }
 
-        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
+        try (var ds = SerializableCrawlDataStream.openDataStream(tempFileParquet2)) {
             while (ds.hasNext()) {
                 var doc = ds.next();
                 if (doc instanceof CrawledDomain dr) {
diff --git a/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java
index f31d39eb..1b2be51c 100644
--- a/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java
+++ b/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java
@@ -3,7 +3,6 @@ package nu.marginalia.extractor;
 import com.google.inject.Inject;
 import gnu.trove.set.hash.TLongHashSet;
 import nu.marginalia.hash.MurmurHash3_128;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.link_parser.LinkParser;
 import nu.marginalia.model.EdgeDomain;
@@ -59,7 +58,7 @@ public class AtagExporter implements ExporterIf {
             }
 
             Path crawlDataPath = inputDir.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 exportLinks(tagWriter, stream);
             }
             catch (Exception ex) {
diff --git a/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java
index 2f5b1d90..aeb87768 100644
--- a/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java
+++ b/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java
@@ -1,7 +1,6 @@
 package nu.marginalia.extractor;
 
 import com.google.inject.Inject;
-import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.link_parser.FeedExtractor;
 import nu.marginalia.link_parser.LinkParser;
@@ -56,7 +55,7 @@ public class FeedExporter implements ExporterIf {
             }
 
             Path crawlDataPath = inputDir.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 exportFeeds(tagWriter, stream);
             }
             catch (Exception ex) {
@@ -75,7 +74,7 @@ public class FeedExporter implements ExporterIf {
     private boolean exportFeeds(FeedCsvWriter exporter, SerializableCrawlDataStream stream) throws IOException, URISyntaxException {
         FeedExtractor feedExtractor = new FeedExtractor(new LinkParser());
 
-        int size = stream.sizeHint();
+        int size = stream.getSizeHint();
 
         while (stream.hasNext()) {
             if (!(stream.next() instanceof CrawledDocument doc))
diff --git a/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java
index 08562251..5348c83a 100644
--- a/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java
+++ b/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java
@@ -5,7 +5,7 @@ import gnu.trove.map.hash.TLongIntHashMap;
 import gnu.trove.set.hash.TLongHashSet;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.language.filter.LanguageFilter;
 import nu.marginalia.language.model.DocumentLanguageData;
 import nu.marginalia.language.sentence.SentenceExtractor;
@@ -103,7 +103,7 @@ public class TermFrequencyExporter implements ExporterIf {
     {
         TLongHashSet words = new TLongHashSet(1000);
 
-        try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+        try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
             while (stream.hasNext()) {
                 if (Thread.interrupted())
                     return;
diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java b/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java
index 84538679..f1f1bf5b 100644
--- a/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java
+++ b/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java
@@ -3,7 +3,7 @@ package nu.marginalia.tools;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import nu.marginalia.converting.ConverterModule;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.service.module.DatabaseModule;
 
@@ -40,7 +40,7 @@ public class ExperimentRunnerMain {
         Path basePath = Path.of(args[0]);
         for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
             Path crawlDataPath = basePath.resolve(item.relPath());
-            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
+            try (var stream = SerializableCrawlDataStream.openDataStream(crawlDataPath)) {
                 experiment.process(stream);
             }
             catch (Exception ex) {
diff --git a/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java b/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java
index 138a243a..b06800e2 100644
--- a/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java
+++ b/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java
@@ -26,7 +26,7 @@ import nu.marginalia.index.index.StatefulIndex;
 import nu.marginalia.index.journal.IndexJournal;
 import nu.marginalia.index.model.SearchParameters;
 import nu.marginalia.index.searchset.SearchSetAny;
-import nu.marginalia.io.CrawledDomainReader;
+import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.linkdb.docs.DocumentDbReader;
 import nu.marginalia.linkdb.docs.DocumentDbWriter;
 import nu.marginalia.loading.LoaderIndexJournalWriter;
@@ -152,7 +152,7 @@ public class IntegrationTest {
 
         /** PROCESS CRAWL DATA */
 
-        var processedDomain = domainProcessor.fullProcessing(CrawledDomainReader.createDataStream(crawlDataParquet));
+        var processedDomain = domainProcessor.fullProcessing(SerializableCrawlDataStream.openDataStream(crawlDataParquet));
 
         System.out.println(processedDomain);