(converter) Refactor converter to not keep instructions list in RAM.

(converter) Refactor converter to not keep instructions list in RAM.

(converter) Refactor converter to not keep instructions list in RAM.
This commit is contained in:
Viktor Lofgren 2023-07-25 20:41:43 +02:00
parent fd44e09ebd
commit 507f26ad47
11 changed files with 126 additions and 211 deletions

View File

@ -1,112 +0,0 @@
package nu.marginalia.util;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/** Generalization of the workflow <br>
 * -- single provider thread reading sequentially from disk <br>
 * -> multiple independent CPU-bound processing tasks <br>
 * -> single consumer thread writing to network/disk <br>
 * <p>
 * Items handed to {@link #accept} are placed on a bounded input queue, transformed by
 * {@link #onProcess} on one of the process threads, and every non-null result is placed on a
 * bounded intermediate queue from which a single receiver thread feeds {@link #onReceive}.
 * <p>
 * All threads are started from the constructor, so {@link #onProcess} and {@link #onReceive}
 * may be invoked before a subclass constructor has finished running; subclasses should not
 * rely on their own instance fields being initialized at that point.
 *
 * @param <INPUT> type of the items submitted through {@link #accept}
 * @param <INTERMEDIATE> type produced by {@link #onProcess} and consumed by {@link #onReceive}
 */
public abstract class ParallelPipe<INPUT,INTERMEDIATE> {
    private final LinkedBlockingQueue<INPUT> inputs;
    private final LinkedBlockingQueue<INTERMEDIATE> intermediates;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final List<Thread> processThreads = new ArrayList<>();
    private final Thread receiverThread;

    // Cleared by join(): process threads exit once this is false and the input queue is empty
    private volatile boolean expectingInput = true;
    // Cleared by join() after every process thread has terminated
    private volatile boolean expectingOutput = true;

    /**
     * Creates the pipe and immediately starts its process threads and its receiver thread.
     *
     * @param name prefix used for the thread names ({@code name-process[i]}, {@code name-receiver})
     * @param numberOfThreads number of process threads; must be at least 1
     * @param inputQueueSize capacity of the input queue; {@link #accept} blocks while it is full
     * @param intermediateQueueSize capacity of the queue between process threads and receiver
     * @throws IllegalArgumentException if {@code numberOfThreads} is less than 1, or if a queue
     *         capacity is rejected by {@link LinkedBlockingQueue}
     */
    public ParallelPipe(String name, int numberOfThreads, int inputQueueSize, int intermediateQueueSize) {
        // Without any process thread, submitted items would never leave the input queue
        // and join() could never return; fail fast instead.
        if (numberOfThreads < 1) {
            throw new IllegalArgumentException("numberOfThreads must be at least 1, was " + numberOfThreads);
        }

        inputs = new LinkedBlockingQueue<>(inputQueueSize);
        intermediates = new LinkedBlockingQueue<>(intermediateQueueSize);

        for (int i = 0; i < numberOfThreads; i++) {
            processThreads.add(new Thread(this::runProcessThread, name + "-process["+i+"]"));
        }
        receiverThread = new Thread(this::runReceiverThread, name + "-receiver");

        processThreads.forEach(Thread::start);
        receiverThread.start();
    }

    /** Discards everything currently waiting in the input and intermediate queues. */
    public void clearQueues() {
        inputs.clear();
        intermediates.clear();
    }

    /**
     * Main loop of a process thread: takes inputs, runs {@link #onProcess} on them and forwards
     * non-null results to the intermediate queue. Exceptions thrown by {@link #onProcess} are
     * logged and the item is dropped; an {@link InterruptedException} terminates the thread.
     */
    @SneakyThrows
    private void runProcessThread() {
        while (expectingInput || !inputs.isEmpty()) {
            // Timed poll so that the loop condition is re-evaluated even when no input arrives
            var in = inputs.poll(10, TimeUnit.SECONDS);

            if (in != null) {
                try {
                    var ret = onProcess(in);
                    if (ret != null) {
                        intermediates.put(ret);
                    }
                }
                catch (InterruptedException ex) {
                    throw ex;
                }
                catch (Exception ex) {
                    logger.error("Exception", ex);
                }
            }
        }

        logger.info("Terminating {}", Thread.currentThread().getName());
    }

    /**
     * Main loop of the receiver thread: hands each intermediate result to {@link #onReceive}.
     * Exceptions thrown by {@link #onReceive} are logged and the item is dropped; an
     * {@link InterruptedException} terminates the thread, matching the process threads.
     */
    @SneakyThrows
    private void runReceiverThread() {
        while (expectingOutput || !inputs.isEmpty() || !intermediates.isEmpty()) {
            var intermediate = intermediates.poll(997, TimeUnit.MILLISECONDS);

            if (intermediate != null) {
                try {
                    onReceive(intermediate);
                }
                catch (InterruptedException ex) {
                    // Do not let the generic handler below log and swallow an interruption
                    throw ex;
                }
                catch (Exception ex) {
                    logger.error("Exception", ex);
                }
            }
        }

        logger.info("Terminating {}", Thread.currentThread().getName());
    }

    /** Begin processing an item.  Blocks while the input queue is full. */
    @SneakyThrows
    public void accept(INPUT input) {
        inputs.put(input);
    }

    /** The meat of the processor thread runtime.
     *
     * @return the value to hand to {@link #onReceive}, or null to emit nothing for this input
     */
    protected abstract INTERMEDIATE onProcess(INPUT input) throws Exception;

    /** The meat of the consumer thread runtime */
    protected abstract void onReceive(INTERMEDIATE intermediate) throws Exception;

    /**
     * Signals that no further input will be submitted, waits for the process threads to drain
     * the input queue and terminate, and then waits for the receiver thread to drain the
     * intermediate queue and terminate.
     * <p>
     * Process threads notice the end of input only when their 10 second poll returns, so this
     * call may take that long even when the queues are already empty.
     * <p>
     * NOTE: if the calling thread is interrupted while waiting for the process threads, the
     * receiver thread is not told to stop and keeps running.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        expectingInput = false;

        for (var thread : processThreads) {
            thread.join();
        }

        expectingOutput = false;
        receiverThread.join();
    }
}

View File

@ -4,7 +4,7 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
@ -17,7 +17,6 @@ import plan.CrawlPlan;
import nu.marginalia.converting.compiler.InstructionsCompiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.util.ParallelPipe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,6 +25,9 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -55,7 +57,7 @@ public class ConverterMain {
var request = converter.fetchInstructions();
try {
converter.load(request);
converter.convert(request);
request.ok();
}
catch (Exception ex) {
@ -87,58 +89,64 @@ public class ConverterMain {
heartbeat.start();
}
public void load(ConvertRequest request) throws Exception {
public void convert(ConvertRequest request) throws Exception {
var plan = request.getPlan();
final int maxPoolSize = 16;
try (WorkLog processLog = plan.createProcessWorkLog();
ConversionLog log = new ConversionLog(plan.process.getDir())) {
var instructionWriter = new InstructionWriter(log, plan.process.getDir(), gson);
var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson);
Semaphore semaphore = new Semaphore(maxPoolSize);
var pool = new ThreadPoolExecutor(
maxPoolSize/4,
maxPoolSize,
5, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(8)
);
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);
var pipe = new ParallelPipe<SerializableCrawlDataStream, ProcessingInstructions>("Converter", 16, 4, 2) {
@Override
protected ProcessingInstructions onProcess(SerializableCrawlDataStream dataStream) {
var processed = processor.process(dataStream);
var compiled = compiler.compile(processed);
return new ProcessingInstructions(processed.id, compiled);
}
@Override
protected void onReceive(ProcessingInstructions processedInstructions) throws IOException {
Thread.currentThread().setName("Converter:Receiver["+processedInstructions.id+"]");
try {
var instructions = processedInstructions.instructions;
instructions.removeIf(Instruction::isNoOp);
String where = instructionWriter.accept(processedInstructions.id, instructions);
processLog.setJobToFinished(processedInstructions.id, where, instructions.size());
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
finally {
Thread.currentThread().setName("Converter:Receiver[IDLE]");
}
}
};
// Advance the progress bar to the current position if this is a resumption
processedDomains.set(processLog.countFinishedJobs());
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id)))
{
pipe.accept(domain);
semaphore.acquire();
pool.execute(() -> {
try {
ProcessedDomain processed = processor.process(domain);
final String where;
final int size;
try (var writer = instructionWriter.createInstructionsForDomainWriter(processed.id)) {
compiler.compile(processed, writer::accept);
where = writer.getFileName();
size = writer.getSize();
}
processLog.setJobToFinished(processed.id, where, size);
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
catch (IOException ex) {
logger.warn("IO exception in converter", ex);
}
finally {
semaphore.release();
}
});
}
pipe.join();
pool.shutdown();
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));
request.ok();
}
catch (Exception e) {
@ -205,7 +213,4 @@ public class ConverterMain {
}
}
record ProcessingInstructions(String id, List<Instruction> instructions) {}
}

View File

@ -15,22 +15,18 @@ import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
public class InstructionWriter {
public class InstructionWriterFactory {
private ConversionLog log;
private final ConversionLog log;
private final Path outputDir;
private final Gson gson;
private static final Logger logger = LoggerFactory.getLogger(InstructionWriter.class);
private static final Logger logger = LoggerFactory.getLogger(InstructionWriterFactory.class);
public InstructionWriter(ConversionLog log, Path outputDir, Gson gson) {
public InstructionWriterFactory(ConversionLog log, Path outputDir, Gson gson) {
this.log = log;
this.outputDir = outputDir;
this.gson = gson;
@ -40,29 +36,57 @@ public class InstructionWriter {
}
}
public String accept(String id, List<Instruction> instructionList) throws IOException {
public InstructionWriter createInstructionsForDomainWriter(String id) throws IOException {
Path outputFile = getOutputFile(id);
return new InstructionWriter(outputFile);
}
if (Files.exists(outputFile)) {
Files.delete(outputFile);
public class InstructionWriter implements AutoCloseable {
private final OutputStreamWriter outputStream;
private final String where;
private final SummarizingInterpreter summary = new SummarizingInterpreter();
private int size = 0;
InstructionWriter(Path filename) throws IOException {
where = filename.getFileName().toString();
Files.deleteIfExists(filename);
outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(filename.toFile()))));
}
try (var outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile.toFile()))))) {
public void accept(Instruction instruction) {
if (instruction.isNoOp()) return;
SummarizingInterpreter summary = new SummarizingInterpreter(instructionList);
logger.info("Writing {} - {} - {}", id, instructionList.size(), summary);
instruction.apply(summary);
instruction.apply(log);
for (var instr : instructionList) {
instr.apply(log);
size++;
outputStream.append(instr.tag().name());
try {
outputStream.append(instruction.tag().name());
outputStream.append(' ');
gson.toJson(instr, outputStream);
gson.toJson(instruction, outputStream);
outputStream.append('\n');
}
catch (IOException ex) {
logger.warn("IO exception writing instruction", ex);
}
}
return outputFile.getFileName().toString();
@Override
public void close() throws IOException {
logger.info("Wrote {} - {} - {}", where, size, summary);
outputStream.close();
}
public String getFileName() {
return where;
}
public int getSize() {
return size;
}
}
private Path getOutputFile(String id) throws IOException {
@ -79,12 +103,6 @@ public class InstructionWriter {
private static class SummarizingInterpreter implements Interpreter {
private SummarizingInterpreter(List<Instruction> instructions) {
for (var i : instructions) {
i.apply(this);
}
}
private String domainName;
private int ok = 0;
private int error = 0;

View File

@ -7,34 +7,35 @@ import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.crawl.HtmlFeature;
import java.util.List;
import java.util.function.Consumer;
public class DocumentsCompiler {
public void compile(List<Instruction> ret, List<ProcessedDocument> documents) {
public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDocument> documents) {
for (var doc : documents) {
compileDocumentDetails(ret, doc);
compileDocumentDetails(instructionConsumer, doc);
}
for (var doc : documents) {
compileWords(ret, doc);
compileWords(instructionConsumer, doc);
}
}
private void compileDocumentDetails(List<Instruction> ret, ProcessedDocument doc) {
private void compileDocumentDetails(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
var details = doc.details;
if (details != null) {
ret.add(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear));
instructionConsumer.accept(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear));
}
}
private void compileWords(List<Instruction> ret, ProcessedDocument doc) {
private void compileWords(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
var words = doc.words;
if (words != null) {
ret.add(new LoadKeywords(doc.url, doc.details.metadata, words.build()));
instructionConsumer.accept(new LoadKeywords(doc.url, doc.details.metadata, words.build()));
}
}

View File

@ -11,11 +11,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
public class DomainMetadataCompiler {
public void compile(List<Instruction> ret, EdgeDomain domain, @NotNull List<ProcessedDocument> documents) {
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain domain, @NotNull List<ProcessedDocument> documents) {
int visitedUrls = 0;
int goodUrls = 0;
@ -36,7 +37,7 @@ public class DomainMetadataCompiler {
.ifPresent(knownUrls::addAll);
}
ret.add(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
instructionConsumer.accept(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
}
}

View File

@ -7,10 +7,11 @@ import nu.marginalia.model.EdgeUrl;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
public class FeedsCompiler {
public void compile(List<Instruction> ret, List<ProcessedDocument> documents) {
public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDocument> documents) {
EdgeUrl[] feeds = documents.stream().map(doc -> doc.details)
.filter(Objects::nonNull)
@ -18,6 +19,6 @@ public class FeedsCompiler {
.distinct()
.toArray(EdgeUrl[]::new);
ret.add(new LoadRssFeed(feeds));
instructionConsumer.accept(new LoadRssFeed(feeds));
}
}

View File

@ -8,6 +8,7 @@ import nu.marginalia.converting.model.ProcessedDomain;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import static java.util.Objects.requireNonNullElse;
@ -35,25 +36,21 @@ public class InstructionsCompiler {
this.redirectCompiler = redirectCompiler;
}
public List<Instruction> compile(ProcessedDomain domain) {
List<Instruction> ret = new ArrayList<>(domain.size()*4);
public void compile(ProcessedDomain domain, Consumer<Instruction> instructionConsumer) {
// Guaranteed to always be first
ret.add(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
if (domain.documents != null) {
urlsCompiler.compile(ret, domain.documents);
documentsCompiler.compile(ret, domain.documents);
urlsCompiler.compile(instructionConsumer, domain.documents);
documentsCompiler.compile(instructionConsumer, domain.documents);
feedsCompiler.compile(ret, domain.documents);
linksCompiler.compile(ret, domain.domain, domain.documents);
feedsCompiler.compile(instructionConsumer, domain.documents);
linksCompiler.compile(instructionConsumer, domain.domain, domain.documents);
}
if (domain.redirect != null) {
redirectCompiler.compile(ret, domain.domain, domain.redirect);
redirectCompiler.compile(instructionConsumer, domain.domain, domain.redirect);
}
domainMetadataCompiler.compile(ret, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
return ret;
domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
}
}

View File

@ -8,10 +8,11 @@ import nu.marginalia.model.EdgeDomain;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
public class LinksCompiler {
public void compile(List<Instruction> ret, EdgeDomain from, List<ProcessedDocument> documents) {
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain from, List<ProcessedDocument> documents) {
DomainLink[] links = documents.stream().map(doc -> doc.details)
.filter(Objects::nonNull)
@ -21,6 +22,6 @@ public class LinksCompiler {
.map(domain -> new DomainLink(from, domain))
.toArray(DomainLink[]::new);
ret.add(new LoadDomainLink(links));
instructionConsumer.accept(new LoadDomainLink(links));
}
}

View File

@ -8,12 +8,13 @@ import nu.marginalia.converting.instruction.instructions.LoadDomainRedirect;
import nu.marginalia.model.EdgeDomain;
import java.util.List;
import java.util.function.Consumer;
public class RedirectCompiler {
public void compile(List<Instruction> ret, EdgeDomain from, EdgeDomain to) {
ret.add(new LoadDomain(to));
ret.add(new LoadDomainLink(new DomainLink(from, to)));
ret.add(new LoadDomainRedirect(new DomainLink(from, to)));
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain from, EdgeDomain to) {
instructionConsumer.accept(new LoadDomain(to));
instructionConsumer.accept(new LoadDomainLink(new DomainLink(from, to)));
instructionConsumer.accept(new LoadDomainRedirect(new DomainLink(from, to)));
}
}

View File

@ -13,13 +13,14 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
public class UrlsCompiler {
private static final int MAX_INTERNAL_LINKS = 25;
private final Logger logger = LoggerFactory.getLogger(getClass());
public void compile(List<Instruction> ret, List<ProcessedDocument> documents) {
public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDocument> documents) {
Set<EdgeUrl> seenUrls = new HashSet<>(documents.size()*4);
Set<EdgeDomain> seenDomains = new HashSet<>(documents.size());
@ -53,8 +54,8 @@ public class UrlsCompiler {
}
}
ret.add(new LoadDomain(seenDomains.toArray(EdgeDomain[]::new)));
ret.add(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new)));
instructionConsumer.accept(new LoadDomain(seenDomains.toArray(EdgeDomain[]::new)));
instructionConsumer.accept(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new)));
}
}

View File

@ -119,6 +119,7 @@ public class ProcessService {
}
opts.put("WMSA_HOME", WMSA_HOME);
opts.put("JAVA_HOME", System.getenv("JAVA_HOME"));
opts.put("JAVA_OPTS", "");
opts.put("CONVERTER_OPTS", System.getenv("CONVERTER_OPTS"));
opts.put("LOADER_OPTS", System.getenv("LOADER_OPTS"));
opts.put("CRAWLER_OPTS", System.getenv("CRAWLER_OPTS"));