(WIP) Make it possible to sideload encyclopedia data.

This is mostly a pilot track for sideloading other large websites.

Also change the converter to produce more compact output (Java serialization instead of JSON).
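
For illustration, a minimal sketch of the format change, assuming the zstd-jni library the converter already uses; the Instruction record here is a stand-in for the project's instruction types, not the real class:

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;

import java.io.*;
import java.nio.file.Path;

public class SerializedInstructionsSketch {
    // Stand-in for the project's Instruction types; assumed Serializable.
    record Instruction(String tag, String payload) implements Serializable {}

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Path file = Path.of("instructions.dat");

        // Write: one serialized object per instruction, zstd-compressed
        try (var out = new ObjectOutputStream(new ZstdOutputStream(new FileOutputStream(file.toFile())))) {
            for (int i = 0; i < 10_000; i++) {
                out.writeObject(new Instruction("LOAD_URL", "https://example.com/" + i));
                out.reset(); // drop the back-reference table so memory stays flat
            }
        }

        // Read back until EOF; ObjectInputStream has no end-of-stream probe
        int n = 0;
        try (var in = new ObjectInputStream(new ZstdInputStream(new FileInputStream(file.toFile())))) {
            for (;;) {
                in.readObject();
                n++;
            }
        } catch (EOFException endOfStream) {
            System.out.println("Read " + n + " instructions");
        }
    }
}

Serialized objects avoid the per-line tag and JSON overhead of the old format, and the reset() call keeps ObjectOutputStream from holding references to everything written.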
This commit is contained in:
Viktor Lofgren 2023-07-28 18:14:43 +02:00
parent 9288d311d4
commit f11103d31d
22 changed files with 618 additions and 143 deletions

View File

@ -0,0 +1,6 @@
package nu.marginalia.mqapi.converting;
public enum ConvertAction {
ConvertCrawlData,
SideloadEncyclopedia
}

View File

@ -5,6 +5,8 @@ import nu.marginalia.db.storage.model.FileStorageId;
@AllArgsConstructor
public class ConvertRequest {
public final ConvertAction action;
public final String inputSource;
public final FileStorageId crawlStorage;
public final FileStorageId processedDataStorage;
}
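
For illustration, a hedged sketch of how a SideloadEncyclopedia request might be built and serialized for the converter's inbox; passing null for crawlStorage when sideloading is an assumption here (compare the ConvertCrawlData request constructed in ReconvertAndLoadActor near the end of this diff):

import com.google.gson.Gson;
import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.mqapi.converting.ConvertRequest;

import java.nio.file.Path;

class SideloadRequestSketch {
    // Hypothetical helper; not part of this commit
    static String buildPayload(Gson gson, Path encyclopediaDb, FileStorageId processedArea) {
        var request = new ConvertRequest(
                ConvertAction.SideloadEncyclopedia,
                encyclopediaDb.toString(), // inputSource: path to the encyclopedia sqlite file
                null,                      // crawlStorage: unused when sideloading (assumption)
                processedArea);            // processedDataStorage: where the instructions land
        return gson.toJson(request);       // payload for the converter's message queue inbox
    }
}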

View File

@ -79,6 +79,7 @@ dependencies {
implementation libs.crawlercommons
implementation libs.commons.lang3
implementation libs.sqlite
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@ -5,24 +5,26 @@ import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import plan.CrawlPlan;
import nu.marginalia.converting.compiler.InstructionsCompiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.processor.DomainProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
@ -34,7 +36,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
public class ConverterMain {
private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
private final DomainProcessor processor;
private final InstructionsCompiler compiler;
@ -42,10 +43,9 @@ public class ConverterMain {
private final ProcessHeartbeat heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final SideloadSourceFactory sideloadSourceFactory;
public static void main(String... args) throws Exception {
Injector injector = Guice.createInjector(
new ConverterModule(),
new DatabaseModule()
@ -55,15 +55,9 @@ public class ConverterMain {
logger.info("Starting pipe");
var request = converter.fetchInstructions();
try {
converter.convert(request);
request.ok();
}
catch (Exception ex) {
logger.error("Conversion failed", ex);
request.err();
}
converter
.fetchInstructions()
.execute(converter);
logger.info("Finished");
@ -77,21 +71,42 @@ public class ConverterMain {
Gson gson,
ProcessHeartbeat heartbeat,
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService
) {
FileStorageService fileStorageService,
SideloadSourceFactory sideloadSourceFactory
)
{
this.processor = processor;
this.compiler = compiler;
this.gson = gson;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.sideloadSourceFactory = sideloadSourceFactory;
heartbeat.start();
}
public void convert(ConvertRequest request) throws Exception {
public void convert(SideloadSource sideloadSource, Path writeDir) throws Exception {
int maxPoolSize = 16;
var plan = request.getPlan();
try (WorkLog workLog = new WorkLog(writeDir.resolve("processor.log"));
ConversionLog conversionLog = new ConversionLog(writeDir)) {
var instructionWriter = new InstructionWriterFactory(conversionLog, writeDir, gson);
final String where;
final int size;
try (var writer = instructionWriter.createInstructionsForDomainWriter(sideloadSource.getId())) {
compiler.compileStreaming(sideloadSource, writer::accept);
where = writer.getFileName();
size = writer.getSize();
}
workLog.setJobToFinished(sideloadSource.getId(), where, size);
}
}
public void convert(CrawlPlan plan) throws Exception {
final int maxPoolSize = 16;
@ -146,29 +161,19 @@ public class ConverterMain {
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));
request.ok();
}
catch (Exception e) {
request.err();
throw e;
}
}
private static class ConvertRequest {
private final CrawlPlan plan;
private abstract static class ConvertRequest {
private final MqMessage message;
private final MqSingleShotInbox inbox;
ConvertRequest(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
this.plan = plan;
private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) {
this.message = message;
this.inbox = inbox;
}
public CrawlPlan getPlan() {
return plan;
}
public abstract void execute(ConverterMain converterMain) throws Exception;
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
@ -176,9 +181,55 @@ public class ConverterMain {
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private static class SideloadAction extends ConvertRequest {
private final SideloadSource sideloadSource;
private final Path workDir;
SideloadAction(SideloadSource sideloadSource,
Path workDir,
MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
this.sideloadSource = sideloadSource;
this.workDir = workDir;
}
@Override
public void execute(ConverterMain converterMain) throws Exception {
try {
converterMain.convert(sideloadSource, workDir);
ok();
}
catch (Exception ex) {
logger.error("Error sideloading", ex);
err();
}
}
}
private static class ConvertCrawlDataAction extends ConvertRequest {
private final CrawlPlan plan;
private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
this.plan = plan;
}
@Override
public void execute(ConverterMain converterMain) throws Exception {
try {
converterMain.convert(plan);
ok();
}
catch (Exception ex) {
logger.error("Conversion failed", ex);
err();
}
}
}
private ConvertRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID());
@ -188,6 +239,8 @@ public class ConverterMain {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);
if (request.action == ConvertAction.ConvertCrawlData) {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);
@ -195,7 +248,21 @@ public class ConverterMain {
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
new CrawlPlan.WorkDir(processData.path(), "processor.log"));
return new ConvertRequest(plan, msg, inbox);
return new ConvertCrawlDataAction(plan, msg, inbox);
}
if (request.action == ConvertAction.SideloadEncyclopedia) {
var processData = fileStorageService.getStorage(request.processedDataStorage);
var filePath = Path.of(request.inputSource);
return new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(filePath),
processData.asPath(),
msg, inbox);
}
else {
throw new RuntimeException("Unknown action: " + request.action);
}
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {

View File

@ -42,7 +42,7 @@ public class InstructionWriterFactory {
}
public class InstructionWriter implements AutoCloseable {
private final OutputStreamWriter outputStream;
private final ObjectOutputStream outputStream;
private final String where;
private final SummarizingInterpreter summary = new SummarizingInterpreter();
@ -52,7 +52,7 @@ public class InstructionWriterFactory {
InstructionWriter(Path filename) throws IOException {
where = filename.getFileName().toString();
Files.deleteIfExists(filename);
outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(filename.toFile()))));
outputStream = new ObjectOutputStream(new ZstdOutputStream(new FileOutputStream(filename.toFile())));
}
public void accept(Instruction instruction) {
@ -64,10 +64,12 @@ public class InstructionWriterFactory {
size++;
try {
outputStream.append(instruction.tag().name());
outputStream.append(' ');
gson.toJson(instruction, outputStream);
outputStream.append('\n');
outputStream.writeObject(instruction);
// Reset the stream to avoid keeping references to the objects
// (as this will cause the memory usage to grow indefinitely when
// writing huge amounts of data)
outputStream.reset();
}
catch (IOException ex) {
logger.warn("IO exception writing instruction", ex);

View File

@ -23,7 +23,7 @@ public class DocumentsCompiler {
}
private void compileDocumentDetails(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
public void compileDocumentDetails(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
var details = doc.details;
if (details != null) {
@ -31,7 +31,7 @@ public class DocumentsCompiler {
}
}
private void compileWords(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
public void compileWords(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
var words = doc.words;
if (words != null) {

View File

@ -40,4 +40,8 @@ public class DomainMetadataCompiler {
instructionConsumer.accept(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
}
public void compileFake(Consumer<Instruction> instructionConsumer, EdgeDomain domain, int countAll, int countGood) {
instructionConsumer.accept(new LoadDomainMetadata(domain, countAll, countGood, countAll));
}
}

View File

@ -3,11 +3,15 @@ package nu.marginalia.converting.compiler;
import com.google.inject.Inject;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDomain;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
import java.util.function.Consumer;
import static java.util.Objects.requireNonNullElse;
@ -20,6 +24,8 @@ public class InstructionsCompiler {
private final LinksCompiler linksCompiler;
private final RedirectCompiler redirectCompiler;
private final Logger logger = LoggerFactory.getLogger(InstructionsCompiler.class);
@Inject
public InstructionsCompiler(UrlsCompiler urlsCompiler,
DocumentsCompiler documentsCompiler,
@ -53,4 +59,35 @@ public class InstructionsCompiler {
domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
}
public void compileStreaming(SideloadSource sideloadSource,
Consumer<Instruction> instructionConsumer) {
ProcessedDomain domain = sideloadSource.getDomain();
Iterator<EdgeUrl> urlsIterator = sideloadSource.getUrlsIterator();
Iterator<ProcessedDocument> documentsIterator = sideloadSource.getDocumentsStream();
// Guaranteed to always be first
instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
int countAll = 0;
int countGood = 0;
logger.info("Writing domains");
urlsCompiler.compileJustDomain(instructionConsumer, domain.domain);
logger.info("Writing urls");
urlsCompiler.compileJustUrls(instructionConsumer, urlsIterator);
logger.info("Writing docs");
while (documentsIterator.hasNext()) {
var doc = documentsIterator.next();
countAll++;
if (doc.isOk()) countGood++;
documentsCompiler.compileDocumentDetails(instructionConsumer, doc);
documentsCompiler.compileWords(instructionConsumer, doc);
}
domainMetadataCompiler.compileFake(instructionConsumer, domain.domain, countAll, countGood);
}
}

View File

@ -9,10 +9,7 @@ import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.function.Consumer;
public class UrlsCompiler {
@ -58,4 +55,20 @@ public class UrlsCompiler {
instructionConsumer.accept(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new)));
}
public void compileJustUrls(Consumer<Instruction> instructionConsumer, Iterator<EdgeUrl> urlsIterator) {
var urls = new ArrayList<EdgeUrl>(1000);
while (urlsIterator.hasNext()) {
if (urls.size() >= 1000) {
instructionConsumer.accept(new LoadUrl(urls.toArray(EdgeUrl[]::new)));
urls.clear();
}
urls.add(urlsIterator.next());
}
// Flush the final partial batch, which the loop above leaves behind
if (!urls.isEmpty()) {
instructionConsumer.accept(new LoadUrl(urls.toArray(EdgeUrl[]::new)));
}
}
public void compileJustDomain(Consumer<Instruction> instructionConsumer, EdgeDomain domain) {
instructionConsumer.accept(new LoadDomain(domain));
}
}

View File

@ -28,7 +28,8 @@ public class DomainProcessor {
@Inject
public DomainProcessor(DocumentProcessor documentProcessor,
SiteWords siteWords,
LshDocumentDeduplicator documentDeduplicator) {
LshDocumentDeduplicator documentDeduplicator)
{
this.documentProcessor = documentProcessor;
this.siteWords = siteWords;
this.documentDeduplicator = documentDeduplicator;

View File

@ -0,0 +1,247 @@
package nu.marginalia.converting.sideload;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import lombok.SneakyThrows;
import nu.marginalia.converting.model.DisqualifiedException;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.processor.plugin.HtmlDocumentProcessorPlugin;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.model.crawl.UrlIndexingState;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/** This is an experimental sideloader for encyclopedia.marginalia.nu's database
* (which serves as a way of loading Wikipedia's ZIM files without binding to GPL2-licensed code).
*
* See https://github.com/MarginaliaSearch/encyclopedia.marginalia.nu for instructions on extracting the data.
*/
public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoCloseable {
private final Connection connection;
private final Gson gson;
private final HtmlDocumentProcessorPlugin htmlProcessorPlugin;
public EncyclopediaMarginaliaNuSideloader(Path pathToDbFile,
Gson gson,
HtmlDocumentProcessorPlugin htmlProcessorPlugin) throws SQLException {
this.gson = gson;
this.htmlProcessorPlugin = htmlProcessorPlugin;
String sqliteDbString = "jdbc:sqlite:" + pathToDbFile.toString();
connection = DriverManager.getConnection(sqliteDbString);
}
@Override
public ProcessedDomain getDomain() {
var ret = new ProcessedDomain();
ret.domain = new EdgeDomain("encyclopedia.marginalia.nu");
ret.id = "encyclopedia.marginalia.nu";
ret.ip = "127.0.0.1";
ret.state = DomainIndexingState.ACTIVE;
return ret;
}
@Override
@SneakyThrows
public Iterator<EdgeUrl> getUrlsIterator() {
EdgeUrl base = new EdgeUrl("https://encyclopedia.marginalia.nu/");
return new SqlQueryIterator<>(connection.prepareStatement("""
SELECT url, html FROM articles
"""))
{
@Override
public EdgeUrl convert(ResultSet rs) throws Exception {
var path = URLEncoder.encode(rs.getString("url"), StandardCharsets.UTF_8);
return base.withPathAndParam("/article/"+path, null);
}
};
}
@SneakyThrows
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {
LinkedBlockingQueue<ProcessedDocument> docs = new LinkedBlockingQueue<>(32);
AtomicBoolean isFinished = new AtomicBoolean(false);
ExecutorService executorService = Executors.newFixedThreadPool(16);
Semaphore sem = new Semaphore(16);
executorService.submit(() -> {
try {
var stmt = connection.prepareStatement("""
SELECT url,title,html FROM articles
""");
stmt.setFetchSize(100);
var rs = stmt.executeQuery();
while (rs.next()) {
var articleParts = fromCompressedJson(rs.getBytes("html"), ArticleParts.class);
String title = rs.getString("title");
String url = rs.getString("url");
sem.acquire();
executorService.submit(() -> {
try {
// put() blocks when the queue is full; add() would throw IllegalStateException here
docs.put(convertDocument(articleParts.parts, title, url));
} catch (URISyntaxException | DisqualifiedException | InterruptedException e) {
e.printStackTrace();
} finally {
sem.release();
}
});
}
stmt.close();
}
catch (Exception e) {
e.printStackTrace();
}
finally {
isFinished.set(true);
}
});
return new Iterator<>() {
@Override
public boolean hasNext() {
return !isFinished.get() || !docs.isEmpty() || sem.availablePermits() < 16;
}
@SneakyThrows
@Override
public ProcessedDocument next() {
return docs.take();
}
};
}
private ProcessedDocument convertDocument(List<String> parts, String title, String url) throws URISyntaxException, DisqualifiedException {
String fullUrl = "https://encyclopedia.marginalia.nu/article/"+url;
StringBuilder fullHtml = new StringBuilder();
fullHtml.append("<!DOCTYPE html><html><head><title>").append(title).append("</title></head><body>");
for (String part : parts) {
fullHtml.append("<p>");
fullHtml.append(part);
fullHtml.append("</p>");
}
fullHtml.append("</body></html>");
var crawledDoc = new CrawledDocument(
"encyclopedia.marginalia.nu",
fullUrl,
"text/html",
LocalDateTime.now().toString(),
200,
"OK",
"NP",
"",
fullHtml.toString(),
Integer.toHexString(fullHtml.hashCode()),
fullUrl,
"",
"SIDELOAD"
);
var ret = new ProcessedDocument();
try {
var details = htmlProcessorPlugin.createDetails(crawledDoc);
ret.words = details.words();
ret.details = details.details();
ret.url = new EdgeUrl(fullUrl);
ret.state = UrlIndexingState.OK;
ret.stateReason = "SIDELOAD";
}
catch (Exception e) {
ret.url = new EdgeUrl(fullUrl);
ret.state = UrlIndexingState.DISQUALIFIED;
ret.stateReason = "SIDELOAD";
}
return ret;
}
private <T> T fromCompressedJson(byte[] stream, Class<T> type) throws IOException {
return gson.fromJson(new InputStreamReader(new ZstdInputStream(new ByteArrayInputStream(stream))), type);
}
private record ArticleParts(List<String> parts) {}
@Override
public String getId() {
return "encyclopedia.marginalia.nu";
}
@Override
public void close() throws Exception {
connection.close();
}
private abstract static class SqlQueryIterator<T> implements Iterator<T> {
PreparedStatement stmt;
ResultSet rs;
T next = null;
public SqlQueryIterator(PreparedStatement stmt) throws SQLException {
this.stmt = stmt;
stmt.setFetchSize(1000);
rs = stmt.executeQuery();
}
@SneakyThrows
@Override
public boolean hasNext() {
if (next != null) {
return true;
}
if (!rs.next()) {
stmt.close();
return false;
}
next = convert(rs);
return true;
}
public abstract T convert(ResultSet rs) throws Exception;
@Override
public T next() {
if (!hasNext())
throw new NoSuchElementException("No next element");
var ret = next;
next = null;
return ret;
}
}
}

View File

@ -0,0 +1,15 @@
package nu.marginalia.converting.sideload;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.model.EdgeUrl;
import java.util.Iterator;
public interface SideloadSource {
ProcessedDomain getDomain();
Iterator<EdgeUrl> getUrlsIterator();
Iterator<ProcessedDocument> getDocumentsStream();
String getId();
}
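
To make the contract concrete, a minimal hypothetical implementation (same package assumed); the domain name and empty iterators are illustrative only. InstructionsCompiler.compileStreaming consumes these methods in the order getDomain, getUrlsIterator, getDocumentsStream:

import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.DomainIndexingState;

import java.util.Collections;
import java.util.Iterator;

class EmptySideloadSource implements SideloadSource {
    @Override
    public ProcessedDomain getDomain() {
        var ret = new ProcessedDomain();
        ret.domain = new EdgeDomain("example.com"); // hypothetical domain
        ret.id = "example.com";
        ret.ip = "127.0.0.1";
        ret.state = DomainIndexingState.ACTIVE;
        return ret;
    }

    @Override
    public Iterator<EdgeUrl> getUrlsIterator() {
        return Collections.emptyIterator();
    }

    @Override
    public Iterator<ProcessedDocument> getDocumentsStream() {
        return Collections.emptyIterator();
    }

    @Override
    public String getId() {
        return "example.com";
    }
}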

View File

@ -0,0 +1,23 @@
package nu.marginalia.converting.sideload;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.converting.processor.plugin.HtmlDocumentProcessorPlugin;
import java.nio.file.Path;
import java.sql.SQLException;
public class SideloadSourceFactory {
private final Gson gson;
private final HtmlDocumentProcessorPlugin htmlProcessorPlugin;
@Inject
public SideloadSourceFactory(Gson gson, HtmlDocumentProcessorPlugin htmlProcessorPlugin) {
this.gson = gson;
this.htmlProcessorPlugin = htmlProcessorPlugin;
}
public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile) throws SQLException {
return new EncyclopediaMarginaliaNuSideloader(pathToDbFile, gson, htmlProcessorPlugin);
}
}
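
A hedged usage sketch of the factory (same package assumed); the database path is hypothetical, and in the converter itself the factory is reached via Guice from ConverterMain.fetchInstructions with the ConvertRequest's inputSource as the path:

import java.nio.file.Path;

class EncyclopediaSideloadSketch {
    // Hypothetical driver, not part of this commit
    static void printUrls(SideloadSourceFactory factory) throws Exception {
        SideloadSource source = factory.sideloadEncyclopediaMarginaliaNu(Path.of("/data/encyclopedia.db"));
        var urls = source.getUrlsIterator();
        while (urls.hasNext()) {
            System.out.println(urls.next()); // e.g. https://encyclopedia.marginalia.nu/article/...
        }
    }
}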

View File

@ -2,10 +2,8 @@ package nu.marginalia.loading;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import lombok.SneakyThrows;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -13,6 +11,7 @@ import javax.inject.Inject;
import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
public class ConvertedDomainReader {
@ -27,30 +26,48 @@ public class ConvertedDomainReader {
public List<Instruction> read(Path path, int cntHint) throws IOException {
List<Instruction> ret = new ArrayList<>(cntHint);
try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile())))))) {
String line;
for (;;) {
line = br.readLine();
if (line == null) {
break;
}
if (line.isBlank()) {
continue;
}
var parts= line.split(" ", 2);
var type = InstructionTag.valueOf(parts[0]).clazz;
try {
ret.add(gson.fromJson(parts[1], type));
}
catch (NullPointerException|JsonParseException ex) {
logger.warn("Failed to deserialize {} {}", type.getSimpleName(), StringUtils.abbreviate(parts[1], 255));
logger.warn("Json error", ex);
}
try (var or = new ObjectInputStream(new ZstdInputStream(new FileInputStream(path.toFile())))) {
// Read until EOF; a file contains many serialized instructions
for (;;) {
var object = or.readObject();
if (object instanceof Instruction is) {
ret.add(is);
}
}
} catch (EOFException ex) {
// Expected: end of the instruction stream
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
return ret;
}
public Iterator<Instruction> createIterator(Path path) throws IOException {
var or = new ObjectInputStream(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile()))));
return new Iterator<>() {
Instruction next;
@SneakyThrows
@Override
public boolean hasNext() {
if (next != null)
return true;
try {
next = (Instruction) or.readObject();
return true;
}
catch (java.io.EOFException ex) {
or.close();
return false;
}
}
@Override
public Instruction next() {
if (next != null || hasNext()) {
var ret = next;
next = null;
return ret;
}
throw new NoSuchElementException();
}
};
}
}
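
A hedged sketch of how the new streaming reader is meant to be consumed, mirroring LoaderMain below; Loader's package is assumed from the test imports further down in this diff:

import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.loading.loader.Loader; // package assumed

import java.nio.file.Path;
import java.util.Iterator;

class StreamingLoadSketch {
    // Applies instructions one at a time; nothing is materialized as a List,
    // which is the point of createIterator over read()
    static void loadAll(ConvertedDomainReader reader, Loader loader, Path processedFile) throws Exception {
        Iterator<Instruction> instructions = reader.createIterator(processedFile);
        while (instructions.hasNext()) {
            instructions.next().apply(loader);
        }
    }
}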

View File

@ -23,7 +23,7 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
@ -55,9 +55,14 @@ public class LoaderMain {
);
var instance = injector.getInstance(LoaderMain.class);
try {
var instructions = instance.fetchInstructions();
instance.run(instructions);
}
catch (Exception ex) {
logger.error("Error running loader", ex);
}
}
@Inject
public LoaderMain(ConvertedDomainReader instructionsReader,
@ -101,7 +106,19 @@ public class LoaderMain {
for (var entry : WorkLog.iterable(logFile)) {
heartbeat.setProgress(loaded++ / (double) loadTotal);
load(plan, entry.path(), entry.cnt());
var loader = loaderFactory.create(entry.cnt());
Path destDir = plan.getProcessedFilePath(entry.path());
var instructionsIter = instructionsReader.createIterator(destDir);
while (instructionsIter.hasNext()) {
var next = instructionsIter.next();
try {
next.apply(loader);
}
catch (Exception ex) {
logger.error("Failed to load instruction {}", next);
}
}
}
running = false;
@ -110,6 +127,7 @@ public class LoaderMain {
// This needs to be done in order to have a readable index journal
indexLoadKeywords.close();
logger.info("Loading finished");
}
catch (Exception ex) {
logger.error("Failed to load", ex);
@ -119,6 +137,7 @@ public class LoaderMain {
finally {
heartbeat.shutDown();
}
System.exit(0);
}
@ -128,7 +147,7 @@ public class LoaderMain {
Path destDir = plan.getProcessedFilePath(path);
try {
var loader = loaderFactory.create(cnt);
var instructions = instructionsReader.read(destDir, cnt);
var instructions = instructionsReader.createIterator(destDir);
processQueue.put(new LoadJob(path, loader, instructions));
} catch (Exception e) {
logger.error("Failed to load " + destDir, e);
@ -137,15 +156,16 @@ public class LoaderMain {
static final TaskStats taskStats = new TaskStats(100);
private record LoadJob(String path, Loader loader, List<Instruction> instructionList) {
private record LoadJob(String path, Loader loader, Iterator<Instruction> instructionIterator) {
public void run() {
long startTime = System.currentTimeMillis();
for (var i : instructionList) {
while (instructionIterator.hasNext()) {
var next = instructionIterator.next();
try {
i.apply(loader);
next.apply(loader);
}
catch (Exception ex) {
logger.error("Failed to load instruction {}", i);
logger.error("Failed to load instruction {}", next);
}
}

View File

@ -86,40 +86,24 @@ public class Loader implements Interpreter {
@Override
public void loadProcessedDocument(LoadProcessedDocument document) {
deferralCheck(document.url());
processedDocumentList.add(document);
if (processedDocumentList.size() > 100) {
sqlLoadProcessedDocument.load(data, processedDocumentList);
processedDocumentList.clear();
}
}
@Override
public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError document) {
deferralCheck(document.url());
processedDocumentWithErrorList.add(document);
if (processedDocumentWithErrorList.size() > 100) {
sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
processedDocumentWithErrorList.clear();
}
private void deferralCheck(EdgeUrl url) {
if (data.getDomainId(url.domain) <= 0)
deferredDomains.add(url.domain);
if (data.getUrlId(url) <= 0)
deferredUrls.add(url);
}
@Override
public void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) {
// This is a bit of a bandaid safeguard against a bug
// in the converter, shouldn't be necessary in the future
if (!deferredDomains.isEmpty()) {
loadDomain(deferredDomains.toArray(EdgeDomain[]::new));
deferredDomains.clear();
}
if (!deferredUrls.isEmpty()) {
loadUrl(deferredUrls.toArray(EdgeUrl[]::new));
deferredUrls.clear();
}
try {
indexLoadKeywords.load(data, url, metadata, words);
} catch (InterruptedException e) {
@ -140,8 +124,12 @@ public class Loader implements Interpreter {
public void finish() {
// Some work needs to be processed out of order for the database relations to work out
if (processedDocumentList.size() > 0) {
sqlLoadProcessedDocument.load(data, processedDocumentList);
}
if (processedDocumentWithErrorList.size() > 0) {
sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
}
}
}

View File

@ -73,7 +73,7 @@ public class SqlLoadProcessedDocument {
int urlId = data.getUrlId(doc.url());
if (urlId <= 0) {
logger.warn("Failed to resolve ID for URL {}", doc.url());
return;
continue;
}
stmt.setInt(1, urlId);

View File

@ -14,12 +14,14 @@ import java.sql.SQLException;
public class SqlLoadProcessedDomain {
private final HikariDataSource dataSource;
private final SqlLoadDomains loadDomains;
private final SqlLoadUrls loadUrls;
private static final Logger logger = LoggerFactory.getLogger(SqlLoadProcessedDomain.class);
@Inject
public SqlLoadProcessedDomain(HikariDataSource dataSource, SqlLoadDomains loadDomains) {
public SqlLoadProcessedDomain(HikariDataSource dataSource, SqlLoadDomains loadDomains, SqlLoadUrls loadUrls) {
this.dataSource = dataSource;
this.loadDomains = loadDomains;
this.loadUrls = loadUrls;
try (var conn = dataSource.getConnection()) {
@ -34,7 +36,7 @@ public class SqlLoadProcessedDomain {
BEGIN
DELETE FROM DOMAIN_METADATA WHERE ID=DID;
DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
DELETE FROM EC_URL WHERE DOMAIN_ID = DID;
DELETE FROM EC_PAGE_DATA WHERE ID IN (SELECT ID FROM EC_URL WHERE DOMAIN_ID = DID);
UPDATE EC_DOMAIN SET INDEX_DATE=NOW(), STATE=ST, DOMAIN_ALIAS=NULL, INDEXED=GREATEST(INDEXED,IDX), IP=IP WHERE ID=DID;
DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
END
@ -47,6 +49,7 @@ public class SqlLoadProcessedDomain {
}
public void load(LoaderData data, EdgeDomain domain, DomainIndexingState state, String ip) {
data.setTargetDomain(domain);
loadDomains.load(data, domain);
@ -63,6 +66,8 @@ public class SqlLoadProcessedDomain {
if (rc < 1) {
logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc);
}
loadUrls.loadUrlsForDomain(data, domain, 0);
}
catch (SQLException ex) {
logger.warn("SQL error initializing domain", ex);

View File

@ -30,23 +30,35 @@ public class SqlLoadUrls {
public void load(LoaderData data, EdgeUrl[] urls) {
Set<EdgeDomain> affectedDomains = new HashSet<>();
if (urls.length == 0)
return;
int maxOldId = 0;
try (var conn = dataSource.getConnection();
var insertCall = conn.prepareStatement("INSERT IGNORE INTO EC_URL (PROTO,DOMAIN_ID,PORT,PATH,PARAM,PATH_HASH) VALUES (?,?,?,?,?,?)");
var queryCall = conn.prepareStatement("SELECT ID, PROTO, PATH, PARAM FROM EC_URL WHERE DOMAIN_ID=?")
)
var queryMaxId = conn.prepareStatement("SELECT MAX(ID) FROM EC_URL"))
{
conn.setAutoCommit(false);
var rs = queryMaxId.executeQuery();
if (rs.next()) {
maxOldId = rs.getInt(1);
}
int cnt = 0; int batchOffset = 0;
for (var url : urls) {
if (data.getUrlId(url) != 0)
continue;
if (url.path.length() >= 255) {
logger.debug("Skipping bad URL {}", url);
logger.info("Skipping bad URL {}", url);
continue;
}
var domainId = data.getDomainId(url.domain);
affectedDomains.add(url.domain);
insertCall.setString(1, url.proto);
insertCall.setInt(2, data.getDomainId(url.domain));
insertCall.setInt(2, domainId);
if (url.port != null) {
insertCall.setInt(3, url.port);
}
@ -58,10 +70,8 @@ public class SqlLoadUrls {
insertCall.setLong(6, hashPath(url.path, url.param));
insertCall.addBatch();
if (cnt++ == 1000) {
if (++cnt == 1000) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
@ -72,10 +82,9 @@ public class SqlLoadUrls {
cnt = 0;
}
}
if (cnt > 0) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
@ -83,25 +92,13 @@ public class SqlLoadUrls {
}
}
conn.commit();
conn.setAutoCommit(true);
for (var domain : affectedDomains) {
queryCall.setInt(1, data.getDomainId(domain));
var rsp = queryCall.executeQuery();
rsp.setFetchSize(1000);
while (rsp.next()) {
int urlId = rsp.getInt(1);
String proto = rsp.getString(2);
String path = rsp.getString(3);
String param = rsp.getString(4);
data.addUrl(new EdgeUrl(proto, domain, null, path, param), urlId);
loadUrlsForDomain(data, domain, maxOldId);
}
}
}
catch (SQLException ex) {
logger.warn("SQL error inserting URLs", ex);
@ -121,4 +118,27 @@ public class SqlLoadUrls {
return pathHash + murmur3_128.hashString(queryParam, StandardCharsets.UTF_8).padToLong();
}
}
/** Loads URL IDs for the domain into `data` from the database, starting at URL ID minId. */
public void loadUrlsForDomain(LoaderData data, EdgeDomain domain, int minId) throws SQLException {
try (var conn = dataSource.getConnection();
var queryCall = conn.prepareStatement("SELECT ID, PROTO, PATH, PARAM FROM EC_URL WHERE DOMAIN_ID=? AND ID > ?")) {
queryCall.setInt(1, data.getDomainId(domain));
queryCall.setInt(2, minId);
var rsp = queryCall.executeQuery();
rsp.setFetchSize(1000);
while (rsp.next()) {
int urlId = rsp.getInt(1);
String proto = rsp.getString(2);
String path = rsp.getString(3);
String param = rsp.getString(4);
data.addUrl(new EdgeUrl(proto, domain, null, path, param), urlId);
}
}
}
}

View File

@ -5,6 +5,7 @@ import nu.marginalia.loading.loader.LoaderData;
import nu.marginalia.loading.loader.SqlLoadDomains;
import nu.marginalia.loading.loader.SqlLoadProcessedDomain;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.loading.loader.SqlLoadUrls;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.crawl.DomainIndexingState;
import org.junit.jupiter.api.AfterEach;
@ -50,18 +51,18 @@ class SqlLoadProcessedDomainTest {
@Test
public void loadProcessedDomain() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
loader.load(loaderData, new EdgeDomain("www.marginalia.nu"), DomainIndexingState.BLOCKED, "127.0.0.1");
}
@Test
public void loadProcessedDomainTwice() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
loader.load(loaderData, new EdgeDomain("www.marginalia.nu"), DomainIndexingState.BLOCKED, "127.0.0.1");
}
@Test
public void loadProcessedDomainWithExtremelyLongIP() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
String ip = Stream.generate(() -> "127.").limit(1024).collect(Collectors.joining());
@ -70,7 +71,7 @@ class SqlLoadProcessedDomainTest {
@Test
public void loadDomainAlias() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
loader.loadAlias(loaderData, new DomainLink(new EdgeDomain("memex.marginalia.nu"), new EdgeDomain("www.marginalia.nu")));
}
}

View File

@ -10,6 +10,7 @@ import nu.marginalia.control.svc.ProcessOutboxFactory;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.db.storage.FileStorageService;
@ -121,7 +122,10 @@ public class ReconvertAndLoadActor extends AbstractStateGraph {
storageService.relateFileStorages(toProcess.id(), processedArea.id());
// Pre-send convert request
var request = new ConvertRequest(message.crawlStorageId, processedArea.id());
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
null,
message.crawlStorageId,
processedArea.id());
long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
return message

View File

@ -175,6 +175,8 @@ dependencyResolutionManagement {
library('handlebars','com.github.jknack','handlebars').version('4.3.1')
library('handlebars.markdown','com.github.jknack','handlebars-markdown').version('4.2.1')
library('sqlite','org.xerial','sqlite-jdbc').version('3.41.2.1')
bundle('slf4j', ['slf4j.api', 'log4j.api', 'log4j.core', 'log4j.slf4j'])
bundle('slf4j.test', ['slf4j.jdk14'])
bundle('prometheus', ['prometheus', 'prometheus-servlet', 'prometheus-server', 'prometheus-hotspot'])