From e7dd28b926d6886dc7e2a177fdae3d38098263d0 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Fri, 29 Dec 2023 14:25:48 +0100 Subject: [PATCH] (converter) Optimize sideload-loading Use ProcessingIterator to fan out processing of documents across more cores, instead of doing all of it in the writer thread blocking everything else with slow single-threaded processing. --- .../converting/processor/DomainProcessor.java | 50 ++++++------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java index 391be0df..f108321a 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -20,6 +20,7 @@ import nu.marginalia.converting.model.ProcessedDomain; import nu.marginalia.model.EdgeDomain; import nu.marginalia.converting.processor.logic.links.TopKeywords; import nu.marginalia.converting.processor.logic.LshDocumentDeduplicator; +import nu.marginalia.util.ProcessingIterator; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -110,26 +111,21 @@ public class DomainProcessor { @Override public Iterator getDocumentsStream() { - return new DocumentsIterator(); - } + return new ProcessingIterator<>(24, 16, (taskConsumer) -> { + while (dataStream.hasNext()) + { + if (!(dataStream.next() instanceof CrawledDocument doc)) + continue; + if (doc.url == null || !processedUrls.add(doc.url)) + continue; - class DocumentsIterator implements Iterator { - ProcessedDocument next = null; - @Override - public boolean hasNext() { - try { - while (next == null - && dataStream.hasNext()) - { - if (!(dataStream.next() instanceof CrawledDocument doc)) - continue; - if (doc.url == null || !processedUrls.add(doc.url)) - continue; + taskConsumer.accept(() -> { var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator); - deduplicator.markIfDuplicate(processedDoc); - next = processedDoc; + synchronized (deduplicator) { + deduplicator.markIfDuplicate(processedDoc); + } if (processedDoc.isProcessedFully()) { // This is a bit sketchy, but we need to set the size and topology to something @@ -137,26 +133,10 @@ public class DomainProcessor { 10_000, externalDomainLinks.countForUrl(processedDoc.url)); } - return true; - } + return processedDoc; + }); } - catch (IOException ex) { - logger.warn("Failed to process domain sideload", ex); - } - - return false; - } - - @Override - public ProcessedDocument next() { - try { - if (next == null && !hasNext()) - throw new NoSuchElementException(); - return next; - } finally { - next = null; - } - } + }); } @Override