(loader) Implement new linkdb in loader

Deprecate the LoadUrl instruction entirely. We no longer need to be told upfront about which URLs to expect, as IDs are generated from the domain id and document ordinal.

For now, we no longer store new URLs in different domains. We need to re-implement this somehow, probably in a different job or as a different output.
This commit is contained in:
Viktor Lofgren 2023-08-24 11:55:58 +02:00
parent c70670bacb
commit 6a04cdfddf
38 changed files with 341 additions and 777 deletions

View File

@ -0,0 +1,63 @@
package nu.marginalia.linkdb;
import nu.marginalia.linkdb.model.UrlStatus;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
/** Writes per-URL crawl/processing status rows into a standalone SQLite database. */
public class LinkdbStatusWriter {

    private final Connection connection;

    /**
     * Opens (or creates) the SQLite database at {@code outputFile} and applies
     * the schema from the classpath resource {@code db/linkdb-status.sql}.
     *
     * @throws SQLException on connection or schema-creation failure
     */
    public LinkdbStatusWriter(Path outputFile) throws SQLException {
        String connStr = "jdbc:sqlite:" + outputFile.toString();
        connection = DriverManager.getConnection(connStr);

        try (var stream = ClassLoader.getSystemResourceAsStream("db/linkdb-status.sql");
             var stmt = connection.createStatement()
        ) {
            if (stream == null) {
                // Fail loudly instead of with an opaque NPE if the schema resource is absent
                throw new IllegalStateException("Missing classpath resource db/linkdb-status.sql");
            }
            var sql = new String(stream.readAllBytes(), StandardCharsets.UTF_8);
            stmt.executeUpdate(sql);

            // Disable synchronous writing as this is a one-off operation with no recovery
            stmt.execute("PRAGMA synchronous = OFF");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Inserts the given statuses in batches of ~100 rows.
     *
     * @throws SQLException if the batch insert fails
     */
    public void add(List<UrlStatus> statuses) throws SQLException {
        try (var stmt = connection.prepareStatement("""
                INSERT INTO STATUS(ID, URL, STATUS, DESCRIPTION)
                VALUES (?, ?, ?, ?)
                """)) {
            int count = 0;
            for (var status : statuses) {
                stmt.setLong(1, status.id());
                stmt.setString(2, status.url().toString());
                stmt.setString(3, status.status());
                if (status.description() == null) {
                    stmt.setNull(4, Types.VARCHAR);
                } else {
                    stmt.setString(4, status.description());
                }
                stmt.addBatch();

                // BUGFIX: reset the counter after flushing; the original never did,
                // so after the first 100 rows every single addBatch triggered an
                // executeBatch, degrading to per-row execution.
                if (++count > 100) {
                    stmt.executeBatch();
                    count = 0;
                }
            }
            // Flush any remaining rows not covered by a full batch
            if (count != 0) {
                stmt.executeBatch();
            }
        }
    }

    public void close() throws SQLException {
        connection.close();
    }
}

View File

@ -1,7 +1,6 @@
package nu.marginalia.linkdb;
import nu.marginalia.linkdb.model.UrlDetail;
import nu.marginalia.linkdb.model.UrlProtocol;
import java.io.IOException;
import java.nio.file.Path;
@ -19,7 +18,7 @@ public class LinkdbWriter {
String connStr = "jdbc:sqlite:" + outputFile.toString();
connection = DriverManager.getConnection(connStr);
try (var stream = ClassLoader.getSystemResourceAsStream("db/linkdb.sql");
try (var stream = ClassLoader.getSystemResourceAsStream("db/linkdb-document.sql");
var stmt = connection.createStatement()
) {
var sql = new String(stream.readAllBytes());

View File

@ -0,0 +1,8 @@
package nu.marginalia.linkdb.model;
import nu.marginalia.model.EdgeUrl;
import javax.annotation.Nullable;
/**
 * Status row for a processed URL, persisted via LinkdbStatusWriter.
 *
 * @param id          document id; callers encode it from domain id + document ordinal (UrlIdCodec)
 * @param url         the document's URL
 * @param status      processing outcome (stored as text)
 * @param description optional detail/error text; null when there is nothing to add
 */
public record UrlStatus(long id, EdgeUrl url, String status, @Nullable String description) {
}

View File

@ -1,5 +1,5 @@
CREATE TABLE DOCUMENT (
ID LONG PRIMARY KEY,
ID INT8 PRIMARY KEY,
URL TEXT,

View File

@ -0,0 +1,6 @@
-- Status ledger for processed URLs, one row per document; written by LinkdbStatusWriter.
CREATE TABLE STATUS (
-- 64-bit document id, encoded from domain id + document ordinal (see UrlIdCodec)
ID INT8 PRIMARY KEY,
URL TEXT,
-- processing outcome as text (e.g. an indexing-state name); always present
STATUS TEXT NOT NULL,
-- optional human-readable detail or error reason; null for plain successes
DESCRIPTION TEXT
);

View File

@ -0,0 +1,33 @@
package nu.marginalia.linkdb;
import nu.marginalia.linkdb.model.UrlStatus;
import nu.marginalia.model.EdgeUrl;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
/** Smoke test: LinkdbStatusWriter can create its schema and insert rows with and without a description. */
public class LinkdbStatusWriterTest {
    @Test
    public void testCreate() throws IOException {
        Path tempPath = Files.createTempFile("linkdb-status", ".db");
        try {
            var writer = new LinkdbStatusWriter(tempPath);

            // One status with a null description, one with text, to cover both insert paths
            writer.add(List.of(
                    new UrlStatus(5, new EdgeUrl("https://www.marginalia.nu/x"), "y", null),
                    new UrlStatus(6, new EdgeUrl("https://www.marginalia.nu/y"), "y", "z")
            ));

            writer.close();
        } catch (SQLException | URISyntaxException e) {
            // Multi-catch: both original catch blocks had identical bodies
            throw new RuntimeException(e);
        } finally {
            // Always remove the temp database, pass or fail
            Files.deleteIfExists(tempPath);
        }
    }
}

View File

@ -5,6 +5,7 @@ import nu.marginalia.index.journal.model.IndexJournalEntry;
import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
@ -84,8 +85,9 @@ class ForwardIndexConverterTest {
}
long createId(long url, long domain) {
return (domain << 32) | url;
return UrlIdCodec.encodeId((int) domain, (int) url);
}
public void createEntry(IndexJournalWriter writer, KeywordLexicon keywordLexicon, int id) {
int[] factors = getFactorsI(id);

View File

@ -3,6 +3,7 @@ package nu.marginalia.index.journal.model;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.EdgeId;
import nu.marginalia.model.id.UrlIdCodec;
public record IndexJournalEntry(IndexJournalEntryHeader header, IndexJournalEntryData data) {
@ -15,18 +16,7 @@ public record IndexJournalEntry(IndexJournalEntryHeader header, IndexJournalEntr
long documentMeta) {
return builder(new EdgeId<>(domainId),
new EdgeId<>(urlId),
documentMeta);
return builder(UrlIdCodec.encodeId(domainId, urlId), documentMeta);
}
public static IndexJournalEntryBuilder builder(EdgeId<EdgeDomain> domainId,
EdgeId<EdgeUrl> urlId,
long documentMeta) {
return new IndexJournalEntryBuilder(0,
IndexJournalEntryHeader.combineIds(domainId, urlId),
documentMeta);
}
}

View File

@ -1,29 +1,17 @@
package nu.marginalia.index.journal.model;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.EdgeId;
public record IndexJournalEntryHeader(int entrySize,
int documentFeatures,
long combinedId,
long documentMeta) {
public IndexJournalEntryHeader(EdgeId<EdgeDomain> domainId,
public IndexJournalEntryHeader(long combinedId,
int documentFeatures,
EdgeId<EdgeUrl> urlId,
long documentMeta) {
this(-1,
documentFeatures,
combineIds(domainId, urlId),
combinedId,
documentMeta);
}
static long combineIds(EdgeId<EdgeDomain> domainId, EdgeId<EdgeUrl> urlId) {
long did = domainId.id();
long uid = urlId.id();
return (did << 32L) | uid;
}
}

View File

@ -5,7 +5,6 @@ import nu.marginalia.converting.instruction.instructions.*;
public enum InstructionTag {
DOMAIN(LoadDomain.class),
URL(LoadUrl.class),
LINK(LoadDomainLink.class),
REDIRECT(LoadDomainRedirect.class),
WORDS(LoadKeywords.class),

View File

@ -10,7 +10,6 @@ import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
public interface Interpreter {
default void loadUrl(EdgeUrl[] url) {}
default void loadDomain(EdgeDomain[] domain) {}
default void loadRssFeed(EdgeUrl[] rssFeed) {}
default void loadDomainLink(DomainLink[] links) {}
@ -19,7 +18,7 @@ public interface Interpreter {
default void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {}
default void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {}
default void loadKeywords(EdgeUrl url, int features, DocumentMetadata metadata, DocumentKeywords words) {}
default void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {}
default void loadDomainRedirect(DomainLink link) {}

View File

@ -7,11 +7,11 @@ import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeUrl;
public record LoadKeywords(EdgeUrl url, int features, DocumentMetadata metadata, DocumentKeywords words) implements Instruction {
public record LoadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadKeywords(url, features, metadata, words);
interpreter.loadKeywords(url, ordinal, features, metadata, words);
}
@Override

View File

@ -9,7 +9,7 @@ import org.jetbrains.annotations.Nullable;
public record LoadProcessedDocument(EdgeUrl url,
UrlIndexingState state,
int ordinal, UrlIndexingState state,
String title,
String description,
int htmlFeatures,
@ -17,7 +17,8 @@ public record LoadProcessedDocument(EdgeUrl url,
int length,
long hash,
double quality,
@Nullable Integer pubYear) implements Instruction
@Nullable Integer pubYear
) implements Instruction
{
@Override
public void apply(Interpreter interpreter) {

View File

@ -9,7 +9,8 @@ import nu.marginalia.model.EdgeUrl;
public record LoadProcessedDocumentWithError(EdgeUrl url,
UrlIndexingState state,
String reason) implements Instruction
String reason,
int ordinal) implements Instruction
{
@Override
public void apply(Interpreter interpreter) {

View File

@ -1,31 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeUrl;
import java.util.Arrays;
/**
 * Instruction carrying a batch of URLs to be registered upfront by the loader.
 * NOTE(review): this file is deleted by the commit — per the commit message,
 * document ids are now generated from the domain id and document ordinal,
 * so URLs no longer need to be announced in advance.
 */
public record LoadUrl(EdgeUrl... url) implements Instruction {

    /** Hands the URL batch to the interpreter's loadUrl hook. */
    @Override
    public void apply(Interpreter interpreter) {
        interpreter.loadUrl(url);
    }

    @Override
    public String toString() {
        return getClass().getSimpleName()+"["+ Arrays.toString(url)+"]";
    }

    @Override
    public InstructionTag tag() {
        return InstructionTag.URL;
    }

    /** An empty batch carries no information and can be skipped when writing instructions. */
    @Override
    public boolean isNoOp() {
        return url.length == 0;
    }
}

View File

@ -7,9 +7,7 @@ import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
@ -130,7 +128,7 @@ public class InstructionWriterFactory {
}
@Override
public void loadKeywords(EdgeUrl url, int features, DocumentMetadata metadata, DocumentKeywords words) {
public void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {
keywords++;
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadKeywords;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.crawl.HtmlFeature;
@ -11,32 +12,43 @@ import java.util.function.Consumer;
public class DocumentsCompiler {
public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDocument> documents) {
for (var doc : documents) {
compileDocumentDetails(instructionConsumer, doc);
}
for (var doc : documents) {
compileWords(instructionConsumer, doc);
}
}
public void compileDocumentDetails(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
public void compileDocumentDetails(Consumer<Instruction> instructionConsumer,
ProcessedDocument doc,
int ordinal) {
var details = doc.details;
if (details != null) {
instructionConsumer.accept(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear));
instructionConsumer.accept(new LoadProcessedDocument(doc.url,
ordinal,
doc.state,
details.title,
details.description,
HtmlFeature.encode(details.features),
details.standard.name(),
details.length,
details.hashCode,
details.quality,
details.pubYear
));
}
else {
instructionConsumer.accept(new LoadProcessedDocumentWithError(
doc.url,
doc.state,
doc.stateReason,
ordinal
));
}
}
public void compileWords(Consumer<Instruction> instructionConsumer,
ProcessedDocument doc) {
ProcessedDocument doc,
int ordinal) {
var words = doc.words;
if (words != null) {
instructionConsumer.accept(new LoadKeywords(doc.url,
ordinal,
HtmlFeature.encode(doc.details.features),
doc.details.metadata,
words.build())

View File

@ -6,7 +6,6 @@ import nu.marginalia.converting.instruction.instructions.LoadProcessedDomain;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -17,7 +16,6 @@ import java.util.function.Consumer;
import static java.util.Objects.requireNonNullElse;
public class InstructionsCompiler {
private final UrlsCompiler urlsCompiler;
private final DocumentsCompiler documentsCompiler;
private final DomainMetadataCompiler domainMetadataCompiler;
private final FeedsCompiler feedsCompiler;
@ -27,14 +25,12 @@ public class InstructionsCompiler {
private final Logger logger = LoggerFactory.getLogger(InstructionsCompiler.class);
@Inject
public InstructionsCompiler(UrlsCompiler urlsCompiler,
DocumentsCompiler documentsCompiler,
public InstructionsCompiler(DocumentsCompiler documentsCompiler,
DomainMetadataCompiler domainMetadataCompiler,
FeedsCompiler feedsCompiler,
LinksCompiler linksCompiler,
RedirectCompiler redirectCompiler)
{
this.urlsCompiler = urlsCompiler;
this.documentsCompiler = documentsCompiler;
this.domainMetadataCompiler = domainMetadataCompiler;
this.feedsCompiler = feedsCompiler;
@ -47,8 +43,13 @@ public class InstructionsCompiler {
instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
if (domain.documents != null) {
urlsCompiler.compile(instructionConsumer, domain.documents);
documentsCompiler.compile(instructionConsumer, domain.documents);
int ordinal = 0;
for (var doc : domain.documents) {
documentsCompiler.compileDocumentDetails(instructionConsumer, doc, ordinal);
documentsCompiler.compileWords(instructionConsumer, doc, ordinal);
ordinal++;
}
feedsCompiler.compile(instructionConsumer, domain.documents);
linksCompiler.compile(instructionConsumer, domain.domain, domain.documents);
@ -63,7 +64,6 @@ public class InstructionsCompiler {
public void compileStreaming(SideloadSource sideloadSource,
Consumer<Instruction> instructionConsumer) {
ProcessedDomain domain = sideloadSource.getDomain();
Iterator<EdgeUrl> urlsIterator = sideloadSource.getUrlsIterator();
Iterator<ProcessedDocument> documentsIterator = sideloadSource.getDocumentsStream();
// Guaranteed to always be first
@ -72,11 +72,6 @@ public class InstructionsCompiler {
int countAll = 0;
int countGood = 0;
logger.info("Writing domains");
urlsCompiler.compileJustDomain(instructionConsumer, domain.domain);
logger.info("Writing urls");
urlsCompiler.compileJustUrls(instructionConsumer, urlsIterator);
logger.info("Writing docs");
while (documentsIterator.hasNext()) {
@ -84,8 +79,8 @@ public class InstructionsCompiler {
countAll++;
if (doc.isOk()) countGood++;
documentsCompiler.compileDocumentDetails(instructionConsumer, doc);
documentsCompiler.compileWords(instructionConsumer, doc);
documentsCompiler.compileDocumentDetails(instructionConsumer, doc, countAll);
documentsCompiler.compileWords(instructionConsumer, doc, countAll);
}
domainMetadataCompiler.compileFake(instructionConsumer, domain.domain, countAll, countGood);

View File

@ -2,26 +2,34 @@ package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.converting.instruction.instructions.LoadDomain;
import nu.marginalia.converting.instruction.instructions.LoadDomainLink;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.EdgeDomain;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
public class LinksCompiler {
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain from, List<ProcessedDocument> documents) {
public void compile(Consumer<Instruction> instructionConsumer,
EdgeDomain from,
List<ProcessedDocument> documents) {
DomainLink[] links = documents.stream().map(doc -> doc.details)
EdgeDomain[] domains = documents.stream()
.map(doc -> doc.details)
.filter(Objects::nonNull)
.flatMap(dets -> dets.linksExternal.stream())
.flatMap(details -> details.linksExternal.stream())
.map(link -> link.domain)
.distinct()
.map(domain -> new DomainLink(from, domain))
.toArray(DomainLink[]::new);
.toArray(EdgeDomain[]::new);
DomainLink[] links = new DomainLink[domains.length];
Arrays.setAll(links, i -> new DomainLink(from, domains[i]));
instructionConsumer.accept(new LoadDomain(domains));
instructionConsumer.accept(new LoadDomainLink(links));
}
}

View File

@ -1,77 +0,0 @@
package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadDomain;
import nu.marginalia.converting.instruction.instructions.LoadUrl;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.function.Consumer;
/**
 * Compiles LoadDomain and LoadUrl instructions from processed documents.
 * NOTE(review): this file is deleted by the commit — LoadUrl is deprecated
 * now that ids derive from domain id + document ordinal.
 */
public class UrlsCompiler {

    // Cap on internal links taken per document, to keep the URL set from ballooning
    private static final int MAX_INTERNAL_LINKS = 25;
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Emits one LoadDomain and one LoadUrl instruction covering the documents'
     * own URLs, at most one external link per external domain, and a capped
     * sample of internal links from OK documents.
     */
    public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDocument> documents) {
        Set<EdgeUrl> seenUrls = new HashSet<>(documents.size()*4);
        Set<EdgeDomain> seenDomains = new HashSet<>(documents.size());

        for (var doc : documents) {
            if (doc.url == null) {
                logger.warn("Discovered document with null URL");
                continue;
            }

            seenUrls.add(doc.url);

            if (doc.details == null) {
                continue;
            }

            // Add *some* external links; to avoid loading too many and gunking up the database with nonsense,
            // only permit this once per external domain per crawled domain
            for (var url : doc.details.linksExternal) {
                if (seenDomains.add(url.domain)) {
                    seenUrls.add(url);
                }
            }

            if (doc.isOk()) {
                // Don't load more than a few from linksInternal, grows too big for no reason
                var linksToAdd = new ArrayList<>(doc.details.linksInternal);
                if (linksToAdd.size() > MAX_INTERNAL_LINKS) {
                    linksToAdd.subList(MAX_INTERNAL_LINKS, linksToAdd.size()).clear();
                }
                seenUrls.addAll(linksToAdd);
            }
        }

        instructionConsumer.accept(new LoadDomain(seenDomains.toArray(EdgeDomain[]::new)));
        instructionConsumer.accept(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new)));
    }

    /** Streams an unbounded URL iterator out as LoadUrl instructions in chunks of 1000. */
    public void compileJustUrls(Consumer<Instruction> instructionConsumer, Iterator<EdgeUrl> urlsIterator) {
        var urls = new ArrayList<EdgeUrl>(1000);

        while (urlsIterator.hasNext()) {
            // Flush a full chunk before accepting the next URL
            if (urls.size() >= 1000) {
                instructionConsumer.accept(new LoadUrl(urls.toArray(EdgeUrl[]::new)));
                urls.clear();
            }
            urls.add(urlsIterator.next());
        }

        // Emit the final partial chunk, if any
        if (!urls.isEmpty()) {
            instructionConsumer.accept(new LoadUrl(urls.toArray(EdgeUrl[]::new)));
        }
    }

    /** Emits a LoadDomain instruction for just the crawled domain itself. */
    public void compileJustDomain(Consumer<Instruction> instructionConsumer, EdgeDomain domain) {
        instructionConsumer.accept(new LoadDomain(domain));
    }
}

View File

@ -64,25 +64,6 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
return ret;
}
@Override
@SneakyThrows
public Iterator<EdgeUrl> getUrlsIterator() {
EdgeUrl base = new EdgeUrl("https://encyclopedia.marginalia.nu/");
return new SqlQueryIterator<>(connection.prepareStatement("""
SELECT url, html FROM articles
"""))
{
@Override
public EdgeUrl convert(ResultSet rs) throws Exception {
var path = URLEncoder.encode(rs.getString("url"), StandardCharsets.UTF_8);
return base.withPathAndParam("/article/"+path, null);
}
};
}
@SneakyThrows
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {

View File

@ -8,7 +8,6 @@ import java.util.Iterator;
public interface SideloadSource {
ProcessedDomain getDomain();
Iterator<EdgeUrl> getUrlsIterator();
Iterator<ProcessedDocument> getDocumentsStream();
String getId();

View File

@ -22,7 +22,6 @@ import java.nio.file.Path;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
/** This code is broken */
@Deprecated()
@ -55,17 +54,6 @@ public class StackexchangeSideloader implements SideloadSource {
return ret;
}
@SneakyThrows
@Override
public Iterator<EdgeUrl> getUrlsIterator() {
var ids = reader.getIds();
return ids.stream()
.map(id -> EdgeUrl.parse("https://" + domainName + "/questions/" + id))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();
}
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {
try {

View File

@ -27,6 +27,7 @@ dependencies {
implementation project(':code:common:service')
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')
implementation project(':code:common:linkdb')
implementation project(':code:features-index:lexicon')
implementation project(':code:features-index:index-journal')
implementation project(':code:libraries:message-queue')

View File

@ -7,9 +7,9 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import lombok.SneakyThrows;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.loading.loader.IndexLoadKeywords;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.idx.DocumentMetadata;
@ -45,6 +45,7 @@ public class LoaderMain {
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final IndexLoadKeywords indexLoadKeywords;
private final LinkdbWriter writer;
private final Gson gson;
public static void main(String... args) throws Exception {
@ -73,6 +74,7 @@ public class LoaderMain {
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService,
IndexLoadKeywords indexLoadKeywords,
LinkdbWriter writer,
Gson gson
) {
@ -82,6 +84,7 @@ public class LoaderMain {
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.indexLoadKeywords = indexLoadKeywords;
this.writer = writer;
this.gson = gson;
heartbeat.start();
@ -136,6 +139,7 @@ public class LoaderMain {
// This needs to be done in order to have a readable index journal
indexLoadKeywords.close();
writer.close();
logger.info("Loading finished");
}
catch (Exception ex) {
@ -215,7 +219,7 @@ public class LoaderMain {
public class InstructionCounter implements Interpreter {
private int count = 0;
public void loadKeywords(EdgeUrl url, int features, DocumentMetadata metadata, DocumentKeywords words) {
public void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {
count++;
}

View File

@ -2,21 +2,29 @@ package nu.marginalia.loading;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import nu.marginalia.LanguageModels;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import plan.CrawlPlan;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.linkdb.LinkdbStatusWriter;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.UUID;
public class LoaderModule extends AbstractModule {
public LoaderModule() {
}
@ -25,11 +33,32 @@ public class LoaderModule extends AbstractModule {
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("loader", 0, UUID.randomUUID()));
bind(Gson.class).toProvider(this::createGson);
bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path", "/vol")));
bind(LanguageModels.class).toInstance(WmsaHome.getLanguageModels());
}
@Inject @Provides @Singleton
private LinkdbWriter createLinkdbWriter(FileStorageService service) throws SQLException, IOException {
var storage = service.getStorageByType(FileStorageType.LINKDB_STAGING);
Path dbPath = storage.asPath().resolve("links.db");
if (Files.exists(dbPath)) {
Files.delete(dbPath);
}
return new LinkdbWriter(dbPath);
}
@Inject @Provides @Singleton
private LinkdbStatusWriter createLinkdbStatusWriter(FileStorageService service) throws SQLException, IOException {
var storage = service.getStorageByType(FileStorageType.LINKDB_STAGING);
Path dbPath = storage.asPath().resolve("urlstatus.db");
if (Files.exists(dbPath)) {
Files.delete(dbPath);
}
return new LinkdbStatusWriter(dbPath);
}
private Gson createGson() {
return GsonFactory.get();
}

View File

@ -3,9 +3,9 @@ package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import lombok.SneakyThrows;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.EdgeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -18,8 +18,7 @@ public class IndexLoadKeywords implements Runnable {
private final LinkedBlockingQueue<InsertTask> insertQueue = new LinkedBlockingQueue<>(32);
private final LoaderIndexJournalWriter journalWriter;
private record InsertTask(int urlId,
int domainId,
private record InsertTask(long combinedId,
int features,
DocumentMetadata metadata,
DocumentKeywords wordSet) {}
@ -40,7 +39,7 @@ public class IndexLoadKeywords implements Runnable {
while (!canceled) {
var data = insertQueue.poll(1, TimeUnit.SECONDS);
if (data != null) {
journalWriter.putWords(new EdgeId<>(data.domainId), new EdgeId<>(data.urlId),
journalWriter.putWords(data.combinedId,
data.features,
data.metadata(),
data.wordSet);
@ -57,18 +56,18 @@ public class IndexLoadKeywords implements Runnable {
}
public void load(LoaderData loaderData,
int ordinal,
EdgeUrl url,
int features,
DocumentMetadata metadata,
DocumentKeywords words) throws InterruptedException {
int domainId = loaderData.getDomainId(url.domain);
int urlId = loaderData.getUrlId(url);
long combinedId = UrlIdCodec.encodeId(loaderData.getTargetDomainId(), ordinal);
if (urlId <= 0 || domainId <= 0) {
logger.warn("Failed to get IDs for {} -- d={},u={}", url, domainId, urlId);
if (combinedId <= 0) {
logger.warn("Failed to get IDs for {} -- c={}", url, combinedId);
return;
}
insertQueue.put(new InsertTask(urlId, domainId, features, metadata, words));
insertQueue.put(new InsertTask(combinedId, features, metadata, words));
}
}

View File

@ -0,0 +1,83 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import nu.marginalia.linkdb.LinkdbStatusWriter;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.linkdb.model.UrlDetail;
import nu.marginalia.linkdb.model.UrlStatus;
import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
/**
 * Loader step that persists processed documents into the link database:
 * successful documents go to the document db (LinkdbWriter) and every
 * document — successful or errored — gets a row in the status db
 * (LinkdbStatusWriter). Document ids are encoded from the target domain id
 * and the document's ordinal via UrlIdCodec.
 */
public class LdbLoadProcessedDocument {
    private static final Logger logger = LoggerFactory.getLogger(LdbLoadProcessedDocument.class);
    private final LinkdbWriter linkdbWriter;
    private final LinkdbStatusWriter linkdbStatusWriter;

    @Inject
    public LdbLoadProcessedDocument(LinkdbWriter linkdbWriter,
                                    LinkdbStatusWriter linkdbStatusWriter
                                    ) {
        this.linkdbWriter = linkdbWriter;
        this.linkdbStatusWriter = linkdbStatusWriter;
    }

    /** Writes detail and status rows for successfully processed documents. */
    public void load(LoaderData data, List<LoadProcessedDocument> documents) {
        var details = new ArrayList<UrlDetail>();

        int domainId = data.getTargetDomainId();
        var statusList = new ArrayList<UrlStatus>();

        for (var document : documents) {
            long id = UrlIdCodec.encodeId(domainId, document.ordinal());
            details.add(new UrlDetail(
                    id,
                    document.url(),
                    document.title(),
                    document.description(),
                    document.quality(),
                    document.standard(),
                    document.htmlFeatures(),
                    document.pubYear(),
                    document.hash(),
                    document.length()
            ));
            statusList.add(new UrlStatus(id, document.url(), document.state().toString(), null));
        }

        try {
            linkdbWriter.add(details);

            // BUGFIX: statusList was built but never persisted in the original;
            // write it so successful documents get STATUS rows, mirroring loadWithError()
            linkdbStatusWriter.add(statusList);
        }
        catch (SQLException ex) {
            logger.warn("Failed to add processed documents to linkdb", ex);
        }
    }

    /** Writes status rows (with an error reason) for documents that failed processing. */
    public void loadWithError(LoaderData data, List<LoadProcessedDocumentWithError> documents) {
        var statusList = new ArrayList<UrlStatus>();
        int domainId = data.getTargetDomainId();

        for (var document : documents) {
            statusList.add(new UrlStatus(
                    UrlIdCodec.encodeId(domainId, document.ordinal()),
                    document.url(),
                    document.state().toString(),
                    document.reason()
            ));
        }

        try {
            linkdbStatusWriter.add(statusList);
        }
        catch (SQLException ex) {
            logger.warn("Failed to add processed documents to linkdb", ex);
        }
    }
}

View File

@ -16,11 +16,10 @@ import java.util.ArrayList;
import java.util.List;
public class Loader implements Interpreter, AutoCloseable {
private final SqlLoadUrls sqlLoadUrls;
private final SqlLoadDomains sqlLoadDomains;
private final SqlLoadDomainLinks sqlLoadDomainLinks;
private final SqlLoadProcessedDomain sqlLoadProcessedDomain;
private final SqlLoadProcessedDocument sqlLoadProcessedDocument;
private final LdbLoadProcessedDocument loadProcessedDocument;
private final SqlLoadDomainMetadata sqlLoadDomainMetadata;
private final IndexLoadKeywords indexLoadKeywords;
@ -34,21 +33,19 @@ public class Loader implements Interpreter, AutoCloseable {
public final LoaderData data;
public Loader(int sizeHint,
SqlLoadUrls sqlLoadUrls,
SqlLoadDomains sqlLoadDomains,
SqlLoadDomainLinks sqlLoadDomainLinks,
SqlLoadProcessedDomain sqlLoadProcessedDomain,
SqlLoadProcessedDocument sqlLoadProcessedDocument,
LdbLoadProcessedDocument loadProcessedDocument,
SqlLoadDomainMetadata sqlLoadDomainMetadata,
IndexLoadKeywords indexLoadKeywords)
{
data = new LoaderData(sizeHint);
this.sqlLoadUrls = sqlLoadUrls;
this.sqlLoadDomains = sqlLoadDomains;
this.sqlLoadDomainLinks = sqlLoadDomainLinks;
this.sqlLoadProcessedDomain = sqlLoadProcessedDomain;
this.sqlLoadProcessedDocument = sqlLoadProcessedDocument;
this.loadProcessedDocument = loadProcessedDocument;
this.sqlLoadDomainMetadata = sqlLoadDomainMetadata;
this.indexLoadKeywords = indexLoadKeywords;
@ -56,12 +53,6 @@ public class Loader implements Interpreter, AutoCloseable {
processedDocumentWithErrorList = new ArrayList<>(sizeHint);
}
@Override
public void loadUrl(EdgeUrl[] urls) {
sqlLoadUrls.load(data, urls);
}
@Override
public void loadDomain(EdgeDomain[] domains) {
sqlLoadDomains.load(data, domains);
@ -87,25 +78,23 @@ public class Loader implements Interpreter, AutoCloseable {
processedDocumentList.add(document);
if (processedDocumentList.size() > 100) {
sqlLoadProcessedDocument.load(data, processedDocumentList);
loadProcessedDocument.load(data, processedDocumentList);
processedDocumentList.clear();
}
}
@Override
public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError document) {
processedDocumentWithErrorList.add(document);
if (processedDocumentWithErrorList.size() > 100) {
sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
loadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
processedDocumentWithErrorList.clear();
}
}
@Override
public void loadKeywords(EdgeUrl url, int features, DocumentMetadata metadata, DocumentKeywords words) {
public void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {
try {
indexLoadKeywords.load(data, url, features, metadata, words);
indexLoadKeywords.load(data, ordinal, url, features, metadata, words);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -123,10 +112,10 @@ public class Loader implements Interpreter, AutoCloseable {
public void close() {
if (processedDocumentList.size() > 0) {
sqlLoadProcessedDocument.load(data, processedDocumentList);
loadProcessedDocument.load(data, processedDocumentList);
}
if (processedDocumentWithErrorList.size() > 0) {
sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
loadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
}
}

View File

@ -2,17 +2,15 @@ package nu.marginalia.loading.loader;
import gnu.trove.map.hash.TObjectIntHashMap;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
public class LoaderData {
private final TObjectIntHashMap<EdgeUrl> urlIds;
private final TObjectIntHashMap<EdgeDomain> domainIds;
private EdgeDomain targetDomain;
public final int sizeHint;
private int targetDomainId = -1;
public LoaderData(int sizeHint) {
urlIds = new TObjectIntHashMap<>(sizeHint+1);
domainIds = new TObjectIntHashMap<>(10);
this.sizeHint = sizeHint;
}
@ -23,20 +21,16 @@ public class LoaderData {
public EdgeDomain getTargetDomain() {
return targetDomain;
}
public int getTargetDomainId() {
if (targetDomainId < 0)
targetDomainId = domainIds.get(targetDomain);
return targetDomainId;
}
public void addDomain(EdgeDomain domain, int id) {
domainIds.put(domain, id);
}
public void addUrl(EdgeUrl url, int id) {
urlIds.put(url, id);
}
public int getUrlId(EdgeUrl url) {
return urlIds.get(url);
}
public int getDomainId(EdgeDomain domain) {
return domainIds.get(domain);
}

View File

@ -3,24 +3,21 @@ package nu.marginalia.loading.loader;
import com.google.inject.Inject;
public class LoaderFactory {
private final SqlLoadUrls sqlLoadUrls;
private final SqlLoadDomains sqlLoadDomains;
private final SqlLoadDomainLinks sqlLoadDomainLinks;
private final SqlLoadProcessedDomain sqlLoadProcessedDomain;
private final SqlLoadProcessedDocument sqlLoadProcessedDocument;
private final LdbLoadProcessedDocument sqlLoadProcessedDocument;
private final SqlLoadDomainMetadata sqlLoadDomainMetadata;
private final IndexLoadKeywords indexLoadKeywords;
@Inject
public LoaderFactory(SqlLoadUrls sqlLoadUrls,
SqlLoadDomains sqlLoadDomains,
public LoaderFactory(SqlLoadDomains sqlLoadDomains,
SqlLoadDomainLinks sqlLoadDomainLinks,
SqlLoadProcessedDomain sqlLoadProcessedDomain,
SqlLoadProcessedDocument sqlLoadProcessedDocument,
LdbLoadProcessedDocument sqlLoadProcessedDocument,
SqlLoadDomainMetadata sqlLoadDomainMetadata,
IndexLoadKeywords indexLoadKeywords) {
this.sqlLoadUrls = sqlLoadUrls;
this.sqlLoadDomains = sqlLoadDomains;
this.sqlLoadDomainLinks = sqlLoadDomainLinks;
this.sqlLoadProcessedDomain = sqlLoadProcessedDomain;
@ -30,6 +27,6 @@ public class LoaderFactory {
}
public Loader create(int sizeHint) {
return new Loader(sizeHint, sqlLoadUrls, sqlLoadDomains, sqlLoadDomainLinks, sqlLoadProcessedDomain, sqlLoadProcessedDocument, sqlLoadDomainMetadata, indexLoadKeywords);
return new Loader(sizeHint, sqlLoadDomains, sqlLoadDomainLinks, sqlLoadProcessedDomain, sqlLoadProcessedDocument, sqlLoadDomainMetadata, indexLoadKeywords);
}
}

View File

@ -59,17 +59,17 @@ public class LoaderIndexJournalWriter {
new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES, keywordInsertTaskQueue);
@SneakyThrows
public void putWords(EdgeId<EdgeDomain> domain, EdgeId<EdgeUrl> url,
public void putWords(long combinedId,
int features,
DocumentMetadata metadata,
DocumentKeywords wordSet) {
if (wordSet.keywords().length == 0) {
logger.info("Skipping zero-length word set for {}:{}", domain, url);
logger.info("Skipping zero-length word set for {}", combinedId);
return;
}
if (domain.id() <= 0 || url.id() <= 0) {
logger.warn("Bad ID: {}:{}", domain, url);
if (combinedId <= 0) {
logger.warn("Bad ID: {}", combinedId);
return;
}
@ -77,27 +77,26 @@ public class LoaderIndexJournalWriter {
// with a chonky work queue is a fairly decent improvement
for (var chunk : KeywordListChunker.chopList(wordSet, IndexJournalEntryData.MAX_LENGTH)) {
try {
keywordInsertionExecutor.submit(() -> loadWords(domain, url, features, metadata, chunk));
keywordInsertionExecutor.submit(() -> loadWords(combinedId, features, metadata, chunk));
}
catch (RejectedExecutionException ex) {
loadWords(domain, url, features, metadata, chunk);
loadWords(combinedId, features, metadata, chunk);
}
}
}
private void loadWords(EdgeId<EdgeDomain> domain,
EdgeId<EdgeUrl> url,
private void loadWords(long combinedId,
int features,
DocumentMetadata metadata,
DocumentKeywords wordSet) {
if (null == metadata) {
logger.warn("Null metadata for {}:{}", domain, url);
logger.warn("Null metadata for {}", combinedId);
return;
}
var entry = new IndexJournalEntryData(getOrInsertWordIds(wordSet.keywords(), wordSet.metadata()));
var header = new IndexJournalEntryHeader(domain, features, url, metadata.encode());
var header = new IndexJournalEntryHeader(combinedId, features, metadata.encode());
indexWriter.put(header, entry);
}

View File

@ -1,187 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import static java.sql.Statement.SUCCESS_NO_INFO;
public class SqlLoadProcessedDocument {
private final HikariDataSource dataSource;
private static final Logger logger = LoggerFactory.getLogger(SqlLoadProcessedDocument.class);
@Inject
public SqlLoadProcessedDocument(HikariDataSource dataSource) {
this.dataSource = dataSource;
try (var conn = dataSource.getConnection()) {
try (var stmt = conn.createStatement()) {
stmt.execute("DROP PROCEDURE IF EXISTS INSERT_PAGE_VISIT");
stmt.execute("DROP PROCEDURE IF EXISTS INSERT_PAGE_VISIT_BAD");
stmt.execute("""
CREATE PROCEDURE INSERT_PAGE_VISIT (
IN URL_ID INT,
IN STATE VARCHAR(32),
IN TITLE VARCHAR(255),
IN DESCRIPTION VARCHAR(255),
IN LENGTH INT,
IN FEATURES INT,
IN STANDARD VARCHAR(32),
IN QUALITY DOUBLE,
IN HASH BIGINT,
IN PUB_YEAR SMALLINT)
BEGIN
SET FOREIGN_KEY_CHECKS=0;
REPLACE INTO EC_PAGE_DATA(ID, TITLE, DESCRIPTION, WORDS_TOTAL, FORMAT, FEATURES, DATA_HASH, QUALITY, PUB_YEAR) VALUES (URL_ID, TITLE, DESCRIPTION, LENGTH, STANDARD, FEATURES, HASH, QUALITY, PUB_YEAR);
UPDATE EC_URL SET VISITED=1, STATE=STATE WHERE ID=URL_ID;
SET FOREIGN_KEY_CHECKS=1;
END
""");
stmt.execute("""
CREATE PROCEDURE INSERT_PAGE_VISIT_BAD (
IN URL_ID INT,
IN STATE VARCHAR(32))
BEGIN
UPDATE EC_URL SET VISITED=1, STATE=STATE WHERE ID=URL_ID;
DELETE FROM EC_PAGE_DATA WHERE ID=URL_ID;
END
""");
}
}
catch (SQLException ex) {
throw new RuntimeException("Failed to set up loader", ex);
}
}
public void load(LoaderData data, List<LoadProcessedDocument> documents) {
try (var conn = dataSource.getConnection()) {
try (var insertCall = conn.prepareCall("CALL INSERT_PAGE_VISIT(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
) {
conn.setAutoCommit(false);
int cnt = 0;
int batchOffset = 0;
for (var doc : documents) {
int urlId = data.getUrlId(doc.url());
if (urlId <= 0) {
logger.warn("Failed to resolve ID for URL {}", doc.url());
continue;
}
insertCall.setInt(1, urlId);
insertCall.setString(2, doc.state().name());
insertCall.setString(3, doc.title());
insertCall.setString(4, StringUtils.truncate(doc.description(), 255));
insertCall.setInt(5, doc.length());
insertCall.setInt(6, doc.htmlFeatures());
insertCall.setString(7, doc.standard());
insertCall.setDouble(8, doc.quality());
insertCall.setLong(9, doc.hash());
if (doc.pubYear() != null) {
insertCall.setShort(10, (short) doc.pubYear().intValue());
} else {
insertCall.setInt(10, Types.SMALLINT);
}
insertCall.addBatch();
if (++cnt == 100) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
cnt = 0;
batchOffset += 100;
}
}
if (cnt > 0) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
}
conn.setAutoCommit(true);
}
catch (SQLException ex) {
conn.rollback();
throw ex;
}
} catch (SQLException ex) {
logger.warn("SQL error inserting document", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
public void loadWithError(LoaderData data, List<LoadProcessedDocumentWithError> documents) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareCall("CALL INSERT_PAGE_VISIT_BAD(?, ?)")) {
conn.setAutoCommit(false);
int cnt = 0; int batchOffset = 0;
for (var doc : documents) {
int urlId = data.getUrlId(doc.url());
if (urlId < 0) {
logger.warn("Failed to resolve ID for URL {}", doc.url());
return;
}
stmt.setInt(1, urlId);
stmt.setString(2, doc.state().name());
stmt.addBatch();
if (++cnt == 100) {
var ret = stmt.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
cnt = 0;
batchOffset += 100;
}
}
if (cnt > 0) {
var ret = stmt.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
}
conn.setAutoCommit(true);
} catch (SQLException ex) {
logger.warn("SQL error inserting failed document", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
}

View File

@ -14,14 +14,12 @@ import java.sql.SQLException;
public class SqlLoadProcessedDomain {
private final HikariDataSource dataSource;
private final SqlLoadDomains loadDomains;
private final SqlLoadUrls loadUrls;
private static final Logger logger = LoggerFactory.getLogger(SqlLoadProcessedDomain.class);
@Inject
public SqlLoadProcessedDomain(HikariDataSource dataSource, SqlLoadDomains loadDomains, SqlLoadUrls loadUrls) {
public SqlLoadProcessedDomain(HikariDataSource dataSource, SqlLoadDomains loadDomains) {
this.dataSource = dataSource;
this.loadDomains = loadDomains;
this.loadUrls = loadUrls;
try (var conn = dataSource.getConnection()) {
@ -69,8 +67,6 @@ public class SqlLoadProcessedDomain {
if (rc < 1) {
logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc);
}
loadUrls.loadUrlsForDomain(data, domain, 0);
}
catch (SQLException ex) {
conn.rollback();

View File

@ -1,151 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.hash.MurmurHash3_128;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashSet;
import java.util.Set;
import static java.sql.Statement.SUCCESS_NO_INFO;
public class SqlLoadUrls {
private final HikariDataSource dataSource;
private static final Logger logger = LoggerFactory.getLogger(SqlLoadUrls.class);
@Inject
public SqlLoadUrls(HikariDataSource dataSource) {
this.dataSource = dataSource;
}
private final MurmurHash3_128 murmurHash = new MurmurHash3_128();
public void load(LoaderData data, EdgeUrl[] urls) {
Set<EdgeDomain> affectedDomains = new HashSet<>();
if (urls.length == 0)
return;
int maxOldId = 0;
try (var conn = dataSource.getConnection()) {
try (var insertStmt = conn.prepareStatement("INSERT IGNORE INTO EC_URL (PROTO,DOMAIN_ID,PORT,PATH,PARAM,PATH_HASH) VALUES (?,?,?,?,?,?)");
var queryMaxId = conn.prepareStatement("SELECT MAX(ID) FROM EC_URL")) {
conn.setAutoCommit(false);
var rs = queryMaxId.executeQuery();
if (rs.next()) {
maxOldId = rs.getInt(1);
}
int cnt = 0;
int batchOffset = 0;
for (var url : urls) {
if (data.getUrlId(url) != 0)
continue;
if (url.path.length() >= 255) {
logger.info("Skipping bad URL {}", url);
continue;
}
var domainId = data.getDomainId(url.domain);
affectedDomains.add(url.domain);
insertStmt.setString(1, url.proto);
insertStmt.setInt(2, domainId);
if (url.port != null) {
insertStmt.setInt(3, url.port);
} else {
insertStmt.setNull(3, Types.INTEGER);
}
insertStmt.setString(4, url.path);
insertStmt.setString(5, url.param);
insertStmt.setLong(6, hashPath(url.path, url.param));
insertStmt.addBatch();
if (++cnt == 1000) {
var ret = insertStmt.executeBatch();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
}
}
batchOffset += cnt;
cnt = 0;
}
}
if (cnt > 0) {
var ret = insertStmt.executeBatch();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
}
}
}
conn.commit();
conn.setAutoCommit(true);
for (var domain : affectedDomains) {
loadUrlsForDomain(data, domain, maxOldId);
}
}
catch (SQLException ex) {
conn.rollback();
throw ex;
}
}
catch (SQLException ex) {
logger.warn("SQL error inserting URLs", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
/* We use a uniqueness constraint on DOMAIN_ID and this hash instead of on the PATH and PARAM
* fields as the uniqueness index grows absurdly large for some reason, possibly due to the prevalent
* shared leading substrings in paths?
*/
private long hashPath(String path, String queryParam) {
long hash = murmurHash.hashNearlyASCII(path);
if (queryParam != null) {
hash ^= murmurHash.hashNearlyASCII(queryParam);
}
return hash;
}
/** Loads urlIDs for the domain into `data` from the database, starting at URL ID minId. */
public void loadUrlsForDomain(LoaderData data, EdgeDomain domain, int minId) throws SQLException {
try (var conn = dataSource.getConnection();
var queryCall = conn.prepareStatement("SELECT ID, PROTO, PATH, PARAM FROM EC_URL WHERE DOMAIN_ID=? AND ID > ?")) {
queryCall.setFetchSize(1000);
queryCall.setInt(1, data.getDomainId(domain));
queryCall.setInt(2, minId);
var rsp = queryCall.executeQuery();
while (rsp.next()) {
int urlId = rsp.getInt(1);
String proto = rsp.getString(2);
String path = rsp.getString(3);
String param = rsp.getString(4);
data.addUrl(new EdgeUrl(proto, domain, null, path, param), urlId);
}
}
}
}

View File

@ -1,96 +0,0 @@
package nu.marginalia.loader;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.search.db.DbUrlDetailsQuery;
import nu.marginalia.loading.loader.LoaderData;
import nu.marginalia.loading.loader.SqlLoadDomains;
import nu.marginalia.loading.loader.SqlLoadProcessedDocument;
import nu.marginalia.loading.loader.SqlLoadUrls;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.crawl.UrlIndexingState;
import nu.marginalia.model.crawl.HtmlFeature;
import nu.marginalia.model.id.EdgeIdArray;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("slow")
@Testcontainers
class SqlLoadProcessedDocumentTest {
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("db/migration/V23_06_0_000__base.sql")
.withNetworkAliases("mariadb");
HikariDataSource dataSource;
LoaderData loaderData;
DbUrlDetailsQuery dbUrlDetailsQuery;
@BeforeEach
public void setUp() throws URISyntaxException {
dataSource = DbTestUtil.getConnection(mariaDBContainer.getJdbcUrl());
dbUrlDetailsQuery = new DbUrlDetailsQuery(dataSource);
var loadDomains = new SqlLoadDomains(dataSource);
var loadUrls = new SqlLoadUrls(dataSource);
loaderData = new LoaderData(10);
loaderData.setTargetDomain(new EdgeDomain("www.marginalia.nu"));
loadDomains.load(loaderData, new EdgeDomain("www.marginalia.nu"));
loadUrls.load(loaderData, new EdgeUrl[]{new EdgeUrl("https://www.marginalia.nu/")});
}
@AfterEach
public void tearDown() {
dataSource.close();
}
@Test
public void loadProcessedDocument() throws URISyntaxException {
var loader = new SqlLoadProcessedDocument(dataSource);
var url = new EdgeUrl("https://www.marginalia.nu/");
loader.load(loaderData, List.of(new LoadProcessedDocument(
url,
UrlIndexingState.OK,
"TITLE",
"DESCR",
HtmlFeature.encode(Set.of(HtmlFeature.AFFILIATE_LINK)),
HtmlStandard.HTML5.name(),
100,
12345,
-3.14,
null
)));
var details = dbUrlDetailsQuery.getUrlDetailsMulti(new EdgeIdArray<>(loaderData.getUrlId(new EdgeUrl("https://www.marginalia.nu/"))));
Assertions.assertEquals(1, details.size());
var urlDetails = details.get(0);
assertEquals("TITLE", urlDetails.getTitle());
assertEquals("DESCR", urlDetails.getDescription());
assertTrue(urlDetails.isAffiliate());
assertEquals(100, urlDetails.words);
assertEquals(12345, urlDetails.dataHash);
assertEquals(-3.14, urlDetails.getUrlQuality());
}
}

View File

@ -5,7 +5,6 @@ import nu.marginalia.loading.loader.LoaderData;
import nu.marginalia.loading.loader.SqlLoadDomains;
import nu.marginalia.loading.loader.SqlLoadProcessedDomain;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.loading.loader.SqlLoadUrls;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.crawl.DomainIndexingState;
import org.junit.jupiter.api.AfterEach;
@ -51,18 +50,18 @@ class SqlLoadProcessedDomainTest {
@Test
public void loadProcessedDomain() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
loader.load(loaderData, new EdgeDomain("www.marginalia.nu"), DomainIndexingState.BLOCKED, "127.0.0.1");
}
@Test
public void loadProcessedDomainTwice() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
loader.load(loaderData, new EdgeDomain("www.marginalia.nu"), DomainIndexingState.BLOCKED, "127.0.0.1");
}
@Test
public void loadProcessedDomaiWithExtremelyLongIP() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
String ip = Stream.generate(() -> "127.").limit(1024).collect(Collectors.joining());
@ -71,7 +70,7 @@ class SqlLoadProcessedDomainTest {
@Test
public void loadDomainAlias() {
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource));
var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource));
loader.loadAlias(loaderData, new DomainLink(new EdgeDomain("memex.marginalia.nu"), new EdgeDomain("www.marginalia.nu")));
}
}

View File

@ -1,54 +0,0 @@
package nu.marginalia.loader;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.loading.loader.LoaderData;
import nu.marginalia.loading.loader.SqlLoadDomains;
import nu.marginalia.loading.loader.SqlLoadUrls;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.net.URISyntaxException;
@Tag("slow")
@Testcontainers
class SqlLoadUrlsTest {
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("db/migration/V23_06_0_000__base.sql")
.withNetworkAliases("mariadb");
HikariDataSource dataSource;
LoaderData loaderData;
@BeforeEach
public void setUp() {
dataSource = DbTestUtil.getConnection(mariaDBContainer.getJdbcUrl());
var loadDomains = new SqlLoadDomains(dataSource);
loaderData = new LoaderData(10);
loaderData.setTargetDomain(new EdgeDomain("www.marginalia.nu"));
loadDomains.load(loaderData, new EdgeDomain("www.marginalia.nu"));
}
@AfterEach
public void tearDown() {
dataSource.close();
}
@Test
public void loadUrl() throws URISyntaxException {
var loadUrls = new SqlLoadUrls(dataSource);
loadUrls.load(loaderData, new EdgeUrl[] { new EdgeUrl("https://www.marginalia.nu/") });
}
}