diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java index d604a585..3f3362f1 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java @@ -118,7 +118,11 @@ public class MqOutbox { } - /** Blocks until a response arrives for the given message id or the timeout passes */ + /** Blocks until a response arrives for the given message id or the timeout passes. + *

+ * @throws TimeoutException if the timeout passes before a response arrives. + * @throws InterruptedException if the thread is interrupted while waiting. + */ public MqMessage waitResponse(long id, int timeout, TimeUnit unit) throws TimeoutException, SQLException, InterruptedException { long deadline = System.currentTimeMillis() + unit.toMillis(timeout); @@ -160,7 +164,9 @@ public class MqOutbox { public void flagAsBad(long id) throws SQLException { persistence.updateMessageState(id, MqMessageState.ERR); } + public void flagAsDead(long id) throws SQLException { persistence.updateMessageState(id, MqMessageState.DEAD); } + } \ No newline at end of file diff --git a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/instruction/Interpreter.java b/code/process-models/converting-model/src/main/java/nu/marginalia/converting/instruction/Interpreter.java index 4583f31d..248ea38d 100644 --- a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/instruction/Interpreter.java +++ b/code/process-models/converting-model/src/main/java/nu/marginalia/converting/instruction/Interpreter.java @@ -10,18 +10,18 @@ import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument; import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError; public interface Interpreter { - void loadUrl(EdgeUrl[] url); - void loadDomain(EdgeDomain[] domain); - void loadRssFeed(EdgeUrl[] rssFeed); - void loadDomainLink(DomainLink[] links); + default void loadUrl(EdgeUrl[] url) {} + default void loadDomain(EdgeDomain[] domain) {} + default void loadRssFeed(EdgeUrl[] rssFeed) {} + default void loadDomainLink(DomainLink[] links) {} - void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip); - void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument); - void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError); + default void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {} + default void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {} + default void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {} - void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words); + default void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) {} - void loadDomainRedirect(DomainLink link); + default void loadDomainRedirect(DomainLink link) {} - void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls); + default void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {} } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java index 2c2ffb95..10c11e21 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java @@ -36,35 +36,9 @@ public class ConversionLog implements AutoCloseable, Interpreter { writer.close(); } - @Override - public void loadUrl(EdgeUrl[] url) {} - - @Override - public void loadDomain(EdgeDomain[] domain) {} - - @Override - public void loadRssFeed(EdgeUrl[] rssFeed) {} - - @Override - public void loadDomainLink(DomainLink[] links) {} - - @Override - public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {} - - @Override - public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {} - @Override public synchronized void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) { writer.printf("%s\t%s\n", loadProcessedDocumentWithError.url(), loadProcessedDocumentWithError.reason()); } - @Override - public void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) {} - - @Override - public void loadDomainRedirect(DomainLink link) {} - - @Override - public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {} } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java index fee4fc19..08f842c6 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java @@ -109,22 +109,16 @@ public class InstructionWriterFactory { private int ok = 0; private int error = 0; + int keywords = 0; + int documents = 0; + public String toString() { + // This shouldn't happen (TM) + assert keywords == documents : "keywords != documents"; + return String.format("%s - %d %d", domainName, ok, error); } - @Override - public void loadUrl(EdgeUrl[] url) {} - - @Override - public void loadDomain(EdgeDomain[] domain) {} - - @Override - public void loadRssFeed(EdgeUrl[] rssFeed) {} - - @Override - public void loadDomainLink(DomainLink[] links) {} - @Override public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) { this.domainName = domain.toString(); @@ -132,20 +126,14 @@ public class InstructionWriterFactory { @Override public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) { - - } - - @Override - public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) { + documents++; } @Override public void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) { + keywords++; } - @Override - public void loadDomainRedirect(DomainLink link) {} - @Override public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) { ok += goodUrls; diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index 68bcf8c4..21b0b1ec 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -1,12 +1,18 @@ package nu.marginalia.loading; +import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import lombok.SneakyThrows; +import nu.marginalia.converting.instruction.Interpreter; +import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument; import nu.marginalia.db.storage.FileStorageService; +import nu.marginalia.keyword.model.DocumentKeywords; import nu.marginalia.loading.loader.IndexLoadKeywords; +import nu.marginalia.model.EdgeUrl; +import nu.marginalia.model.idx.DocumentMetadata; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.inbox.MqInboxResponse; @@ -14,19 +20,17 @@ import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.log.WorkLog; import plan.CrawlPlan; -import nu.marginalia.loading.loader.Loader; import nu.marginalia.loading.loader.LoaderFactory; -import nu.marginalia.converting.instruction.Instruction; import nu.marginalia.service.module.DatabaseModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.file.Path; import java.sql.SQLException; -import java.util.Iterator; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX; @@ -42,9 +46,6 @@ public class LoaderMain { private final FileStorageService fileStorageService; private final IndexLoadKeywords indexLoadKeywords; private final Gson gson; - private volatile boolean running = true; - - final Thread processorThread; public static void main(String... args) throws Exception { new org.mariadb.jdbc.Driver(); @@ -84,9 +85,6 @@ public class LoaderMain { this.gson = gson; heartbeat.start(); - - processorThread = new Thread(this::processor, "Processor Thread"); - processorThread.start(); } @SneakyThrows @@ -94,6 +92,7 @@ public class LoaderMain { var plan = instructions.getPlan(); var logFile = plan.process.getLogFile(); + TaskStats taskStats = new TaskStats(100); try { int loadTotal = 0; int loaded = 0; @@ -102,29 +101,37 @@ public class LoaderMain { loadTotal++; } - LoaderMain.loadTotal = loadTotal; - logger.info("Loading {} files", loadTotal); for (var entry : WorkLog.iterable(logFile)) { - heartbeat.setProgress(loaded++ / (double) loadTotal); + InstructionCounter instructionCounter = new InstructionCounter(); + + heartbeat.setProgress(loaded++ / (double) loadTotal); + long startTime = System.currentTimeMillis(); - var loader = loaderFactory.create(entry.cnt()); Path destDir = plan.getProcessedFilePath(entry.path()); - var instructionsIter = instructionsReader.createIterator(destDir); - while (instructionsIter.hasNext()) { - var next = instructionsIter.next(); - try { - next.apply(loader); - } - catch (Exception ex) { - logger.error("Failed to load instruction {}", next); + try (var loader = loaderFactory.create(entry.cnt())) { + var instructionsIter = instructionsReader.createIterator(destDir); + + while (instructionsIter.hasNext()) { + var next = instructionsIter.next(); + try { + next.apply(instructionCounter); + next.apply(loader); + } catch (Exception ex) { + logger.error("Failed to load instruction {}", next); + } } } + + long endTime = System.currentTimeMillis(); + long loadTime = endTime - startTime; + taskStats.observe(endTime - startTime); + + logger.info("Loaded {}/{} : {} ({}) {}ms {} l/s", taskStats.getCount(), + loadTotal, destDir, instructionCounter.getCount(), loadTime, taskStats.avgTime()); } - running = false; - processorThread.join(); instructions.ok(); // This needs to be done in order to have a readable index journal @@ -144,59 +151,6 @@ public class LoaderMain { System.exit(0); } - private volatile static int loadTotal; - - private void load(CrawlPlan plan, String path, int cnt) { - Path destDir = plan.getProcessedFilePath(path); - try { - var loader = loaderFactory.create(cnt); - var instructions = instructionsReader.createIterator(destDir); - processQueue.put(new LoadJob(path, loader, instructions)); - } catch (Exception e) { - logger.error("Failed to load " + destDir, e); - } - } - - static final TaskStats taskStats = new TaskStats(100); - - private record LoadJob(String path, Loader loader, Iterator instructionIterator) { - public void run() { - long startTime = System.currentTimeMillis(); - while (instructionIterator.hasNext()) { - var next = instructionIterator.next(); - try { - next.apply(loader); - } - catch (Exception ex) { - logger.error("Failed to load instruction {}", next); - } - } - - loader.finish(); - long loadTime = System.currentTimeMillis() - startTime; - taskStats.observe(loadTime); - logger.info("Loaded {}/{} : {} ({}) {}ms {} l/s", taskStats.getCount(), - loadTotal, path, loader.data.sizeHint, loadTime, taskStats.avgTime()); - } - - } - - private static final LinkedBlockingQueue processQueue = new LinkedBlockingQueue<>(2); - - private void processor() { - try { - while (running || !processQueue.isEmpty()) { - LoadJob job = processQueue.poll(1, TimeUnit.SECONDS); - - if (job != null) { - job.run(); - } - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - } private static class LoadRequest { private final CrawlPlan plan; private final MqMessage message; @@ -258,4 +212,13 @@ public class LoaderMain { } } + public class InstructionCounter implements Interpreter { + private int count = 0; + public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) { + count++; + } + public int getCount() { + return count; + } + } } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java index 96c5a21c..d6f97076 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java @@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -public class Loader implements Interpreter { +public class Loader implements Interpreter, AutoCloseable { private final SqlLoadUrls sqlLoadUrls; private final SqlLoadDomains sqlLoadDomains; private final SqlLoadDomainLinks sqlLoadDomainLinks; @@ -30,8 +30,6 @@ public class Loader implements Interpreter { private final List processedDocumentList; private final List processedDocumentWithErrorList; - private final List deferredDomains = new ArrayList<>(); - private final List deferredUrls = new ArrayList<>(); public final LoaderData data; @@ -87,6 +85,7 @@ public class Loader implements Interpreter { @Override public void loadProcessedDocument(LoadProcessedDocument document) { processedDocumentList.add(document); + if (processedDocumentList.size() > 100) { sqlLoadProcessedDocument.load(data, processedDocumentList); processedDocumentList.clear(); @@ -96,6 +95,7 @@ public class Loader implements Interpreter { @Override public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError document) { processedDocumentWithErrorList.add(document); + if (processedDocumentWithErrorList.size() > 100) { sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList); processedDocumentWithErrorList.clear(); @@ -121,9 +121,7 @@ public class Loader implements Interpreter { sqlLoadDomainMetadata.load(data, domain, knownUrls, goodUrls, visitedUrls); } - public void finish() { - // Some work needs to be processed out of order for the database relations to work out - + public void close() { if (processedDocumentList.size() > 0) { sqlLoadProcessedDocument.load(data, processedDocumentList); }