(loader) Fix bug where trailing deferred domain meta inserts weren't executed

Viktor Lofgren 2023-07-31 14:22:24 +02:00
parent d95f01b701
commit 2f8488610a
6 changed files with 69 additions and 140 deletions
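In brief: the hand-rolled processor thread and job queue are removed in favor of a straight-line loop, and Loader becomes an AutoCloseable driven by try-with-resources, so its closing flush of buffered work always runs. A minimal sketch of the pattern, with hypothetical names (BatchLoader, pending, flush) rather than the project's actual classes:

    import java.util.ArrayList;
    import java.util.List;

    class BatchLoader implements AutoCloseable {
        private final List<String> pending = new ArrayList<>();

        void load(String row) {
            pending.add(row);
            if (pending.size() > 100) flush();   // full batches are written eagerly
        }

        private void flush() {
            // a batched SQL insert would go here
            pending.clear();
        }

        @Override
        public void close() {
            // the trailing, partially filled batch: exactly the work that went missing
            if (!pending.isEmpty()) flush();
        }
    }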

MqOutbox.java

@@ -118,7 +118,11 @@ public class MqOutbox {
}
/** Blocks until a response arrives for the given message id or the timeout passes */
/** Blocks until a response arrives for the given message id or the timeout passes.
* <p>
* @throws TimeoutException if the timeout passes before a response arrives.
* @throws InterruptedException if the thread is interrupted while waiting.
*/
public MqMessage waitResponse(long id, int timeout, TimeUnit unit) throws TimeoutException, SQLException, InterruptedException {
long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
@@ -160,7 +164,9 @@
public void flagAsBad(long id) throws SQLException {
persistence.updateMessageState(id, MqMessageState.ERR);
}
public void flagAsDead(long id) throws SQLException {
persistence.updateMessageState(id, MqMessageState.DEAD);
}
}
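The expanded javadoc spells out waitResponse's failure modes. A sketch of a caller honoring them; the outbox and messageId variables, the handleReply helper, and the choice to flag a timed-out message as dead are illustrative assumptions, not part of this commit:

    // Enclosing method is assumed to declare throws SQLException.
    MqMessage reply;
    try {
        reply = outbox.waitResponse(messageId, 30, TimeUnit.SECONDS);
    } catch (TimeoutException ex) {
        outbox.flagAsDead(messageId);        // one plausible use of the new DEAD state
        return;
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();  // restore the interrupt flag
        return;
    }
    handleReply(reply);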

Interpreter.java

@@ -10,18 +10,18 @@ import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
public interface Interpreter {
void loadUrl(EdgeUrl[] url);
void loadDomain(EdgeDomain[] domain);
void loadRssFeed(EdgeUrl[] rssFeed);
void loadDomainLink(DomainLink[] links);
default void loadUrl(EdgeUrl[] url) {}
default void loadDomain(EdgeDomain[] domain) {}
default void loadRssFeed(EdgeUrl[] rssFeed) {}
default void loadDomainLink(DomainLink[] links) {}
void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip);
void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument);
void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError);
default void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {}
default void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {}
default void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {}
void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words);
default void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) {}
void loadDomainRedirect(DomainLink link);
default void loadDomainRedirect(DomainLink link) {}
void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls);
default void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {}
}
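With no-op defaults, an implementation overrides only the instructions it cares about instead of stubbing out the entire interface; the InstructionCounter added to LoaderMain further down relies on exactly this. Another illustrative, purely hypothetical example:

    // Counts unique domains and ignores every other instruction type.
    class DomainTally implements Interpreter {
        private final java.util.Set<EdgeDomain> seen = new java.util.HashSet<>();

        @Override
        public void loadDomain(EdgeDomain[] domains) {
            seen.addAll(java.util.Arrays.asList(domains));
        }

        public int size() { return seen.size(); }
    }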

ConversionLog.java

@@ -36,35 +36,9 @@ public class ConversionLog implements AutoCloseable, Interpreter {
writer.close();
}
@Override
public void loadUrl(EdgeUrl[] url) {}
@Override
public void loadDomain(EdgeDomain[] domain) {}
@Override
public void loadRssFeed(EdgeUrl[] rssFeed) {}
@Override
public void loadDomainLink(DomainLink[] links) {}
@Override
public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {}
@Override
public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {}
@Override
public synchronized void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {
writer.printf("%s\t%s\n", loadProcessedDocumentWithError.url(), loadProcessedDocumentWithError.reason());
}
@Override
public void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) {}
@Override
public void loadDomainRedirect(DomainLink link) {}
@Override
public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {}
}

InstructionWriterFactory.java

@@ -109,22 +109,16 @@ public class InstructionWriterFactory {
private int ok = 0;
private int error = 0;
int keywords = 0;
int documents = 0;
public String toString() {
// This shouldn't happen (TM)
assert keywords == documents : "keywords != documents";
return String.format("%s - %d %d", domainName, ok, error);
}
@Override
public void loadUrl(EdgeUrl[] url) {}
@Override
public void loadDomain(EdgeDomain[] domain) {}
@Override
public void loadRssFeed(EdgeUrl[] rssFeed) {}
@Override
public void loadDomainLink(DomainLink[] links) {}
@Override
public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {
this.domainName = domain.toString();
@@ -132,20 +126,14 @@ public class InstructionWriterFactory {
@Override
public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {
}
@Override
public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {
documents++;
}
@Override
public void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) {
keywords++;
}
@Override
public void loadDomainRedirect(DomainLink link) {}
@Override
public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {
ok += goodUrls;

LoaderMain.java

@@ -1,12 +1,18 @@
package nu.marginalia.loading;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import lombok.SneakyThrows;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.loading.loader.IndexLoadKeywords;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
@@ -14,19 +20,17 @@ import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.log.WorkLog;
import plan.CrawlPlan;
import nu.marginalia.loading.loader.Loader;
import nu.marginalia.loading.loader.LoaderFactory;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.service.module.DatabaseModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
@@ -42,9 +46,6 @@ public class LoaderMain {
private final FileStorageService fileStorageService;
private final IndexLoadKeywords indexLoadKeywords;
private final Gson gson;
private volatile boolean running = true;
final Thread processorThread;
public static void main(String... args) throws Exception {
new org.mariadb.jdbc.Driver();
@@ -84,9 +85,6 @@ public class LoaderMain {
this.gson = gson;
heartbeat.start();
processorThread = new Thread(this::processor, "Processor Thread");
processorThread.start();
}
@SneakyThrows
@@ -94,6 +92,7 @@
var plan = instructions.getPlan();
var logFile = plan.process.getLogFile();
TaskStats taskStats = new TaskStats(100);
try {
int loadTotal = 0;
int loaded = 0;
@@ -102,29 +101,37 @@
loadTotal++;
}
LoaderMain.loadTotal = loadTotal;
logger.info("Loading {} files", loadTotal);
for (var entry : WorkLog.iterable(logFile)) {
heartbeat.setProgress(loaded++ / (double) loadTotal);
InstructionCounter instructionCounter = new InstructionCounter();
heartbeat.setProgress(loaded++ / (double) loadTotal);
long startTime = System.currentTimeMillis();
var loader = loaderFactory.create(entry.cnt());
Path destDir = plan.getProcessedFilePath(entry.path());
var instructionsIter = instructionsReader.createIterator(destDir);
while (instructionsIter.hasNext()) {
var next = instructionsIter.next();
try {
next.apply(loader);
}
catch (Exception ex) {
logger.error("Failed to load instruction {}", next);
try (var loader = loaderFactory.create(entry.cnt())) {
var instructionsIter = instructionsReader.createIterator(destDir);
while (instructionsIter.hasNext()) {
var next = instructionsIter.next();
try {
next.apply(instructionCounter);
next.apply(loader);
} catch (Exception ex) {
logger.error("Failed to load instruction {}", next);
}
}
}
long endTime = System.currentTimeMillis();
long loadTime = endTime - startTime;
taskStats.observe(endTime - startTime);
logger.info("Loaded {}/{} : {} ({}) {}ms {} l/s", taskStats.getCount(),
loadTotal, destDir, instructionCounter.getCount(), loadTime, taskStats.avgTime());
}
running = false;
processorThread.join();
instructions.ok();
// This needs to be done in order to have a readable index journal
@@ -144,59 +151,6 @@ public class LoaderMain {
System.exit(0);
}
private volatile static int loadTotal;
private void load(CrawlPlan plan, String path, int cnt) {
Path destDir = plan.getProcessedFilePath(path);
try {
var loader = loaderFactory.create(cnt);
var instructions = instructionsReader.createIterator(destDir);
processQueue.put(new LoadJob(path, loader, instructions));
} catch (Exception e) {
logger.error("Failed to load " + destDir, e);
}
}
static final TaskStats taskStats = new TaskStats(100);
private record LoadJob(String path, Loader loader, Iterator<Instruction> instructionIterator) {
public void run() {
long startTime = System.currentTimeMillis();
while (instructionIterator.hasNext()) {
var next = instructionIterator.next();
try {
next.apply(loader);
}
catch (Exception ex) {
logger.error("Failed to load instruction {}", next);
}
}
loader.finish();
long loadTime = System.currentTimeMillis() - startTime;
taskStats.observe(loadTime);
logger.info("Loaded {}/{} : {} ({}) {}ms {} l/s", taskStats.getCount(),
loadTotal, path, loader.data.sizeHint, loadTime, taskStats.avgTime());
}
}
private static final LinkedBlockingQueue<LoadJob> processQueue = new LinkedBlockingQueue<>(2);
private void processor() {
try {
while (running || !processQueue.isEmpty()) {
LoadJob job = processQueue.poll(1, TimeUnit.SECONDS);
if (job != null) {
job.run();
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static class LoadRequest {
private final CrawlPlan plan;
private final MqMessage message;
@@ -258,4 +212,13 @@ public class LoaderMain {
}
}
public class InstructionCounter implements Interpreter {
private int count = 0;
public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {
count++;
}
public int getCount() {
return count;
}
}
}

Loader.java

@@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class Loader implements Interpreter {
public class Loader implements Interpreter, AutoCloseable {
private final SqlLoadUrls sqlLoadUrls;
private final SqlLoadDomains sqlLoadDomains;
private final SqlLoadDomainLinks sqlLoadDomainLinks;
@@ -30,8 +30,6 @@ public class Loader implements Interpreter {
private final List<LoadProcessedDocument> processedDocumentList;
private final List<LoadProcessedDocumentWithError> processedDocumentWithErrorList;
private final List<EdgeDomain> deferredDomains = new ArrayList<>();
private final List<EdgeUrl> deferredUrls = new ArrayList<>();
public final LoaderData data;
@@ -87,6 +85,7 @@ public class Loader implements Interpreter {
@Override
public void loadProcessedDocument(LoadProcessedDocument document) {
processedDocumentList.add(document);
if (processedDocumentList.size() > 100) {
sqlLoadProcessedDocument.load(data, processedDocumentList);
processedDocumentList.clear();
@@ -96,6 +95,7 @@
@Override
public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError document) {
processedDocumentWithErrorList.add(document);
if (processedDocumentWithErrorList.size() > 100) {
sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
processedDocumentWithErrorList.clear();
@@ -121,9 +121,7 @@
sqlLoadDomainMetadata.load(data, domain, knownUrls, goodUrls, visitedUrls);
}
public void finish() {
// Some work needs to be processed out of order for the database relations to work out
public void close() {
if (processedDocumentList.size() > 0) {
sqlLoadProcessedDocument.load(data, processedDocumentList);
}
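Since Loader is now AutoCloseable, every call site gets the trailing flush for free. The consuming pattern from LoaderMain above, condensed:

    try (var loader = loaderFactory.create(entry.cnt())) {
        var iter = instructionsReader.createIterator(destDir);
        while (iter.hasNext()) {
            iter.next().apply(loader);
        }
    }   // close() drains any partially filled buffers, i.e. the trailing inserts the bug dropped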