WIP Loading

This commit is contained in:
vlofgren 2022-09-05 17:51:49 +02:00
parent c912d3127d
commit c6976acdfc
15 changed files with 260 additions and 71 deletions

View File

@ -6,11 +6,15 @@ import com.google.inject.name.Names;
import marcono1234.gson.recordadapter.RecordTypeAdapterFactory;
import nu.marginalia.util.language.conf.LanguageModels;
import nu.marginalia.wmsa.configuration.WmsaHome;
import nu.marginalia.wmsa.edge.index.client.EdgeIndexClient;
import nu.marginalia.wmsa.edge.index.client.EdgeIndexLocalService;
import nu.marginalia.wmsa.edge.index.client.EdgeIndexWriterClient;
import nu.marginalia.wmsa.edge.model.EdgeCrawlPlan;
import nu.marginalia.wmsa.edge.model.EdgeDomain;
import nu.marginalia.wmsa.edge.model.EdgeUrl;
import java.net.URISyntaxException;
import java.nio.file.Path;
public class ConverterModule extends AbstractModule {
@ -31,6 +35,15 @@ public class ConverterModule extends AbstractModule {
bind(Integer.class).annotatedWith(Names.named("max-title-length")).toInstance(128);
bind(Integer.class).annotatedWith(Names.named("max-summary-length")).toInstance(255);
if (null != System.getProperty("local-index-path")) {
bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path")));
bind(EdgeIndexWriterClient.class).to(EdgeIndexLocalService.class);
}
else {
bind(EdgeIndexWriterClient.class).to(EdgeIndexClient.class);
}
bind(LanguageModels.class).toInstance(WmsaHome.getLanguageModels());
}

View File

@ -2,11 +2,10 @@ package nu.marginalia.wmsa.edge.converting;
import nu.marginalia.wmsa.configuration.module.DatabaseModule;
import nu.marginalia.wmsa.configuration.server.Context;
import nu.marginalia.wmsa.edge.converting.interpreter.instruction.DocumentKeywords;
import nu.marginalia.wmsa.edge.index.client.EdgeIndexClient;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.model.EdgeId;
import nu.marginalia.wmsa.edge.model.crawl.EdgePageWordSet;
import nu.marginalia.wmsa.edge.model.crawl.EdgePageWords;
import java.io.IOException;
import java.nio.file.Files;
@ -76,9 +75,8 @@ public class LinkKeywordLoaderMain {
// System.out.println(lastLine + " -/- " + domainId + ":" + urlId + " : " + keywords);
indexClient.putWords(Context.internal(), new EdgeId<>(domainId), new EdgeId<>(urlId), new EdgePageWordSet(
new EdgePageWords(IndexBlock.Link, new HashSet<>(keywords))), 0
).blockingSubscribe();
indexClient.putWords(Context.internal(), new EdgeId<>(domainId), new EdgeId<>(urlId),
new DocumentKeywords(IndexBlock.Link, keywords.toArray(String[]::new)), 0);
}
lastLine = urlKeyword.url;

View File

@ -27,7 +27,6 @@ public class LoaderMain {
private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class);
private final Path processDir;
private final EdgeCrawlPlan plan;
private final ConvertedDomainReader instructionsReader;
private final LoaderFactory loaderFactory;
@ -59,7 +58,6 @@ public class LoaderMain {
LoaderFactory loaderFactory,
EdgeIndexClient indexClient) {
this.processDir = plan.process.getDir();
this.plan = plan;
this.instructionsReader = instructionsReader;
this.loaderFactory = loaderFactory;

View File

@ -4,23 +4,21 @@ import com.google.inject.Inject;
import lombok.SneakyThrows;
import nu.marginalia.wmsa.configuration.server.Context;
import nu.marginalia.wmsa.edge.converting.interpreter.instruction.DocumentKeywords;
import nu.marginalia.wmsa.edge.index.client.EdgeIndexClient;
import nu.marginalia.wmsa.edge.index.client.EdgeIndexWriterClient;
import nu.marginalia.wmsa.edge.model.EdgeId;
import nu.marginalia.wmsa.edge.model.EdgeUrl;
import nu.marginalia.wmsa.edge.model.crawl.EdgePageWordSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class IndexLoadKeywords implements Runnable {
private final EdgeIndexClient client;
private static final Logger logger = LoggerFactory.getLogger(IndexLoadKeywords.class);
private final LinkedBlockingQueue<InsertTask> insertQueue = new LinkedBlockingQueue<>(32);
private EdgeIndexWriterClient client;
private record InsertTask(int urlId, int domainId, EdgePageWordSet wordSet) {}
private record InsertTask(int urlId, int domainId, DocumentKeywords wordSet) {}
private final Thread runThread;
private volatile boolean canceled = false;
@ -28,7 +26,7 @@ public class IndexLoadKeywords implements Runnable {
private static final int index = Integer.getInteger("keyword-index", 1);
@Inject
public IndexLoadKeywords(EdgeIndexClient client) {
public IndexLoadKeywords(EdgeIndexWriterClient client) {
this.client = client;
runThread = new Thread(this, getClass().getSimpleName());
runThread.start();
@ -39,7 +37,7 @@ public class IndexLoadKeywords implements Runnable {
while (!canceled) {
var data = insertQueue.poll(1, TimeUnit.SECONDS);
if (data != null) {
client.putWords(Context.internal(), new EdgeId<>(data.domainId), new EdgeId<>(data.urlId), data.wordSet, index).blockingSubscribe();
client.putWords(Context.internal(), new EdgeId<>(data.domainId), new EdgeId<>(data.urlId), data.wordSet, index);
}
}
}
@ -57,11 +55,8 @@ public class IndexLoadKeywords implements Runnable {
logger.warn("Failed to get IDs for {} -- d={},u={}", url, domainId, urlId);
}
var ws = new EdgePageWordSet();
for (var doc : words) {
ws.append(doc.block(), Arrays.asList(doc.keywords()));
for (var ws : words) {
insertQueue.put(new InsertTask(urlId, domainId, ws));
}
insertQueue.put(new InsertTask(urlId, domainId, ws));
}
}

View File

@ -67,16 +67,34 @@ public class SqlLoadDomains {
try (var insertCall = connection.prepareCall("CALL INSERT_DOMAIN(?,?)")) {
int cnt = 0; int batchOffset = 0;
for (var domain : domains) {
insertCall.setString(1, domain.toString());
insertCall.setString(2, domain.domain);
insertCall.addBatch();
}
var ret = insertCall.executeBatch();
for (int rv = 0; rv < domains.length; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", domains[rv], ret[rv]);
if (++cnt == 1000) {
var ret = insertCall.executeBatch();
connection.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", domains[batchOffset + rv], ret[rv]);
}
}
cnt = 0;
batchOffset += 1000;
}
}
if (cnt > 0) {
var ret = insertCall.executeBatch();
connection.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", domains[batchOffset + rv], ret[rv]);
}
}
}

View File

@ -64,6 +64,7 @@ public class SqlLoadProcessedDocument {
var stmt = conn.prepareCall("CALL INSERT_PAGE_VISIT(?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
conn.setAutoCommit(false);
int cnt = 0; int batchOffset = 0;
for (var doc : documents) {
int urlId = data.getUrlId(doc.url());
if (urlId < 0) {
@ -81,16 +82,31 @@ public class SqlLoadProcessedDocument {
stmt.setDouble(8, doc.quality());
stmt.setInt(9, (int) doc.hash());
stmt.addBatch();
}
var ret = stmt.executeBatch();
for (int rv = 0; rv < documents.size(); rv++) {
if (ret[rv] < 1 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(rv), ret[rv]);
if (++cnt == 100) {
var ret = stmt.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
cnt = 0;
batchOffset += 100;
}
}
if (cnt > 0) {
var ret = stmt.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
}
conn.commit();
} catch (SQLException ex) {
logger.warn("SQL error inserting document", ex);
}
@ -100,6 +116,7 @@ public class SqlLoadProcessedDocument {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareCall("CALL INSERT_PAGE_VISIT_BAD(?, ?)")) {
int cnt = 0; int batchOffset = 0;
for (var doc : documents) {
int urlId = data.getUrlId(doc.url());
if (urlId < 0) {
@ -110,13 +127,31 @@ public class SqlLoadProcessedDocument {
stmt.setInt(1, urlId);
stmt.setString(2, doc.state().name());
stmt.addBatch();
}
var ret = stmt.executeBatch();
for (int rv = 0; rv < documents.size(); rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(rv), ret[rv]);
if (++cnt == 100) {
var ret = stmt.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
cnt = 0;
batchOffset += 100;
}
}
if (cnt > 0) {
var ret = stmt.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", documents.get(batchOffset + rv), ret[rv]);
}
}
}
} catch (SQLException ex) {
logger.warn("SQL error inserting failed document", ex);
}

View File

@ -52,6 +52,8 @@ public class SqlLoadUrls {
)
{
conn.setAutoCommit(false);
int cnt = 0; int batchOffset = 0;
for (var url : urls) {
if (url.path.length() >= 255) {
logger.warn("Skipping bad URL {}", url);
@ -70,11 +72,29 @@ public class SqlLoadUrls {
insertCall.setString(5, url.param);
insertCall.setLong(6, hashPath(url.path, url.param));
insertCall.addBatch();
if (++cnt == 250) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
}
}
cnt = 0;
batchOffset += 250;
}
}
var ret = insertCall.executeBatch();
for (int rv = 0; rv < ret.length; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[rv], ret[rv]);
if (cnt > 0) {
var ret = insertCall.executeBatch();
conn.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]);
}
}
}
@ -86,6 +106,7 @@ public class SqlLoadUrls {
queryCall.setInt(1, data.getDomainId(targetDomain));
var rsp = queryCall.executeQuery();
rsp.setFetchSize(urls.length);
while (rsp.next()) {
int urlId = rsp.getInt(1);

View File

@ -190,10 +190,20 @@ public class EdgeIndexService extends Service {
}
private long[] getOrInsertWordIds(List<String> words) {
return words.stream()
.filter(w -> w.getBytes().length < Byte.MAX_VALUE)
.mapToLong(keywordLexicon::getOrInsert)
.toArray();
long[] ids = new long[words.size()];
int putId = 0;
for (String word : words) {
long id = keywordLexicon.getOrInsert(word);
if (id != DictionaryHashMap.NO_VALUE) {
ids[putId++] = id;
}
}
if (putId != words.size()) {
ids = Arrays.copyOf(ids, putId);
}
return ids;
}
private Object searchDomain(Request request, Response response) {

View File

@ -6,13 +6,12 @@ import com.google.inject.Singleton;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import nu.marginalia.wmsa.client.AbstractDynamicClient;
import nu.marginalia.wmsa.client.HttpStatusCode;
import nu.marginalia.wmsa.configuration.ServiceDescriptor;
import nu.marginalia.wmsa.configuration.server.Context;
import nu.marginalia.wmsa.edge.converting.interpreter.instruction.DocumentKeywords;
import nu.marginalia.wmsa.edge.model.EdgeDomain;
import nu.marginalia.wmsa.edge.model.EdgeId;
import nu.marginalia.wmsa.edge.model.EdgeUrl;
import nu.marginalia.wmsa.edge.model.crawl.EdgePageWordSet;
import nu.marginalia.wmsa.edge.model.search.EdgeSearchResultSet;
import nu.marginalia.wmsa.edge.model.search.EdgeSearchSpecification;
import nu.marginalia.wmsa.edge.model.search.domain.EdgeDomainSearchResults;
@ -26,7 +25,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
@Singleton
public class EdgeIndexClient extends AbstractDynamicClient {
public class EdgeIndexClient extends AbstractDynamicClient implements EdgeIndexWriterClient {
private final Gson gson = new GsonBuilder()
.create();
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -36,10 +35,10 @@ public class EdgeIndexClient extends AbstractDynamicClient {
setTimeout(30);
}
@CheckReturnValue
public Observable<HttpStatusCode> putWords(Context ctx, EdgeId<EdgeDomain> domain, EdgeId<EdgeUrl> url,
EdgePageWordSet wordSet, int writer
)
@Override
public void putWords(Context ctx, EdgeId<EdgeDomain> domain, EdgeId<EdgeUrl> url,
DocumentKeywords wordSet, int writer
)
{
var keywordBuilder =
@ -48,16 +47,16 @@ public class EdgeIndexClient extends AbstractDynamicClient {
.setUrl(url.id())
.setIndex(writer);
for (var set : wordSet.wordSets.values()) {
for (var set : wordSet.keywords()) {
var wordSetBuilder = IndexPutKeywordsReq.WordSet.newBuilder();
wordSetBuilder.setIndex(set.block.ordinal());
wordSetBuilder.addAllWords(set.words);
wordSetBuilder.setIndex(wordSet.block().ordinal());
wordSetBuilder.addAllWords(List.of(wordSet.keywords()));
keywordBuilder.addWordSet(wordSetBuilder.build());
}
var req = keywordBuilder.build();
return this.post(ctx, "/words/", req);
this.post(ctx, "/words/", req).blockingSubscribe();
}

View File

@ -0,0 +1,79 @@
package nu.marginalia.wmsa.edge.index.client;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import nu.marginalia.util.ListChunker;
import nu.marginalia.util.dict.DictionaryHashMap;
import nu.marginalia.wmsa.configuration.server.Context;
import nu.marginalia.wmsa.edge.converting.interpreter.instruction.DocumentKeywords;
import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl;
import nu.marginalia.wmsa.edge.index.journal.model.SearchIndexJournalEntry;
import nu.marginalia.wmsa.edge.index.journal.model.SearchIndexJournalEntryHeader;
import nu.marginalia.wmsa.edge.index.lexicon.KeywordLexicon;
import nu.marginalia.wmsa.edge.index.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.wmsa.edge.model.EdgeDomain;
import nu.marginalia.wmsa.edge.model.EdgeId;
import nu.marginalia.wmsa.edge.model.EdgeUrl;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
/**
 * {@link EdgeIndexWriterClient} implementation that writes keywords directly
 * into a local keyword lexicon and search index journal instead of posting
 * them to a remote index service.
 *
 * <p>The lexicon journal is stored in {@code dictionary.dat} and the index
 * journal in {@code index.dat}, both resolved against the injected
 * {@code local-index-path} directory.
 */
@Singleton
public class EdgeIndexLocalService implements EdgeIndexWriterClient {

    /** Dictionary hash map size used unless the {@code small-ram} property is set. */
    private static final long DEFAULT_HASH_MAP_SIZE = 1L << 31;

    /** Reduced dictionary hash map size used when {@code -Dsmall-ram=true}. */
    private static final long SMALL_RAM_HASH_MAP_SIZE = 1L << 27;

    private final KeywordLexicon lexicon;
    private final SearchIndexJournalWriterImpl indexWriter;

    /**
     * Opens (or creates) the lexicon and index journal under {@code path}.
     *
     * @param path directory holding {@code dictionary.dat} and {@code index.dat}
     * @throws IOException declared by the original signature; propagated from
     *                     the underlying journal setup
     */
    @Inject
    public EdgeIndexLocalService(@Named("local-index-path") Path path) throws IOException {
        long hashMapSize = Boolean.getBoolean("small-ram")
                ? SMALL_RAM_HASH_MAP_SIZE
                : DEFAULT_HASH_MAP_SIZE;

        var lexiconJournal = new KeywordLexiconJournal(path.resolve("dictionary.dat").toFile());
        lexicon = new KeywordLexicon(lexiconJournal, new DictionaryHashMap(hashMapSize));
        indexWriter = new SearchIndexJournalWriterImpl(lexicon, path.resolve("index.dat").toFile());
    }

    /**
     * Writes the keywords of one document block to the local index journal.
     *
     * <p>The keywords are split into chunks of at most
     * {@link SearchIndexJournalEntry#MAX_LENGTH} words, and each chunk is
     * written as its own journal entry. Nothing is written when the keyword
     * array is empty.
     *
     * @param ctx     unused by the local implementation
     * @param domain  id of the domain the document belongs to
     * @param url     id of the document's URL
     * @param wordSet keywords and the index block they belong to
     * @param writer  unused by the local implementation
     */
    @Override
    public void putWords(Context ctx, EdgeId<EdgeDomain> domain, EdgeId<EdgeUrl> url,
                         DocumentKeywords wordSet, int writer) {
        if (wordSet.keywords().length == 0) {
            return;
        }

        for (var chunk : ListChunker.chopList(List.of(wordSet.keywords()), SearchIndexJournalEntry.MAX_LENGTH)) {
            // NOTE(review): a chunk whose words are all rejected by the lexicon
            // yields an empty entry -- confirm the journal tolerates that.
            var entry = new SearchIndexJournalEntry(getOrInsertWordIds(chunk));
            var header = new SearchIndexJournalEntryHeader(domain.id(), url.id(), wordSet.block());

            indexWriter.put(header, entry);
        }
    }

    /**
     * Resolves each word to its lexicon id, inserting unknown words.
     *
     * <p>Words for which the lexicon returns {@link DictionaryHashMap#NO_VALUE}
     * are dropped, so the result may be shorter than {@code words}.
     *
     * @param words words to look up
     * @return ids of the accepted words, in input order
     */
    private long[] getOrInsertWordIds(List<String> words) {
        long[] ids = new long[words.size()];
        int putId = 0;

        for (String word : words) {
            long id = lexicon.getOrInsert(word);
            if (id != DictionaryHashMap.NO_VALUE) {
                ids[putId++] = id;
            }
        }

        // Trim the tail left by rejected words.
        if (putId != words.size()) {
            ids = Arrays.copyOf(ids, putId);
        }
        return ids;
    }

    /**
     * Closes the index writer and then the lexicon.
     *
     * <p>The lexicon is closed even if closing the index writer throws, so a
     * failure in the first step cannot leak the second resource.
     */
    @Override
    public void close() throws Exception {
        try {
            indexWriter.close();
        } finally {
            lexicon.close();
        }
    }
}

View File

@ -0,0 +1,13 @@
package nu.marginalia.wmsa.edge.index.client;
import nu.marginalia.wmsa.configuration.server.Context;
import nu.marginalia.wmsa.edge.converting.interpreter.instruction.DocumentKeywords;
import nu.marginalia.wmsa.edge.model.EdgeDomain;
import nu.marginalia.wmsa.edge.model.EdgeId;
import nu.marginalia.wmsa.edge.model.EdgeUrl;
/**
 * Write-side abstraction over the search index: accepts the keywords of a
 * document so they can be added to the index. Implementations hold resources
 * and must be closed after use.
 */
public interface EdgeIndexWriterClient extends AutoCloseable {

    /**
     * Submits the keywords of one document block to the index.
     *
     * @param ctx      request context
     * @param domain   id of the domain the document belongs to
     * @param url      id of the document's URL
     * @param wordSets keywords together with the index block they belong to
     * @param writer   index of the target writer
     */
    void putWords(
            Context ctx,
            EdgeId<EdgeDomain> domain,
            EdgeId<EdgeUrl> url,
            DocumentKeywords wordSets,
            int writer);
}

View File

@ -19,7 +19,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class SearchIndexJournalWriterImpl implements SearchIndexJournalWriter {
private final KeywordLexicon dictionaryWriter;
private final KeywordLexicon lexicon;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Disposable writerTask;
@ -31,8 +31,8 @@ public class SearchIndexJournalWriterImpl implements SearchIndexJournalWriter {
private long pos;
@SneakyThrows
public SearchIndexJournalWriterImpl(KeywordLexicon dictionaryWriter, File indexFile) {
this.dictionaryWriter = dictionaryWriter;
public SearchIndexJournalWriterImpl(KeywordLexicon lexicon, File indexFile) {
this.lexicon = lexicon;
initializeIndexFile(indexFile);
byteBuffer = ByteBuffer.allocate(MAX_BLOCK_SIZE);
@ -113,14 +113,14 @@ public class SearchIndexJournalWriterImpl implements SearchIndexJournalWriter {
@Override
public void flushWords() {
dictionaryWriter.commitToDisk();
lexicon.commitToDisk();
}
private void writePositionMarker() throws IOException {
pos = channel.size();
raf.seek(0);
raf.writeLong(pos);
raf.writeLong(dictionaryWriter.size());
raf.writeLong(lexicon.size());
raf.seek(pos);
}

View File

@ -53,7 +53,7 @@ public class KeywordLexicon implements AutoCloseable {
@SneakyThrows
private int getOrInsert(byte[] bytes) {
if (bytes.length >= Byte.MAX_VALUE) {
logger.warn("getOrInsert({}), illegal length {}", bytes, bytes.length);
logger.warn("getOrInsert({}), illegal length {}", new String(bytes), bytes.length);
return DictionaryHashMap.NO_VALUE;
}

View File

@ -96,11 +96,23 @@ public class EdgeUrl implements WideHashable {
}
public EdgeUrl(URI URI) {
this.domain = new EdgeDomain(URI.getHost());
this.path = URI.getPath().isEmpty() ? "/" : URI.getPath();
this.proto = URI.getScheme().toLowerCase();
this.port = port(URI.getPort(), proto);
this.param = QueryParams.queryParamsSanitizer(this.path, URI.getQuery());
try {
String host = URI.getHost();
if (host == null) { // deal with a rare serialization error
host = "parse-error.invalid.example.com";
}
this.domain = new EdgeDomain(host);
this.path = URI.getPath().isEmpty() ? "/" : URI.getPath();
this.proto = URI.getScheme().toLowerCase();
this.port = port(URI.getPort(), proto);
this.param = QueryParams.queryParamsSanitizer(this.path, URI.getQuery());
}
catch (Exception ex) {
System.err.println("Failed to parse " + URI);
throw ex;
}
}

View File

@ -3,12 +3,11 @@ package nu.marginalia.wmsa.edge.tools;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.wmsa.configuration.module.DatabaseModule;
import nu.marginalia.wmsa.configuration.server.Context;
import nu.marginalia.wmsa.edge.converting.interpreter.instruction.DocumentKeywords;
import nu.marginalia.wmsa.edge.converting.processor.logic.HtmlFeature;
import nu.marginalia.wmsa.edge.index.client.EdgeIndexClient;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.model.EdgeId;
import nu.marginalia.wmsa.edge.model.crawl.EdgePageWordSet;
import nu.marginalia.wmsa.edge.model.crawl.EdgePageWords;
import java.io.IOException;
import java.nio.file.Files;
@ -17,7 +16,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -34,7 +32,6 @@ public class FeaturesLoaderTool {
var linesStream = Files.lines(file)) {
var urls = getUrls(ds);
var wordSet = new EdgePageWordSet(new EdgePageWords(IndexBlock.Meta, List.of(feature.getKeyword())));
linesStream
.map(urls::get)
.filter(Objects::nonNull)
@ -51,8 +48,9 @@ public class FeaturesLoaderTool {
throw new RuntimeException(ex);
}
client.putWords(Context.internal(), new EdgeId<>(domainId), new EdgeId<>(urlId), wordSet, 0)
.blockingSubscribe();
client.putWords(Context.internal(), new EdgeId<>(domainId), new EdgeId<>(urlId),
new DocumentKeywords(IndexBlock.Meta, feature.getKeyword())
, 0);
});
} catch (IOException e) {