diff --git a/code/common/process/src/main/java/nu/marginalia/util/ParallelPipe.java b/code/common/process/src/main/java/nu/marginalia/util/ParallelPipe.java
deleted file mode 100644
index fc95debe..00000000
--- a/code/common/process/src/main/java/nu/marginalia/util/ParallelPipe.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package nu.marginalia.util;
-
-import lombok.SneakyThrows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/** Generalization of the workflow
- * -- single provider thread reading sequentially from disk
- * -> multiple independent CPU-bound processing tasks
- * -> single consumer thread writing to network/disk
- *
- */
-public abstract class ParallelPipe {
- private final LinkedBlockingQueue inputs;
- private final LinkedBlockingQueue intermediates;
-
- private final Logger logger = LoggerFactory.getLogger(getClass());
-
- private final List processThreads = new ArrayList<>();
- private final Thread receiverThread;
-
- private volatile boolean expectingInput = true;
- private volatile boolean expectingOutput = true;
-
- public ParallelPipe(String name, int numberOfThreads, int inputQueueSize, int intermediateQueueSize) {
- inputs = new LinkedBlockingQueue<>(inputQueueSize);
- intermediates = new LinkedBlockingQueue<>(intermediateQueueSize);
-
- for (int i = 0; i < numberOfThreads; i++) {
- processThreads.add(new Thread(this::runProcessThread, name + "-process["+i+"]"));
- }
- receiverThread = new Thread(this::runReceiverThread, name + "-receiver");
-
- processThreads.forEach(Thread::start);
- receiverThread.start();
- }
-
- public void clearQueues() {
- inputs.clear();
- intermediates.clear();
- }
-
- @SneakyThrows
- private void runProcessThread() {
- while (expectingInput || !inputs.isEmpty()) {
- var in = inputs.poll(10, TimeUnit.SECONDS);
-
- if (in != null) {
- try {
- var ret = onProcess(in);
- if (ret != null) {
- intermediates.put(ret);
- }
- }
- catch (InterruptedException ex) {
- throw ex;
- }
- catch (Exception ex) {
- logger.error("Exception", ex);
- }
-
- }
- }
-
- logger.info("Terminating {}", Thread.currentThread().getName());
- }
-
- @SneakyThrows
- private void runReceiverThread() {
- while (expectingOutput || !inputs.isEmpty() || !intermediates.isEmpty()) {
- var intermediate = intermediates.poll(997, TimeUnit.MILLISECONDS);
- if (intermediate != null) {
- try {
- onReceive(intermediate);
- }
- catch (Exception ex) {
- logger.error("Exception", ex);
- }
- }
- }
-
- logger.info("Terminating {}", Thread.currentThread().getName());
- }
-
- /** Begin processing an item */
- @SneakyThrows
- public void accept(INPUT input) {
- inputs.put(input);
- }
-
- /** The meat of the processor thread runtime */
- protected abstract INTERMEDIATE onProcess(INPUT input) throws Exception;
-
- /** The meat of the consumer thread runtime */
- protected abstract void onReceive(INTERMEDIATE intermediate) throws Exception;
-
- public void join() throws InterruptedException {
- expectingInput = false;
-
- for (var thread : processThreads) {
- thread.join();
- }
-
- expectingOutput = false;
- receiverThread.join();
- }
-}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
index f35740ce..9c8373e1 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
@@ -4,7 +4,7 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
-import nu.marginalia.crawling.io.SerializableCrawlDataStream;
+import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
@@ -17,7 +17,6 @@ import plan.CrawlPlan;
import nu.marginalia.converting.compiler.InstructionsCompiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.processor.DomainProcessor;
-import nu.marginalia.util.ParallelPipe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,6 +25,9 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,7 +57,7 @@ public class ConverterMain {
var request = converter.fetchInstructions();
try {
- converter.load(request);
+ converter.convert(request);
request.ok();
}
catch (Exception ex) {
@@ -87,58 +89,64 @@ public class ConverterMain {
heartbeat.start();
}
-
-
- public void load(ConvertRequest request) throws Exception {
+ public void convert(ConvertRequest request) throws Exception {
var plan = request.getPlan();
+ final int maxPoolSize = 16;
+
try (WorkLog processLog = plan.createProcessWorkLog();
ConversionLog log = new ConversionLog(plan.process.getDir())) {
- var instructionWriter = new InstructionWriter(log, plan.process.getDir(), gson);
+ var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson);
+
+ Semaphore semaphore = new Semaphore(maxPoolSize);
+ var pool = new ThreadPoolExecutor(
+ maxPoolSize/4,
+ maxPoolSize,
+ 5, TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(8)
+ );
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);
- var pipe = new ParallelPipe("Converter", 16, 4, 2) {
-
- @Override
- protected ProcessingInstructions onProcess(SerializableCrawlDataStream dataStream) {
- var processed = processor.process(dataStream);
- var compiled = compiler.compile(processed);
-
- return new ProcessingInstructions(processed.id, compiled);
- }
-
- @Override
- protected void onReceive(ProcessingInstructions processedInstructions) throws IOException {
- Thread.currentThread().setName("Converter:Receiver["+processedInstructions.id+"]");
- try {
- var instructions = processedInstructions.instructions;
- instructions.removeIf(Instruction::isNoOp);
-
- String where = instructionWriter.accept(processedInstructions.id, instructions);
- processLog.setJobToFinished(processedInstructions.id, where, instructions.size());
-
- heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
- }
- finally {
- Thread.currentThread().setName("Converter:Receiver[IDLE]");
- }
- }
-
- };
-
// Advance the progress bar to the current position if this is a resumption
processedDomains.set(processLog.countFinishedJobs());
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id)))
{
- pipe.accept(domain);
+ semaphore.acquire();
+ pool.execute(() -> {
+ try {
+ ProcessedDomain processed = processor.process(domain);
+
+ final String where;
+ final int size;
+
+ try (var writer = instructionWriter.createInstructionsForDomainWriter(processed.id)) {
+ compiler.compile(processed, writer::accept);
+ where = writer.getFileName();
+ size = writer.getSize();
+ }
+
+ processLog.setJobToFinished(processed.id, where, size);
+ heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
+ }
+ catch (IOException ex) {
+ logger.warn("IO exception in converter", ex);
+ }
+ finally {
+ semaphore.release();
+ }
+ });
}
- pipe.join();
+ pool.shutdown();
+ do {
+ System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
+ } while (!pool.awaitTermination(60, TimeUnit.SECONDS));
+
request.ok();
}
catch (Exception e) {
@@ -205,7 +213,4 @@ public class ConverterMain {
}
}
-
- record ProcessingInstructions(String id, List instructions) {}
-
}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriter.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java
similarity index 65%
rename from code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriter.java
rename to code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java
index 826c41cd..e6009d0e 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriter.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java
@@ -15,22 +15,18 @@ import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
+import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.List;
-public class InstructionWriter {
+public class InstructionWriterFactory {
- private ConversionLog log;
+ private final ConversionLog log;
private final Path outputDir;
private final Gson gson;
- private static final Logger logger = LoggerFactory.getLogger(InstructionWriter.class);
+ private static final Logger logger = LoggerFactory.getLogger(InstructionWriterFactory.class);
- public InstructionWriter(ConversionLog log, Path outputDir, Gson gson) {
+ public InstructionWriterFactory(ConversionLog log, Path outputDir, Gson gson) {
this.log = log;
this.outputDir = outputDir;
this.gson = gson;
@@ -40,29 +36,57 @@ public class InstructionWriter {
}
}
- public String accept(String id, List instructionList) throws IOException {
+ public InstructionWriter createInstructionsForDomainWriter(String id) throws IOException {
Path outputFile = getOutputFile(id);
+ return new InstructionWriter(outputFile);
+ }
- if (Files.exists(outputFile)) {
- Files.delete(outputFile);
+ public class InstructionWriter implements AutoCloseable {
+ private final OutputStreamWriter outputStream;
+ private final String where;
+ private final SummarizingInterpreter summary = new SummarizingInterpreter();
+
+ private int size = 0;
+
+
+ InstructionWriter(Path filename) throws IOException {
+ where = filename.getFileName().toString();
+ Files.deleteIfExists(filename);
+ outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(filename.toFile()))));
}
- try (var outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile.toFile()))))) {
+ public void accept(Instruction instruction) {
+ if (instruction.isNoOp()) return;
- SummarizingInterpreter summary = new SummarizingInterpreter(instructionList);
- logger.info("Writing {} - {} - {}", id, instructionList.size(), summary);
+ instruction.apply(summary);
+ instruction.apply(log);
- for (var instr : instructionList) {
- instr.apply(log);
+ size++;
- outputStream.append(instr.tag().name());
+ try {
+ outputStream.append(instruction.tag().name());
outputStream.append(' ');
- gson.toJson(instr, outputStream);
+ gson.toJson(instruction, outputStream);
outputStream.append('\n');
}
+ catch (IOException ex) {
+ logger.warn("IO exception writing instruction", ex);
+ }
}
- return outputFile.getFileName().toString();
+ @Override
+ public void close() throws IOException {
+ logger.info("Wrote {} - {} - {}", where, size, summary);
+ outputStream.close();
+ }
+
+ public String getFileName() {
+ return where;
+ }
+
+ public int getSize() {
+ return size;
+ }
}
private Path getOutputFile(String id) throws IOException {
@@ -79,12 +103,6 @@ public class InstructionWriter {
private static class SummarizingInterpreter implements Interpreter {
- private SummarizingInterpreter(List instructions) {
- for (var i : instructions) {
- i.apply(this);
- }
- }
-
private String domainName;
private int ok = 0;
private int error = 0;
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java
index 3849f015..881a1a33 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java
@@ -7,34 +7,35 @@ import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.crawl.HtmlFeature;
import java.util.List;
+import java.util.function.Consumer;
public class DocumentsCompiler {
- public void compile(List ret, List documents) {
+ public void compile(Consumer instructionConsumer, List documents) {
for (var doc : documents) {
- compileDocumentDetails(ret, doc);
+ compileDocumentDetails(instructionConsumer, doc);
}
for (var doc : documents) {
- compileWords(ret, doc);
+ compileWords(instructionConsumer, doc);
}
}
- private void compileDocumentDetails(List ret, ProcessedDocument doc) {
+ private void compileDocumentDetails(Consumer instructionConsumer, ProcessedDocument doc) {
var details = doc.details;
if (details != null) {
- ret.add(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear));
+ instructionConsumer.accept(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear));
}
}
- private void compileWords(List ret, ProcessedDocument doc) {
+ private void compileWords(Consumer instructionConsumer, ProcessedDocument doc) {
var words = doc.words;
if (words != null) {
- ret.add(new LoadKeywords(doc.url, doc.details.metadata, words.build()));
+ instructionConsumer.accept(new LoadKeywords(doc.url, doc.details.metadata, words.build()));
}
}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java
index e80f42eb..74ae5816 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java
@@ -11,11 +11,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Consumer;
public class DomainMetadataCompiler {
- public void compile(List ret, EdgeDomain domain, @NotNull List documents) {
+ public void compile(Consumer instructionConsumer, EdgeDomain domain, @NotNull List documents) {
int visitedUrls = 0;
int goodUrls = 0;
@@ -36,7 +37,7 @@ public class DomainMetadataCompiler {
.ifPresent(knownUrls::addAll);
}
- ret.add(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
+ instructionConsumer.accept(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
}
}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/FeedsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/FeedsCompiler.java
index 64779a0f..2c111ea2 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/FeedsCompiler.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/FeedsCompiler.java
@@ -7,10 +7,11 @@ import nu.marginalia.model.EdgeUrl;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
public class FeedsCompiler {
- public void compile(List ret, List documents) {
+ public void compile(Consumer instructionConsumer, List documents) {
EdgeUrl[] feeds = documents.stream().map(doc -> doc.details)
.filter(Objects::nonNull)
@@ -18,6 +19,6 @@ public class FeedsCompiler {
.distinct()
.toArray(EdgeUrl[]::new);
- ret.add(new LoadRssFeed(feeds));
+ instructionConsumer.accept(new LoadRssFeed(feeds));
}
}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java
index 71bf7785..9b32ed8d 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java
@@ -8,6 +8,7 @@ import nu.marginalia.converting.model.ProcessedDomain;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import static java.util.Objects.requireNonNullElse;
@@ -35,25 +36,21 @@ public class InstructionsCompiler {
this.redirectCompiler = redirectCompiler;
}
- public List compile(ProcessedDomain domain) {
- List ret = new ArrayList<>(domain.size()*4);
-
+ public void compile(ProcessedDomain domain, Consumer instructionConsumer) {
// Guaranteed to always be first
- ret.add(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
+ instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
if (domain.documents != null) {
- urlsCompiler.compile(ret, domain.documents);
- documentsCompiler.compile(ret, domain.documents);
+ urlsCompiler.compile(instructionConsumer, domain.documents);
+ documentsCompiler.compile(instructionConsumer, domain.documents);
- feedsCompiler.compile(ret, domain.documents);
- linksCompiler.compile(ret, domain.domain, domain.documents);
+ feedsCompiler.compile(instructionConsumer, domain.documents);
+ linksCompiler.compile(instructionConsumer, domain.domain, domain.documents);
}
if (domain.redirect != null) {
- redirectCompiler.compile(ret, domain.domain, domain.redirect);
+ redirectCompiler.compile(instructionConsumer, domain.domain, domain.redirect);
}
- domainMetadataCompiler.compile(ret, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
-
- return ret;
+ domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
}
}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java
index a578602d..e100cb86 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java
@@ -8,10 +8,11 @@ import nu.marginalia.model.EdgeDomain;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
public class LinksCompiler {
- public void compile(List ret, EdgeDomain from, List documents) {
+ public void compile(Consumer instructionConsumer, EdgeDomain from, List documents) {
DomainLink[] links = documents.stream().map(doc -> doc.details)
.filter(Objects::nonNull)
@@ -21,6 +22,6 @@ public class LinksCompiler {
.map(domain -> new DomainLink(from, domain))
.toArray(DomainLink[]::new);
- ret.add(new LoadDomainLink(links));
+ instructionConsumer.accept(new LoadDomainLink(links));
}
}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java
index b14dedca..dcd0201f 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java
@@ -8,12 +8,13 @@ import nu.marginalia.converting.instruction.instructions.LoadDomainRedirect;
import nu.marginalia.model.EdgeDomain;
import java.util.List;
+import java.util.function.Consumer;
public class RedirectCompiler {
- public void compile(List ret, EdgeDomain from, EdgeDomain to) {
- ret.add(new LoadDomain(to));
- ret.add(new LoadDomainLink(new DomainLink(from, to)));
- ret.add(new LoadDomainRedirect(new DomainLink(from, to)));
+ public void compile(Consumer instructionConsumer, EdgeDomain from, EdgeDomain to) {
+ instructionConsumer.accept(new LoadDomain(to));
+ instructionConsumer.accept(new LoadDomainLink(new DomainLink(from, to)));
+ instructionConsumer.accept(new LoadDomainRedirect(new DomainLink(from, to)));
}
}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java
index d5184cfc..ba347058 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java
@@ -13,13 +13,14 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Consumer;
public class UrlsCompiler {
private static final int MAX_INTERNAL_LINKS = 25;
private final Logger logger = LoggerFactory.getLogger(getClass());
- public void compile(List ret, List documents) {
+ public void compile(Consumer instructionConsumer, List documents) {
Set seenUrls = new HashSet<>(documents.size()*4);
Set seenDomains = new HashSet<>(documents.size());
@@ -53,8 +54,8 @@ public class UrlsCompiler {
}
}
- ret.add(new LoadDomain(seenDomains.toArray(EdgeDomain[]::new)));
- ret.add(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new)));
+ instructionConsumer.accept(new LoadDomain(seenDomains.toArray(EdgeDomain[]::new)));
+ instructionConsumer.accept(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new)));
}
}
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java
index 124a2a49..032f2c23 100644
--- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java
+++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java
@@ -119,6 +119,7 @@ public class ProcessService {
}
opts.put("WMSA_HOME", WMSA_HOME);
opts.put("JAVA_HOME", System.getenv("JAVA_HOME"));
+ opts.put("JAVA_OPTS", "");
opts.put("CONVERTER_OPTS", System.getenv("CONVERTER_OPTS"));
opts.put("LOADER_OPTS", System.getenv("LOADER_OPTS"));
opts.put("CRAWLER_OPTS", System.getenv("CRAWLER_OPTS"));