diff --git a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java
index c2411575..33acceea 100644
--- a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java
+++ b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java
@@ -64,7 +64,6 @@ class ForwardIndexConverterTest {
         keywordLexicon.commitToDisk();
-        writer.forceWrite();
         writer.close();
diff --git a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/model/IndexJournalEntryData.java b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/model/IndexJournalEntryData.java
index 423626ce..f24be823 100644
--- a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/model/IndexJournalEntryData.java
+++ b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/model/IndexJournalEntryData.java
@@ -8,7 +8,7 @@ import java.util.Iterator;
 
 public class IndexJournalEntryData implements Iterable {
     private final int size;
-    private final long[] underlyingArray;
+    public final long[] underlyingArray;
 
     public static final int MAX_LENGTH = 1000;
     public static final int ENTRY_SIZE = 2;
@@ -23,11 +23,6 @@ public class IndexJournalEntryData implements Iterable
= size) throw new ArrayIndexOutOfBoundsException();
@@ -37,7 +32,6 @@ public class IndexJournalEntryData implements Iterable
 0 && i < entry.size()) {
+                dataBuffer.putLong(entry.underlyingArray[i++]);
+            }
+        }
 
         numEntries++;
     }
 
-    @Override
-    public void forceWrite() throws IOException {
-        outputStream.flush();
-
-        try (var raf = new RandomAccessFile(outputFile.toFile(), "rws")) {
-            raf.writeLong(numEntries);
-            raf.writeLong(lexicon.size());
-        }
-    }
-
-    @Override
-    public void flushWords() {
-        lexicon.commitToDisk();
-    }
-
     public void close() throws IOException {
-        forceWrite();
+        dataBuffer.flip();
+        compressingStream.compress(dataBuffer);
+        dataBuffer.clear();
+        compressingStream.flush();
+        compressingStream.close();
-        outputStream.close();
+
+        // Finalize the file by writing a header
+
+        ByteBuffer header = ByteBuffer.allocate(16);
+        header.putLong(numEntries);
+        header.putLong(lexicon.size());
+        header.flip();
+
+        while (header.position() < header.limit()) {
+            fileChannel.write(header, header.position());
+        }
+
+        fileChannel.close();
     }
 }
diff --git a/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java b/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java
index 67b23dee..9cb96781 100644
--- a/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java
+++ b/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java
@@ -41,7 +41,6 @@ public class IndexJournalTest {
                 .add(5, 5)
                 .add(6, 6)
                 .build());
-        journalWriter.forceWrite();
         journalWriter.close();
 
         reader = new IndexJournalReaderSingleCompressedFile(tempFile);
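The rewritten journal writer above drops the forceWrite()/RandomAccessFile header patching; instead close() drains the compressing stream and then fills in a 16-byte header (entry count and lexicon size) through the FileChannel, using the buffer position as the absolute file offset, which suggests the first 16 bytes are reserved when the file is opened. A minimal sketch of that finalize-by-patching-the-header pattern, with hypothetical names and without the compression layer:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: append records after a reserved 16-byte header,
 *  then patch the header with positional writes on close(). */
class HeaderPatchingWriter implements AutoCloseable {
    private final FileChannel channel;
    private long numEntries = 0;

    HeaderPatchingWriter(Path file) throws IOException {
        channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING);
        channel.position(16); // leave room for the header; it is filled in by close()
    }

    void put(long value) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(value);
        buf.flip();
        while (buf.hasRemaining())
            channel.write(buf); // relative write, appends after the reserved header
        numEntries++;
    }

    @Override
    public void close() throws IOException {
        ByteBuffer header = ByteBuffer.allocate(16);
        header.putLong(numEntries);
        header.putLong(0L); // second slot; the real writer stores lexicon.size() here
        header.flip();

        // absolute writes: the buffer position doubles as the file offset (0..15)
        while (header.position() < header.limit()) {
            channel.write(header, header.position());
        }
        channel.close();
    }
}
```

In the actual writer the record body goes through the compressing stream rather than the raw channel, so only the header is written this way.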
diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java
index 4488912b..a99ab674 100644
--- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java
+++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java
@@ -76,7 +76,7 @@ class ReverseIndexFullConverterTest2 {
         keywordLexicon.commitToDisk();
         Thread.sleep(1000);
-        writer.forceWrite();
+        writer.close();
 
         var reader = new IndexJournalReaderSingleCompressedFile(indexFile);
diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java
index d634c175..1f9763c8 100644
--- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java
+++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java
@@ -76,7 +76,7 @@ class ReverseIndexPriorityConverterTest2 {
         keywordLexicon.commitToDisk();
         Thread.sleep(1000);
-        writer.forceWrite();
+        writer.close();
 
         var reader = new IndexJournalReaderSingleCompressedFile(indexFile);
diff --git a/code/processes/loading-process/build.gradle b/code/processes/loading-process/build.gradle
index d204247d..0a89c350 100644
--- a/code/processes/loading-process/build.gradle
+++ b/code/processes/loading-process/build.gradle
@@ -31,7 +31,7 @@ dependencies {
     implementation project(':code:features-index:lexicon')
    implementation project(':code:features-index:index-journal')
     implementation project(':code:libraries:language-processing')
-
+    implementation project(':third-party:commons-codec')
 
     testImplementation project(':code:services-core:search-service')
     implementation project(':code:process-models:crawling-model')
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/IndexLoadKeywords.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/IndexLoadKeywords.java
index dd627f85..7374c0a3 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/IndexLoadKeywords.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/IndexLoadKeywords.java
@@ -16,7 +16,7 @@ public class IndexLoadKeywords implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(IndexLoadKeywords.class);
     private final LinkedBlockingQueue<InsertTask> insertQueue = new LinkedBlockingQueue<>(32);
-    private final LoaderIndexJournalWriter client;
+    private final LoaderIndexJournalWriter journalWriter;
 
     private record InsertTask(int urlId, int domainId, DocumentMetadata metadata, DocumentKeywords wordSet) {}
 
@@ -25,8 +25,8 @@
     private volatile boolean canceled = false;
 
     @Inject
-    public IndexLoadKeywords(LoaderIndexJournalWriter client) {
-        this.client = client;
+    public IndexLoadKeywords(LoaderIndexJournalWriter journalWriter) {
+        this.journalWriter = journalWriter;
         runThread = new Thread(this, getClass().getSimpleName());
         runThread.start();
     }
@@ -36,7 +36,7 @@
         while (!canceled) {
             var data = insertQueue.poll(1, TimeUnit.SECONDS);
             if (data != null) {
-                client.putWords(new EdgeId<>(data.domainId), new EdgeId<>(data.urlId), data.metadata(), data.wordSet);
+                journalWriter.putWords(new EdgeId<>(data.domainId), new EdgeId<>(data.urlId), data.metadata(), data.wordSet);
             }
         }
     }
@@ -45,7 +45,7 @@
         if (!canceled) {
             canceled = true;
             runThread.join();
-            client.close();
+            journalWriter.close();
         }
     }
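For context, IndexLoadKeywords keeps its existing shape through the rename: one worker thread drains a small bounded queue and forwards each task to the journal writer, and stop() joins that thread before the writer is closed. A stripped-down sketch of the same consumer pattern, with hypothetical names rather than the actual class:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a single-consumer queue like the one in IndexLoadKeywords. */
class QueueConsumer<T> implements Runnable {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(32);
    private final Thread thread = new Thread(this, getClass().getSimpleName());
    private volatile boolean canceled = false;

    QueueConsumer() {
        thread.start();
    }

    void submit(T item) throws InterruptedException {
        queue.put(item); // blocks when the queue is full, providing backpressure upstream
    }

    @Override
    public void run() {
        try {
            while (!canceled) {
                // the poll timeout bounds how long the loop can miss a cancellation
                T item = queue.poll(1, TimeUnit.SECONDS);
                if (item != null) {
                    handle(item); // in IndexLoadKeywords: journalWriter.putWords(...)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void handle(T item) {
    }

    void stop() throws InterruptedException {
        canceled = true;
        thread.join(); // let the loop finish before downstream resources are closed
    }
}
```

Joining the worker before calling journalWriter.close() is what keeps a late putWords() from racing with the writer shutdown.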
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java
index 14962f9b..87b00192 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java
@@ -2,6 +2,7 @@ package nu.marginalia.loading.loader;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import lombok.SneakyThrows;
 import nu.marginalia.db.storage.FileStorageService;
 import nu.marginalia.db.storage.model.FileStorageType;
 import nu.marginalia.dict.OffHeapDictionaryHashMap;
@@ -25,6 +26,7 @@ import java.nio.file.Files;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.concurrent.*;
 
 @Singleton
 public class LoaderIndexJournalWriter {
@@ -51,6 +53,12 @@ public class LoaderIndexJournalWriter {
         indexWriter = new IndexJournalWriterImpl(lexicon, indexPath);
     }
 
+    private final LinkedBlockingQueue<Runnable> keywordInsertTaskQueue =
+            new LinkedBlockingQueue<>(65536);
+    private final ExecutorService keywordInsertionExecutor =
+            new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES, keywordInsertTaskQueue);
+
+    @SneakyThrows
     public void putWords(EdgeId<EdgeDomain> domain, EdgeId<EdgeUrl> url,
                          DocumentMetadata metadata,
                          DocumentKeywords wordSet) {
@@ -62,16 +70,29 @@ public class LoaderIndexJournalWriter {
             return;
         }
 
+        // Due to the very bursty access patterns of this method, doing the actual insertions in separate threads
+        // with a chonky work queue is a fairly decent improvement
        for (var chunk : KeywordListChunker.chopList(wordSet, IndexJournalEntryData.MAX_LENGTH)) {
-
-            var entry = new IndexJournalEntryData(getOrInsertWordIds(chunk.keywords(), chunk.metadata()));
-            var header = new IndexJournalEntryHeader(domain, url, metadata.encode());
-
-            indexWriter.put(header, entry);
+            try {
+                keywordInsertionExecutor.submit(() -> loadWords(domain, url, metadata, chunk));
+            }
+            catch (RejectedExecutionException ex) {
+                loadWords(domain, url, metadata, chunk);
+            }
        }
     }
 
+    private void loadWords(EdgeId<EdgeDomain> domain,
+                           EdgeId<EdgeUrl> url,
+                           DocumentMetadata metadata,
+                           DocumentKeywords wordSet) {
+        var entry = new IndexJournalEntryData(getOrInsertWordIds(wordSet.keywords(), wordSet.metadata()));
+        var header = new IndexJournalEntryHeader(domain, url, metadata.encode());
+
+        indexWriter.put(header, entry);
+    }
+
     private long[] getOrInsertWordIds(String[] words, long[] meta) {
         long[] ids = new long[words.length*2];
         int putIdx = 0;
@@ -93,6 +114,10 @@ public class LoaderIndexJournalWriter {
     }
 
     public void close() throws Exception {
+        keywordInsertionExecutor.shutdown();
+        while (!keywordInsertionExecutor.awaitTermination(1, TimeUnit.DAYS)) {
+            // ...?
+        }
         indexWriter.close();
         lexicon.close();
     }
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java
index 922baf91..4ef1509e 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java
@@ -1,15 +1,13 @@
 package nu.marginalia.loading.loader;
 
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
 import com.google.inject.Inject;
 import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.hash.MurmurHash3_128;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.HashSet;
@@ -26,6 +24,7 @@ public class SqlLoadUrls {
     public SqlLoadUrls(HikariDataSource dataSource) {
         this.dataSource = dataSource;
     }
+    private final MurmurHash3_128 murmurHash = new MurmurHash3_128();
 
     public void load(LoaderData data, EdgeUrl[] urls) {
         Set<EdgeDomain> affectedDomains = new HashSet<>();
@@ -52,6 +51,7 @@ public class SqlLoadUrls {
             for (var url : urls) {
                 if (data.getUrlId(url) != 0)
                     continue;
+
                 if (url.path.length() >= 255) {
                     logger.info("Skipping bad URL {}", url);
                     continue;
@@ -114,16 +114,16 @@ public class SqlLoadUrls {
         }
     }
 
-    private static final HashFunction murmur3_128 = Hashing.murmur3_128();
+    /* We use a uniqueness constraint on DOMAIN_ID and this hash instead of on the PATH and PARAM
+     * fields as the uniqueness index grows absurdly large for some reason, possibly due to the prevalent
+     * shared leading substrings in paths?
+     */
     private long hashPath(String path, String queryParam) {
-        long pathHash = murmur3_128.hashString(path, StandardCharsets.UTF_8).padToLong();
-
-        if (queryParam == null) {
-            return pathHash;
-        }
-        else {
-            return pathHash + murmur3_128.hashString(queryParam, StandardCharsets.UTF_8).padToLong();
+        long hash = murmurHash.hashNearlyASCII(path);
+        if (queryParam != null) {
+            hash ^= murmurHash.hashNearlyASCII(queryParam);
         }
+        return hash;
     }
 
     /** Loads urlIDs for the domain into `data` from the database, starting at URL ID minId. */
@@ -131,11 +131,11 @@ public class SqlLoadUrls {
         try (var conn = dataSource.getConnection();
              var queryCall = conn.prepareStatement("SELECT ID, PROTO, PATH, PARAM FROM EC_URL WHERE DOMAIN_ID=? AND ID > ?")) {
+            queryCall.setFetchSize(1000);
             queryCall.setInt(1, data.getDomainId(domain));
             queryCall.setInt(2, minId);
 
             var rsp = queryCall.executeQuery();
-            rsp.setFetchSize(1000);
 
             while (rsp.next()) {
                 int urlId = rsp.getInt(1);
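The LoaderIndexJournalWriter change amounts to hand-rolled backpressure: submissions go to a ThreadPoolExecutor fed by a 65536-slot LinkedBlockingQueue, and when submission is rejected the insertion simply runs on the calling thread. A self-contained sketch of that pattern, using hypothetical names rather than the actual class:

```java
import java.util.concurrent.*;

/** Hypothetical sketch of the bounded-pool-with-inline-fallback pattern
 *  used by LoaderIndexJournalWriter. */
class BoundedInsertPool implements AutoCloseable {
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(65536);
    private final ExecutorService executor =
            new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES, workQueue);

    void execute(Runnable task) {
        try {
            executor.submit(task);
        }
        catch (RejectedExecutionException ex) {
            // queue full (or pool shut down): apply backpressure by doing the work inline
            task.run();
        }
    }

    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        // as in the diff, wait out any queued insertions before returning
        while (!executor.awaitTermination(1, TimeUnit.DAYS)) {
            // keep waiting
        }
    }
}
```

Since a ThreadPoolExecutor only grows past its core size once the queue is full, the 16-thread ceiling and the inline fallback only matter under sustained saturation; ThreadPoolExecutor.CallerRunsPolicy would give roughly the same behavior as the explicit catch. The SqlLoadUrls changes follow the same performance theme: the two 64-bit murmur hashes of PATH and PARAM are XOR-combined so the uniqueness constraint can live on DOMAIN_ID plus a single long column rather than on the wide varchar fields, and the fetch-size hint moves onto the PreparedStatement so the driver sees it before the query executes.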