From 24051fec03804d6e33ec94a04bc098e37b58b724 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Wed, 27 Dec 2023 18:20:03 +0100 Subject: [PATCH] (converter) WIP Run sideload-style processing for large domains The processor normally retains the domain data in memory after processing to be able to do additional site-wide analysis. This works well, except there are a number of outlier websites that have an absurd number of documents that can rapidly fill up the heap of the process. These websites now receive a simplified treatment. This is executed in the converter batch writer thread. This is slower, but the documents will not be persisted in memory. --- .../marginalia/converting/ConverterMain.java | 8 +- .../converting/model/ProcessedDomain.java | 18 +- .../converting/processor/DomainProcessor.java | 155 ++++++++++++++++-- .../writer/ConverterBatchWritableIf.java | 9 + .../writer/ConverterBatchWriter.java | 15 +- .../writer/ConverterBatchWriterIf.java | 15 ++ .../converting/writer/ConverterWriter.java | 7 +- .../converting/ConvertingIntegrationTest.java | 6 +- ...CrawlingThenConvertingIntegrationTest.java | 2 +- .../experiments/SiteStatisticsExperiment.java | 2 +- 10 files changed, 208 insertions(+), 29 deletions(-) create mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWritableIf.java create mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriterIf.java diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index 3bada914..b4b3f96e 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -6,9 +6,9 @@ import com.google.inject.Inject; import 
com.google.inject.Injector; import nu.marginalia.ProcessConfiguration; import nu.marginalia.ProcessConfigurationModule; -import nu.marginalia.converting.model.ProcessedDomain; import nu.marginalia.converting.sideload.SideloadSource; import nu.marginalia.converting.sideload.SideloadSourceFactory; +import nu.marginalia.converting.writer.ConverterBatchWritableIf; import nu.marginalia.converting.writer.ConverterBatchWriter; import nu.marginalia.converting.writer.ConverterWriter; import nu.marginalia.storage.FileStorageService; @@ -109,7 +109,7 @@ public class ConverterMain { taskHeartbeat.progress(sideloadSource.domainName(), i++, sideloadSources.size()); - writer.write(sideloadSource); + writer.writeSideloadSource(sideloadSource); } taskHeartbeat.progress("Finished", i, sideloadSources.size()); @@ -139,8 +139,8 @@ public class ConverterMain { { pool.submit(() -> { try { - ProcessedDomain processed = processor.process(domain); - converterWriter.accept(processed); + ConverterBatchWritableIf writable = processor.createWritable(domain); + converterWriter.accept(writable); } catch (Exception ex) { logger.info("Error in processing", ex); diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/model/ProcessedDomain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/model/ProcessedDomain.java index 3e954637..2146f52b 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/model/ProcessedDomain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/model/ProcessedDomain.java @@ -1,15 +1,18 @@ package nu.marginalia.converting.model; import lombok.ToString; +import nu.marginalia.converting.writer.ConverterBatchWritableIf; +import nu.marginalia.converting.writer.ConverterBatchWriter; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.crawl.DomainIndexingState; import org.jetbrains.annotations.Nullable; +import java.io.IOException; import 
java.util.List; import java.util.Optional; @ToString -public class ProcessedDomain { +public class ProcessedDomain implements ConverterBatchWritableIf { public EdgeDomain domain; public List<ProcessedDocument> documents; @@ -26,4 +29,17 @@ public class ProcessedDomain { public int size() { return Optional.ofNullable(documents).map(List::size).orElse(1); } + + @Override + public void write(ConverterBatchWriter writer) throws IOException { + writer.writeProcessedDomain(this); // runs every task from writeTasks(), not only the domain record that writeDomainData() emits + } + + @Override + public String id() { + return domain.toString(); + } + + @Override + public void close() {} } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java index e8b89e94..6d46a85f 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -8,6 +8,9 @@ import nu.marginalia.atags.source.AnchorTagsSource; import nu.marginalia.atags.source.AnchorTagsSourceFactory; import nu.marginalia.converting.model.ProcessedDocument; import nu.marginalia.converting.processor.logic.links.LinkGraph; +import nu.marginalia.converting.sideload.SideloadSource; +import nu.marginalia.converting.writer.ConverterBatchWritableIf; +import nu.marginalia.converting.writer.ConverterBatchWriter; import nu.marginalia.crawling.io.SerializableCrawlDataStream; import nu.marginalia.crawling.model.*; import nu.marginalia.geoip.GeoIpDictionary; @@ -17,11 +20,15 @@ import nu.marginalia.converting.model.ProcessedDomain; import nu.marginalia.model.EdgeDomain; import nu.marginalia.converting.processor.logic.links.TopKeywords; import nu.marginalia.converting.processor.logic.LshDocumentDeduplicator; +import nu.marginalia.util.ProcessingIterator; import org.apache.commons.lang3.StringUtils; import
org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.sql.SQLException; import java.util.*; import java.util.regex.Pattern; @@ -33,6 +40,11 @@ public class DomainProcessor { private final AnchorTextKeywords anchorTextKeywords; private final GeoIpDictionary geoIpDictionary; + + // The threshold for running a cheaper sideloading-style process + // (10 MB is ~ 99.5%th percentile of domain data sizes) + private static final long DOMAIN_SIDELOAD_THRESHOLD = 10_000_000L; + private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject @@ -51,9 +63,130 @@ public class DomainProcessor { geoIpDictionary.waitReady(); } + public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) throws IOException { + Path filePath = domain.path(); + + if (filePath != null && Files.size(filePath) > DOMAIN_SIDELOAD_THRESHOLD) { + // If the file is too big, we run a processing mode that doesn't + // require loading the entire dataset into RAM + return sideloadProcessing(domain); + } + + return fullProcessing(domain); + } + + public ConverterBatchWritableIf sideloadProcessing(SerializableCrawlDataStream dataStream) { + try { + return new SideloadProcessing(dataStream); + } + catch (Exception ex) { + logger.warn("Failed to process domain sideload", ex); + return null; + } + + } + + class SideloadProcessing implements ConverterBatchWritableIf, SideloadSource { + private final SerializableCrawlDataStream dataStream; + private final ProcessedDomain domain; + private final DocumentDecorator documentDecorator; + private final Set processedUrls = new HashSet<>(); + private final DomainLinks externalDomainLinks; + private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator(); + + SideloadProcessing(SerializableCrawlDataStream dataStream) throws IOException { + this.dataStream = dataStream; + + if 
(!dataStream.hasNext()) { + throw new IllegalStateException("No data in stream"); + } + if (!(dataStream.next() instanceof CrawledDomain crawledDomain)) { + throw new IllegalStateException("First record must be a domain"); + } + + domain = new ProcessedDomain(); + externalDomainLinks = anchorTagsSource.getAnchorTags(new EdgeDomain(crawledDomain.domain)); // domain.domain is still null here; processDomain() below assigns it + documentDecorator = new DocumentDecorator(anchorTextKeywords, externalDomainLinks); + + processDomain(crawledDomain, domain, documentDecorator); + } + + @Override + public ProcessedDomain getDomain() { + return domain; + } + + @Override + public Iterator<ProcessedDocument> getDocumentsStream() { + return new DocumentsIterator(); + } + + class DocumentsIterator implements Iterator<ProcessedDocument> { + ProcessedDocument next = null; + @Override + public boolean hasNext() { + try { + while (next == null + && dataStream.hasNext() + && dataStream.next() instanceof CrawledDocument doc) + { + if (doc.url == null || !processedUrls.add(doc.url)) + continue; + + var processedDoc = documentProcessor.process(doc, externalDomainLinks, documentDecorator); + + deduplicator.markIfDuplicate(processedDoc); + next = processedDoc; + + if (processedDoc.isProcessedFully()) { + // This is a bit sketchy, but we need to set the size and topology to something + processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology( + 10_000, externalDomainLinks.countForUrl(processedDoc.url)); + } + + return true; + } + } + catch (IOException ex) { + logger.warn("Failed to process domain sideload", ex); + } + + return next != null; // true when a document is already buffered by an earlier hasNext() call; false when the stream is exhausted or failed + } + + @Override + public ProcessedDocument next() { + try { + if (next == null && !hasNext()) + throw new NoSuchElementException(); + return next; + } finally { + next = null; + } + } + } + + @Override + public void write(ConverterBatchWriter writer) throws IOException { + writer.writeSideloadSource(this); + } + + @Override + public String id() { + return domain.domain.toString(); + } + + @Override + public void close() throws Exception { + dataStream.close(); +
deduplicator.close(); + } + } + + @SneakyThrows @Nullable - public ProcessedDomain process(SerializableCrawlDataStream dataStream) { + public ProcessedDomain fullProcessing(SerializableCrawlDataStream dataStream) { if (!dataStream.hasNext()) { return null; } @@ -83,8 +216,7 @@ public class DomainProcessor { if (data instanceof CrawledDomain crawledDomain) { documentDecorator = new DocumentDecorator(anchorTextKeywords, externalDomainLinks); - ret = processDomain(crawledDomain, ret, documentDecorator); - + processDomain(crawledDomain, ret, documentDecorator); ret.documents = docs; } else if (data instanceof CrawledDocument doc) { @@ -112,25 +244,23 @@ public class DomainProcessor { return ret; } - private ProcessedDomain processDomain(CrawledDomain crawledDomain, - ProcessedDomain ret, + private void processDomain(CrawledDomain crawledDomain, + ProcessedDomain domain, DocumentDecorator decorator) { - ret.domain = new EdgeDomain(crawledDomain.domain); - ret.ip = crawledDomain.ip; + domain.domain = new EdgeDomain(crawledDomain.domain); + domain.ip = crawledDomain.ip; addIpInfo(decorator, crawledDomain.ip); - if (isAcademicDomain(ret.domain)) { + if (isAcademicDomain(domain.domain)) { decorator.addTerm("special:academia"); } if (crawledDomain.redirectDomain != null) { - ret.redirect = new EdgeDomain(crawledDomain.redirectDomain); + domain.redirect = new EdgeDomain(crawledDomain.redirectDomain); } - ret.state = getState(crawledDomain.crawlerStatus); - - return ret; + domain.state = getState(crawledDomain.crawlerStatus); } @@ -232,4 +362,5 @@ public class DomainProcessor { }; } + } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWritableIf.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWritableIf.java new file mode 100644 index 00000000..c3b4ae65 --- /dev/null +++ 
b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWritableIf.java @@ -0,0 +1,9 @@ +package nu.marginalia.converting.writer; + +import java.io.IOException; + +public interface ConverterBatchWritableIf { + void write(ConverterBatchWriter writer) throws IOException; + String id(); + void close() throws Exception; +} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriter.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriter.java index 239d748c..73333320 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriter.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriter.java @@ -27,7 +27,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; /** Writer for a single batch of converter parquet files */ -public class ConverterBatchWriter implements AutoCloseable { +public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriterIf { private final DomainRecordParquetFileWriter domainWriter; private final DomainLinkRecordParquetFileWriter domainLinkWriter; private final DocumentRecordParquetFileWriter documentWriter; @@ -46,7 +46,13 @@ public class ConverterBatchWriter implements AutoCloseable { ); } - public void write(SideloadSource sideloadSource) throws IOException { + @Override + public void write(ConverterBatchWritableIf writable) throws IOException { + writable.write(this); + } + + @Override + public void writeSideloadSource(SideloadSource sideloadSource) throws IOException { var domain = sideloadSource.getDomain(); writeDomainData(domain); @@ -54,7 +60,8 @@ public class ConverterBatchWriter implements AutoCloseable { writeDocumentData(domain.domain, sideloadSource.getDocumentsStream()); } - public void write(ProcessedDomain domain) { + @Override + public void 
writeProcessedDomain(ProcessedDomain domain) { var results = ForkJoinPool.commonPool().invokeAll( writeTasks(domain) ); @@ -180,7 +187,7 @@ public class ConverterBatchWriter implements AutoCloseable { return this; } - private Object writeDomainData(ProcessedDomain domain) throws IOException { + public Object writeDomainData(ProcessedDomain domain) throws IOException { DomainMetadata metadata = DomainMetadata.from(domain); List feeds = getFeedUrls(domain); diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriterIf.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriterIf.java new file mode 100644 index 00000000..eb6e14f4 --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterBatchWriterIf.java @@ -0,0 +1,15 @@ +package nu.marginalia.converting.writer; + +import nu.marginalia.converting.model.ProcessedDomain; +import nu.marginalia.converting.sideload.SideloadSource; + +import java.io.IOException; + +public interface ConverterBatchWriterIf { + + void write(ConverterBatchWritableIf writable) throws IOException; + + void writeSideloadSource(SideloadSource sideloadSource) throws IOException; + + void writeProcessedDomain(ProcessedDomain domain); +} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java index 6cb4f332..6bac2804 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java @@ -24,7 +24,7 @@ public class ConverterWriter implements AutoCloseable { private final Duration switchInterval = Duration.of(10, ChronoUnit.MINUTES); - private final ArrayBlockingQueue domainData + private final 
ArrayBlockingQueue domainData = new ArrayBlockingQueue<>(1); private final Thread workerThread; @@ -42,7 +42,7 @@ public class ConverterWriter implements AutoCloseable { } @SneakyThrows - public void accept(@Nullable ProcessedDomain domain) { + public void accept(@Nullable ConverterBatchWritableIf domain) { if (null == domain) return; @@ -66,10 +66,11 @@ public class ConverterWriter implements AutoCloseable { if (data == null) continue; - String id = data.domain.toString(); + String id = data.id(); if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) { logger.warn("Skipping already logged item {}", id); + data.close(); continue; } diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java index 09973f1b..c22f2c66 100644 --- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java @@ -43,7 +43,7 @@ public class ConvertingIntegrationTest { var domain = new CrawledDomain("memex.marginalia.nu", null, "OK", "-", "127.0.0.1", docs, Collections.emptyList()); - var ret = domainProcessor.process(asSerializableCrawlData(domain)); + var ret = domainProcessor.fullProcessing(asSerializableCrawlData(domain)); assertEquals(ret.state, DomainIndexingState.ACTIVE); assertEquals(ret.domain, new EdgeDomain("memex.marginalia.nu")); @@ -51,7 +51,7 @@ public class ConvertingIntegrationTest { } @Test public void testMemexMarginaliaNuDateInternalConsistency() throws IOException { - var ret = domainProcessor.process(asSerializableCrawlData(readMarginaliaWorkingSet())); + var ret = domainProcessor.fullProcessing(asSerializableCrawlData(readMarginaliaWorkingSet())); ret.documents.stream().filter(ProcessedDocument::isProcessedFully).forEach(doc -> { int year = 
PubDate.fromYearByte(doc.details.metadata.year()); Integer yearMeta = doc.details.pubYear; @@ -64,7 +64,7 @@ public class ConvertingIntegrationTest { @Test public void testMemexMarginaliaNu() throws IOException { - var ret = domainProcessor.process(asSerializableCrawlData(readMarginaliaWorkingSet())); + var ret = domainProcessor.fullProcessing(asSerializableCrawlData(readMarginaliaWorkingSet())); assertNotNull(ret); assertEquals(ret.state, DomainIndexingState.ACTIVE); assertEquals(ret.domain, new EdgeDomain("memex.marginalia.nu")); diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java index 535eac31..3e6bc5eb 100644 --- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java @@ -251,7 +251,7 @@ public class CrawlingThenConvertingIntegrationTest { private ProcessedDomain process() { try (var stream = new ParquetSerializableCrawlDataStream(fileName2)) { - return domainProcessor.process(stream); + return domainProcessor.fullProcessing(stream); } catch (Exception e) { Assertions.fail(e); diff --git a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/experiments/SiteStatisticsExperiment.java b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/experiments/SiteStatisticsExperiment.java index 98c11e7f..0afb290f 100644 --- a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/experiments/SiteStatisticsExperiment.java +++ b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/experiments/SiteStatisticsExperiment.java @@ -22,7 +22,7 @@ public class SiteStatisticsExperiment extends Experiment { @Override public boolean process(SerializableCrawlDataStream 
stream) { - var ret = domainProcessor.process(stream); + var ret = domainProcessor.fullProcessing(stream); ret.documents.stream() .filter(ProcessedDocument::isProcessedFully)