From 7ba296ccdfea1ba6f57358ba3d94ec96101b99c0 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Sat, 30 Dec 2023 13:05:10 +0100 Subject: [PATCH] (converter) Route sizeHint to SideloadProcessing Route the sizeHint from the input parquet file to SideloadProcessing, so that it can set sizeloadSizeAdvice appropriately, instead of using a fixed "large" number. This is necessary to populate the KNOWN_URL column in the domain data table, which is important as it is used, for example, to calculate how far to re-crawl the site in the future. --- .../converting/processor/DomainProcessor.java | 18 +++++++++--------- .../converting/ConvertingIntegrationTest.java | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java index f108321a..e97aa057 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -57,19 +57,20 @@ public class DomainProcessor { } public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) { - if (domain.sizeHint() > 10_000) { + final int sizeHint = domain.sizeHint(); + + if (sizeHint > 10_000) { // If the file is too big, we run a processing mode that doesn't // require loading the entire dataset into RAM - logger.info("Sideloading {}", domain.path()); - return sideloadProcessing(domain); + return sideloadProcessing(domain, sizeHint); } return fullProcessing(domain); } - public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream) { + public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) { try { - return new SideloadProcessing(dataStream); + return new SideloadProcessing(dataStream, 
sizeHint); } catch (Exception ex) { logger.warn("Failed to process domain sideload", ex); @@ -86,17 +87,16 @@ public class DomainProcessor { private final DomainLinks externalDomainLinks; private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator(); - SideloadProcessing(SerializableCrawlDataStream dataStream) throws IOException { + SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException { this.dataStream = dataStream; - if (!dataStream.hasNext() - || !(dataStream.next() instanceof CrawledDomain crawledDomain)) + if (!dataStream.hasNext() || !(dataStream.next() instanceof CrawledDomain crawledDomain)) { throw new IllegalStateException("First record must be a domain, was " + dataStream.next().getClass().getSimpleName()); } domain = new ProcessedDomain(); - domain.sizeloadSizeAdvice = 10_000; + domain.sizeloadSizeAdvice = sizeHint == 0 ? 10_000 : sizeHint; documentDecorator = new DocumentDecorator(anchorTextKeywords); processDomain(crawledDomain, domain, documentDecorator); diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java index 141777d6..61de3c38 100644 --- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/ConvertingIntegrationTest.java @@ -96,7 +96,7 @@ public class ConvertingIntegrationTest { @Test public void testMemexMarginaliaNuSideloadProcessing() throws IOException { - var ret = domainProcessor.sideloadProcessing(asSerializableCrawlData(readMarginaliaWorkingSet())); + var ret = domainProcessor.sideloadProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()), 100); assertNotNull(ret); assertEquals("memex.marginalia.nu", ret.id());