mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 21:18:58 +00:00
(converting) WIP begin to remove converting-model and the old InstructionsCompiler
This commit is contained in:
parent
24b4606f96
commit
4799dd769e
@ -1,49 +1,3 @@
|
|||||||
# Converting Models
|
# Converting Models
|
||||||
|
|
||||||
Contains models shared by the [converting-process](../../processes/converting-process/) and
|
!!To be deleted!!
|
||||||
[loading-process](../../processes/loading-process/).
|
|
||||||
|
|
||||||
## Design
|
|
||||||
|
|
||||||
The two processes communicate through a file-based protocol. The converter serializes [instructions](src/main/java/nu/marginalia/converting/instruction/Instruction.java)
|
|
||||||
to file, which are deserialized by the loader and fed into an [interpreter](src/main/java/nu/marginalia/converting/instruction/Interpreter.java).
|
|
||||||
|
|
||||||
The instructions implement a visitor pattern.
|
|
||||||
|
|
||||||
Conceptually the pattern can be thought of a bit like remote function calls over file,
|
|
||||||
or a crude instructions-based programming language.
|
|
||||||
|
|
||||||
This
|
|
||||||
|
|
||||||
```java
|
|
||||||
producer.foo("cat");
|
|
||||||
producer.bar("milk", "eggs", "bread");
|
|
||||||
```
|
|
||||||
|
|
||||||
translates through this paradigm, to this:
|
|
||||||
|
|
||||||
```
|
|
||||||
(producer)
|
|
||||||
writeInstruction(DoFoo("cat"))
|
|
||||||
writeInstruction(DoBar("milk", "eggs", "bread"))
|
|
||||||
|
|
||||||
(consumer)
|
|
||||||
while read instruction:
|
|
||||||
interpreter.apply(instruction)
|
|
||||||
|
|
||||||
(Interpreter)
|
|
||||||
doFoo(animal):
|
|
||||||
...
|
|
||||||
doBar(ingredients):
|
|
||||||
...
|
|
||||||
|
|
||||||
(doFoo)
|
|
||||||
DoFoo(animal):
|
|
||||||
apply(interpreter):
|
|
||||||
interpreter.doFoo(animal)
|
|
||||||
|
|
||||||
(doBar)
|
|
||||||
DoBar(ingredients):
|
|
||||||
apply(interpreter):
|
|
||||||
interpreter.doBar(ingredients)
|
|
||||||
```
|
|
@ -1,10 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
public interface Instruction extends Serializable {
|
|
||||||
void apply(Interpreter interpreter);
|
|
||||||
boolean isNoOp();
|
|
||||||
|
|
||||||
InstructionTag tag();
|
|
||||||
}
|
|
@ -1,25 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.instructions.*;
|
|
||||||
|
|
||||||
public enum InstructionTag {
|
|
||||||
|
|
||||||
DOMAIN(LoadDomain.class),
|
|
||||||
LINK(LoadDomainLink.class),
|
|
||||||
REDIRECT(LoadDomainRedirect.class),
|
|
||||||
WORDS(LoadKeywords.class),
|
|
||||||
PROC_DOCUMENT(LoadProcessedDocument.class),
|
|
||||||
PROC_DOCUMENT_ERR(LoadProcessedDocumentWithError.class),
|
|
||||||
PROC_DOMAIN(LoadProcessedDomain.class),
|
|
||||||
|
|
||||||
DOMAIN_METADATA(LoadDomainMetadata.class),
|
|
||||||
|
|
||||||
RSS(LoadRssFeed.class);
|
|
||||||
|
|
||||||
public final Class<? extends Instruction> clazz;
|
|
||||||
|
|
||||||
InstructionTag(Class<? extends Instruction> clazz) {
|
|
||||||
this.clazz = clazz;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,26 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction;
|
|
||||||
|
|
||||||
import nu.marginalia.keyword.model.DocumentKeywords;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
import nu.marginalia.model.crawl.DomainIndexingState;
|
|
||||||
import nu.marginalia.model.idx.DocumentMetadata;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.DomainLink;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
|
|
||||||
|
|
||||||
public interface Interpreter {
|
|
||||||
default void loadDomain(EdgeDomain[] domain) {}
|
|
||||||
default void loadRssFeed(EdgeUrl[] rssFeed) {}
|
|
||||||
default void loadDomainLink(DomainLink[] links) {}
|
|
||||||
|
|
||||||
default void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {}
|
|
||||||
default void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {}
|
|
||||||
default void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {}
|
|
||||||
|
|
||||||
default void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {}
|
|
||||||
|
|
||||||
default void loadDomainRedirect(DomainLink link) {}
|
|
||||||
|
|
||||||
default void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {}
|
|
||||||
}
|
|
@ -1,8 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
public record DomainLink(EdgeDomain from, EdgeDomain to) implements Serializable {
|
|
||||||
}
|
|
@ -1,31 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
public record LoadDomain(EdgeDomain... domain) implements Instruction {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadDomain(domain);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return domain.length == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.DOMAIN;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getClass().getSimpleName()+"["+Arrays.toString(domain)+"]";
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,31 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
public record LoadDomainLink(DomainLink... links) implements Instruction {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadDomainLink(links);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getClass().getSimpleName()+"["+ Arrays.toString(links)+"]";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.LINK;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return links.length == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,28 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
public record LoadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) implements Instruction {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadDomainMetadata(domain, knownUrls, goodUrls, visitedUrls);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.DOMAIN_METADATA;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,29 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
|
|
||||||
public record LoadDomainRedirect(DomainLink links) implements Instruction {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadDomainRedirect(links);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getClass().getSimpleName()+"["+ links+"]";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.REDIRECT;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,32 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.keyword.model.DocumentKeywords;
|
|
||||||
import nu.marginalia.model.idx.DocumentMetadata;
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
|
|
||||||
public record LoadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) implements Instruction {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadKeywords(url, ordinal, features, metadata, words);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.WORDS;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getClass().getSimpleName()+"["+ words+"]";
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,37 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.model.crawl.UrlIndexingState;
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
import org.jetbrains.annotations.Nullable;
|
|
||||||
|
|
||||||
|
|
||||||
public record LoadProcessedDocument(EdgeUrl url,
|
|
||||||
int ordinal, UrlIndexingState state,
|
|
||||||
String title,
|
|
||||||
String description,
|
|
||||||
int htmlFeatures,
|
|
||||||
String standard,
|
|
||||||
int length,
|
|
||||||
long hash,
|
|
||||||
double quality,
|
|
||||||
@Nullable Integer pubYear
|
|
||||||
) implements Instruction
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadProcessedDocument(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.PROC_DOCUMENT;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,29 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.model.crawl.UrlIndexingState;
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
|
|
||||||
|
|
||||||
public record LoadProcessedDocumentWithError(EdgeUrl url,
|
|
||||||
UrlIndexingState state,
|
|
||||||
String reason,
|
|
||||||
int ordinal) implements Instruction
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadProcessedDocumentWithError(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.PROC_DOCUMENT_ERR;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,26 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.model.crawl.DomainIndexingState;
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
|
|
||||||
public record LoadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) implements Instruction {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadProcessedDomain(domain, state, ip);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.PROC_DOMAIN;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,32 +0,0 @@
|
|||||||
package nu.marginalia.converting.instruction.instructions;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.InstructionTag;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
public record LoadRssFeed(EdgeUrl... feeds) implements Instruction {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void apply(Interpreter interpreter) {
|
|
||||||
interpreter.loadRssFeed(feeds);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getClass().getSimpleName()+"["+ Arrays.toString(feeds)+"]";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InstructionTag tag() {
|
|
||||||
return InstructionTag.RSS;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isNoOp() {
|
|
||||||
return feeds.length == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,37 +0,0 @@
|
|||||||
package nu.marginalia.converting;
|
|
||||||
|
|
||||||
import com.github.luben.zstd.RecyclingBufferPool;
|
|
||||||
import com.github.luben.zstd.ZstdOutputStream;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
|
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.PrintWriter;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.nio.file.StandardOpenOption;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.time.ZoneOffset;
|
|
||||||
|
|
||||||
public class ConversionLog implements AutoCloseable, Interpreter {
|
|
||||||
private final PrintWriter writer;
|
|
||||||
|
|
||||||
public ConversionLog(Path rootDir) throws IOException {
|
|
||||||
String fileName = String.format("conversion-log-%s.zstd", LocalDateTime.now().toEpochSecond(ZoneOffset.UTC));
|
|
||||||
Path logFile = rootDir.resolve(fileName);
|
|
||||||
|
|
||||||
writer = new PrintWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(logFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE)), RecyclingBufferPool.INSTANCE));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
writer.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {
|
|
||||||
writer.printf("%s\t%s\n", loadProcessedDocumentWithError.url(), loadProcessedDocumentWithError.reason());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -20,14 +20,11 @@ import nu.marginalia.process.log.WorkLog;
|
|||||||
import nu.marginalia.service.module.DatabaseModule;
|
import nu.marginalia.service.module.DatabaseModule;
|
||||||
import nu.marginalia.worklog.BatchingWorkLog;
|
import nu.marginalia.worklog.BatchingWorkLog;
|
||||||
import nu.marginalia.worklog.BatchingWorkLogImpl;
|
import nu.marginalia.worklog.BatchingWorkLogImpl;
|
||||||
import org.checkerframework.checker.units.qual.C;
|
|
||||||
import plan.CrawlPlan;
|
import plan.CrawlPlan;
|
||||||
import nu.marginalia.converting.compiler.InstructionsCompiler;
|
|
||||||
import nu.marginalia.converting.processor.DomainProcessor;
|
import nu.marginalia.converting.processor.DomainProcessor;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -40,7 +37,6 @@ import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
|
|||||||
public class ConverterMain {
|
public class ConverterMain {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
|
private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
|
||||||
private final DomainProcessor processor;
|
private final DomainProcessor processor;
|
||||||
private final InstructionsCompiler compiler;
|
|
||||||
private final Gson gson;
|
private final Gson gson;
|
||||||
private final ProcessHeartbeat heartbeat;
|
private final ProcessHeartbeat heartbeat;
|
||||||
private final MessageQueueFactory messageQueueFactory;
|
private final MessageQueueFactory messageQueueFactory;
|
||||||
@ -69,7 +65,6 @@ public class ConverterMain {
|
|||||||
@Inject
|
@Inject
|
||||||
public ConverterMain(
|
public ConverterMain(
|
||||||
DomainProcessor processor,
|
DomainProcessor processor,
|
||||||
InstructionsCompiler compiler,
|
|
||||||
Gson gson,
|
Gson gson,
|
||||||
ProcessHeartbeatImpl heartbeat,
|
ProcessHeartbeatImpl heartbeat,
|
||||||
MessageQueueFactory messageQueueFactory,
|
MessageQueueFactory messageQueueFactory,
|
||||||
@ -78,7 +73,6 @@ public class ConverterMain {
|
|||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.processor = processor;
|
this.processor = processor;
|
||||||
this.compiler = compiler;
|
|
||||||
this.gson = gson;
|
this.gson = gson;
|
||||||
this.heartbeat = heartbeat;
|
this.heartbeat = heartbeat;
|
||||||
this.messageQueueFactory = messageQueueFactory;
|
this.messageQueueFactory = messageQueueFactory;
|
||||||
@ -91,21 +85,7 @@ public class ConverterMain {
|
|||||||
public void convert(SideloadSource sideloadSource, Path writeDir) throws Exception {
|
public void convert(SideloadSource sideloadSource, Path writeDir) throws Exception {
|
||||||
int maxPoolSize = 16;
|
int maxPoolSize = 16;
|
||||||
|
|
||||||
try (WorkLog workLog = new WorkLog(writeDir.resolve("processor.log"));
|
// FIXME
|
||||||
ConversionLog conversionLog = new ConversionLog(writeDir)) {
|
|
||||||
var instructionWriter = new InstructionWriterFactory(conversionLog, writeDir, gson);
|
|
||||||
|
|
||||||
final String where;
|
|
||||||
final int size;
|
|
||||||
|
|
||||||
try (var writer = instructionWriter.createInstructionsForDomainWriter(sideloadSource.getId())) {
|
|
||||||
compiler.compileStreaming(sideloadSource, writer::accept);
|
|
||||||
where = writer.getFileName();
|
|
||||||
size = writer.getSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
workLog.setJobToFinished(sideloadSource.getId(), where, size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void convert(CrawlPlan plan) throws Exception {
|
public void convert(CrawlPlan plan) throws Exception {
|
||||||
@ -115,10 +95,8 @@ public class ConverterMain {
|
|||||||
|
|
||||||
|
|
||||||
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile());
|
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile());
|
||||||
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir());
|
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir()))
|
||||||
ConversionLog log = new ConversionLog(plan.process.getDir())) {
|
{
|
||||||
var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson);
|
|
||||||
|
|
||||||
var pool = new DumbThreadPool(maxPoolSize, 2);
|
var pool = new DumbThreadPool(maxPoolSize, 2);
|
||||||
|
|
||||||
int totalDomains = plan.countCrawledDomains();
|
int totalDomains = plan.countCrawledDomains();
|
||||||
@ -132,9 +110,7 @@ public class ConverterMain {
|
|||||||
{
|
{
|
||||||
pool.submit(() -> {
|
pool.submit(() -> {
|
||||||
ProcessedDomain processed = processor.process(domain);
|
ProcessedDomain processed = processor.process(domain);
|
||||||
|
|
||||||
converterWriter.accept(processed);
|
converterWriter.accept(processed);
|
||||||
|
|
||||||
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
|
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1,141 +0,0 @@
|
|||||||
package nu.marginalia.converting;
|
|
||||||
|
|
||||||
import com.github.luben.zstd.ZstdOutputStream;
|
|
||||||
import com.google.gson.Gson;
|
|
||||||
import nu.marginalia.model.crawl.DomainIndexingState;
|
|
||||||
import nu.marginalia.model.idx.DocumentMetadata;
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.Interpreter;
|
|
||||||
import nu.marginalia.keyword.model.DocumentKeywords;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.*;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
|
|
||||||
public class InstructionWriterFactory {
|
|
||||||
|
|
||||||
private final ConversionLog log;
|
|
||||||
private final Path outputDir;
|
|
||||||
private final Gson gson;
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(InstructionWriterFactory.class);
|
|
||||||
|
|
||||||
public InstructionWriterFactory(ConversionLog log, Path outputDir, Gson gson) {
|
|
||||||
this.log = log;
|
|
||||||
this.outputDir = outputDir;
|
|
||||||
this.gson = gson;
|
|
||||||
|
|
||||||
if (!Files.isDirectory(outputDir)) {
|
|
||||||
throw new IllegalArgumentException("Output dir " + outputDir + " does not exist");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public InstructionWriter createInstructionsForDomainWriter(String id) throws IOException {
|
|
||||||
Path outputFile = getOutputFile(id);
|
|
||||||
return new InstructionWriter(outputFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
public class InstructionWriter implements AutoCloseable {
|
|
||||||
private final ObjectOutputStream outputStream;
|
|
||||||
private final String where;
|
|
||||||
private final SummarizingInterpreter summary = new SummarizingInterpreter();
|
|
||||||
|
|
||||||
private int size = 0;
|
|
||||||
|
|
||||||
|
|
||||||
InstructionWriter(Path filename) throws IOException {
|
|
||||||
where = filename.getFileName().toString();
|
|
||||||
Files.deleteIfExists(filename);
|
|
||||||
outputStream = new ObjectOutputStream(new ZstdOutputStream(new FileOutputStream(filename.toFile())));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void accept(Instruction instruction) {
|
|
||||||
if (instruction.isNoOp()) return;
|
|
||||||
|
|
||||||
instruction.apply(summary);
|
|
||||||
instruction.apply(log);
|
|
||||||
|
|
||||||
size++;
|
|
||||||
|
|
||||||
try {
|
|
||||||
outputStream.writeObject(instruction);
|
|
||||||
|
|
||||||
// Reset the stream to avoid keeping references to the objects
|
|
||||||
// (as this will cause the memory usage to grow indefinitely when
|
|
||||||
// writing huge amounts of data)
|
|
||||||
outputStream.reset();
|
|
||||||
}
|
|
||||||
catch (IOException ex) {
|
|
||||||
logger.warn("IO exception writing instruction", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
logger.info("Wrote {} - {} - {}", where, size, summary);
|
|
||||||
outputStream.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getFileName() {
|
|
||||||
return where;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getSize() {
|
|
||||||
return size;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Path getOutputFile(String id) throws IOException {
|
|
||||||
String first = id.substring(0, 2);
|
|
||||||
String second = id.substring(2, 4);
|
|
||||||
|
|
||||||
Path destDir = outputDir.resolve(first).resolve(second);
|
|
||||||
if (!Files.exists(destDir)) {
|
|
||||||
Files.createDirectories(destDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
return destDir.resolve(id + ".pzstd");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class SummarizingInterpreter implements Interpreter {
|
|
||||||
|
|
||||||
private String domainName;
|
|
||||||
private int ok = 0;
|
|
||||||
private int error = 0;
|
|
||||||
|
|
||||||
int keywords = 0;
|
|
||||||
int documents = 0;
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
// This shouldn't happen (TM)
|
|
||||||
assert keywords == documents : "keywords != documents";
|
|
||||||
|
|
||||||
return String.format("%s - %d %d", domainName, ok, error);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {
|
|
||||||
this.domainName = domain.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {
|
|
||||||
documents++;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {
|
|
||||||
keywords++;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {
|
|
||||||
ok += goodUrls;
|
|
||||||
error += visitedUrls - goodUrls;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,59 +0,0 @@
|
|||||||
package nu.marginalia.converting.compiler;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadKeywords;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
|
|
||||||
import nu.marginalia.converting.model.ProcessedDocument;
|
|
||||||
import nu.marginalia.model.crawl.HtmlFeature;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public class DocumentsCompiler {
|
|
||||||
|
|
||||||
public void compileDocumentDetails(Consumer<Instruction> instructionConsumer,
|
|
||||||
ProcessedDocument doc,
|
|
||||||
int ordinal) {
|
|
||||||
var details = doc.details;
|
|
||||||
|
|
||||||
if (details != null) {
|
|
||||||
instructionConsumer.accept(new LoadProcessedDocument(doc.url,
|
|
||||||
ordinal,
|
|
||||||
doc.state,
|
|
||||||
details.title,
|
|
||||||
details.description,
|
|
||||||
HtmlFeature.encode(details.features),
|
|
||||||
details.standard.name(),
|
|
||||||
details.length,
|
|
||||||
details.hashCode,
|
|
||||||
details.quality,
|
|
||||||
details.pubYear
|
|
||||||
));
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
instructionConsumer.accept(new LoadProcessedDocumentWithError(
|
|
||||||
doc.url,
|
|
||||||
doc.state,
|
|
||||||
doc.stateReason,
|
|
||||||
ordinal
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void compileWords(Consumer<Instruction> instructionConsumer,
|
|
||||||
ProcessedDocument doc,
|
|
||||||
int ordinal) {
|
|
||||||
var words = doc.words;
|
|
||||||
|
|
||||||
if (words != null) {
|
|
||||||
instructionConsumer.accept(new LoadKeywords(doc.url,
|
|
||||||
ordinal,
|
|
||||||
HtmlFeature.encode(doc.details.features),
|
|
||||||
doc.details.metadata,
|
|
||||||
words.build())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,47 +0,0 @@
|
|||||||
package nu.marginalia.converting.compiler;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadDomainMetadata;
|
|
||||||
import nu.marginalia.converting.model.ProcessedDocument;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
|
||||||
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public class DomainMetadataCompiler {
|
|
||||||
|
|
||||||
|
|
||||||
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain domain, @NotNull List<ProcessedDocument> documents) {
|
|
||||||
|
|
||||||
int visitedUrls = 0;
|
|
||||||
int goodUrls = 0;
|
|
||||||
|
|
||||||
Set<EdgeUrl> knownUrls = new HashSet<>(documents.size() * 2);
|
|
||||||
|
|
||||||
for (var doc : documents) {
|
|
||||||
visitedUrls++;
|
|
||||||
|
|
||||||
if (doc.isOk()) {
|
|
||||||
goodUrls++;
|
|
||||||
}
|
|
||||||
|
|
||||||
knownUrls.add(doc.url);
|
|
||||||
|
|
||||||
Optional.ofNullable(doc.details)
|
|
||||||
.map(details -> details.linksInternal)
|
|
||||||
.ifPresent(knownUrls::addAll);
|
|
||||||
}
|
|
||||||
|
|
||||||
instructionConsumer.accept(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void compileFake(Consumer<Instruction> instructionConsumer, EdgeDomain domain, int countAll, int countGood) {
|
|
||||||
instructionConsumer.accept(new LoadDomainMetadata(domain, countAll, countGood, countAll));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
package nu.marginalia.converting.compiler;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadRssFeed;
|
|
||||||
import nu.marginalia.converting.model.ProcessedDocument;
|
|
||||||
import nu.marginalia.model.EdgeUrl;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public class FeedsCompiler {
|
|
||||||
|
|
||||||
public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDocument> documents) {
|
|
||||||
|
|
||||||
EdgeUrl[] feeds = documents.stream().map(doc -> doc.details)
|
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.flatMap(dets -> dets.feedLinks.stream())
|
|
||||||
.distinct()
|
|
||||||
.toArray(EdgeUrl[]::new);
|
|
||||||
|
|
||||||
instructionConsumer.accept(new LoadRssFeed(feeds));
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,88 +0,0 @@
|
|||||||
package nu.marginalia.converting.compiler;
|
|
||||||
|
|
||||||
import com.google.inject.Inject;
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadProcessedDomain;
|
|
||||||
import nu.marginalia.converting.model.ProcessedDocument;
|
|
||||||
import nu.marginalia.converting.model.ProcessedDomain;
|
|
||||||
import nu.marginalia.converting.sideload.SideloadSource;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import static java.util.Objects.requireNonNullElse;
|
|
||||||
|
|
||||||
public class InstructionsCompiler {
|
|
||||||
private final DocumentsCompiler documentsCompiler;
|
|
||||||
private final DomainMetadataCompiler domainMetadataCompiler;
|
|
||||||
private final FeedsCompiler feedsCompiler;
|
|
||||||
private final LinksCompiler linksCompiler;
|
|
||||||
private final RedirectCompiler redirectCompiler;
|
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(InstructionsCompiler.class);
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public InstructionsCompiler(DocumentsCompiler documentsCompiler,
|
|
||||||
DomainMetadataCompiler domainMetadataCompiler,
|
|
||||||
FeedsCompiler feedsCompiler,
|
|
||||||
LinksCompiler linksCompiler,
|
|
||||||
RedirectCompiler redirectCompiler)
|
|
||||||
{
|
|
||||||
this.documentsCompiler = documentsCompiler;
|
|
||||||
this.domainMetadataCompiler = domainMetadataCompiler;
|
|
||||||
this.feedsCompiler = feedsCompiler;
|
|
||||||
this.linksCompiler = linksCompiler;
|
|
||||||
this.redirectCompiler = redirectCompiler;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void compile(ProcessedDomain domain, Consumer<Instruction> instructionConsumer) {
|
|
||||||
// Guaranteed to always be first
|
|
||||||
instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
|
|
||||||
|
|
||||||
if (domain.documents != null) {
|
|
||||||
|
|
||||||
int ordinal = 0;
|
|
||||||
for (var doc : domain.documents) {
|
|
||||||
documentsCompiler.compileDocumentDetails(instructionConsumer, doc, ordinal);
|
|
||||||
documentsCompiler.compileWords(instructionConsumer, doc, ordinal);
|
|
||||||
ordinal++;
|
|
||||||
}
|
|
||||||
|
|
||||||
feedsCompiler.compile(instructionConsumer, domain.documents);
|
|
||||||
linksCompiler.compile(instructionConsumer, domain.domain, domain.documents);
|
|
||||||
}
|
|
||||||
if (domain.redirect != null) {
|
|
||||||
redirectCompiler.compile(instructionConsumer, domain.domain, domain.redirect);
|
|
||||||
}
|
|
||||||
|
|
||||||
domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void compileStreaming(SideloadSource sideloadSource,
|
|
||||||
Consumer<Instruction> instructionConsumer) {
|
|
||||||
ProcessedDomain domain = sideloadSource.getDomain();
|
|
||||||
Iterator<ProcessedDocument> documentsIterator = sideloadSource.getDocumentsStream();
|
|
||||||
|
|
||||||
// Guaranteed to always be first
|
|
||||||
instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
|
|
||||||
|
|
||||||
int countAll = 0;
|
|
||||||
int countGood = 0;
|
|
||||||
|
|
||||||
logger.info("Writing docs");
|
|
||||||
|
|
||||||
while (documentsIterator.hasNext()) {
|
|
||||||
var doc = documentsIterator.next();
|
|
||||||
countAll++;
|
|
||||||
if (doc.isOk()) countGood++;
|
|
||||||
|
|
||||||
documentsCompiler.compileDocumentDetails(instructionConsumer, doc, countAll);
|
|
||||||
documentsCompiler.compileWords(instructionConsumer, doc, countAll);
|
|
||||||
}
|
|
||||||
|
|
||||||
domainMetadataCompiler.compileFake(instructionConsumer, domain.domain, countAll, countGood);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,35 +0,0 @@
|
|||||||
package nu.marginalia.converting.compiler;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.DomainLink;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadDomain;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadDomainLink;
|
|
||||||
import nu.marginalia.converting.model.ProcessedDocument;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public class LinksCompiler {
|
|
||||||
|
|
||||||
public void compile(Consumer<Instruction> instructionConsumer,
|
|
||||||
EdgeDomain from,
|
|
||||||
List<ProcessedDocument> documents) {
|
|
||||||
|
|
||||||
EdgeDomain[] domains = documents.stream()
|
|
||||||
.map(doc -> doc.details)
|
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.flatMap(details -> details.linksExternal.stream())
|
|
||||||
.map(link -> link.domain)
|
|
||||||
.distinct()
|
|
||||||
.toArray(EdgeDomain[]::new);
|
|
||||||
|
|
||||||
DomainLink[] links = new DomainLink[domains.length];
|
|
||||||
Arrays.setAll(links, i -> new DomainLink(from, domains[i]));
|
|
||||||
|
|
||||||
instructionConsumer.accept(new LoadDomain(domains));
|
|
||||||
instructionConsumer.accept(new LoadDomainLink(links));
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,20 +0,0 @@
|
|||||||
package nu.marginalia.converting.compiler;
|
|
||||||
|
|
||||||
import nu.marginalia.converting.instruction.Instruction;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.DomainLink;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadDomain;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadDomainLink;
|
|
||||||
import nu.marginalia.converting.instruction.instructions.LoadDomainRedirect;
|
|
||||||
import nu.marginalia.model.EdgeDomain;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public class RedirectCompiler {
|
|
||||||
|
|
||||||
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain from, EdgeDomain to) {
|
|
||||||
instructionConsumer.accept(new LoadDomain(to));
|
|
||||||
instructionConsumer.accept(new LoadDomainLink(new DomainLink(from, to)));
|
|
||||||
instructionConsumer.accept(new LoadDomainRedirect(new DomainLink(from, to)));
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user