diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java index 0da8db92..158c9ed9 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -193,26 +193,31 @@ public class DomainProcessor { @Override public Iterator getDocumentsStream() { - return dataStream.map((next) -> { - if (!(next instanceof CrawledDocument doc)) - return Optional.empty(); + return iteratorFactory.create((taskConsumer) -> { + while (dataStream.hasNext()) + { + if (!(dataStream.next() instanceof CrawledDocument doc)) + continue; + if (doc.url == null || !processedUrls.add(doc.url)) + continue; - if (doc.url == null || !processedUrls.add(doc.url)) - return Optional.empty(); - var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator); + taskConsumer.accept(() -> { + var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator); - synchronized (deduplicator) { - deduplicator.markIfDuplicate(processedDoc); + synchronized (deduplicator) { + deduplicator.markIfDuplicate(processedDoc); + } + + if (processedDoc.isProcessedFully()) { + // This is a bit sketchy, but we need to set the size and topology to something + processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology( + 10_000, externalDomainLinks.countForUrl(processedDoc.url)); + } + + return processedDoc; + }); } - - if (processedDoc.isProcessedFully()) { - // This is a bit sketchy, but we need to set the size and topology to something - processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology( - 10_000, externalDomainLinks.countForUrl(processedDoc.url)); - } - - return Optional.of(processedDoc); }); }