(loader) Optimize loader by using zstd's direct streaming writer and the Murmur3_128 string hash

Viktor Lofgren 2023-08-01 15:00:15 +02:00
parent 86a5cc5c5f
commit ea66195b97
11 changed files with 124 additions and 69 deletions
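
Before the per-file diffs, a minimal self-contained sketch of the zstd-jni direct-buffer streaming pattern the rewritten journal writer adopts below: compressed output accumulates in a direct ByteBuffer, and an overridden flushBuffer() drains it to a FileChannel whenever it fills. The file name, buffer sizes, and sample payload here are illustrative, not taken from the commit.

import com.github.luben.zstd.ZstdDirectBufferCompressingStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ZstdChannelSketch {
    public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(Path.of("example.dat"), // illustrative file name
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING))
        {
            // Compressed bytes land in this direct buffer; flushBuffer() is called when it fills.
            var compressor = new ZstdDirectBufferCompressingStream(ByteBuffer.allocateDirect(8192), 3) {
                @Override
                protected ByteBuffer flushBuffer(ByteBuffer toFlush) throws IOException {
                    toFlush.flip();
                    while (toFlush.hasRemaining())
                        channel.write(toFlush);
                    toFlush.clear();
                    return toFlush; // hand the (now empty) buffer back for reuse
                }
            };

            // Stage uncompressed data in a direct buffer, then feed it to the compressor.
            ByteBuffer data = ByteBuffer.allocateDirect(8192);
            data.putLong(1L);
            data.putLong(2L);
            data.flip();
            compressor.compress(data);

            compressor.flush();
            compressor.close();
        }
    }
}

Compared with the DataOutputStream-over-ZstdOutputStream chain it replaces, this keeps both the staged and the compressed bytes in direct buffers, avoiding copies through heap arrays and intermediate stream wrappers.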

View File

@@ -64,7 +64,6 @@ class ForwardIndexConverterTest {
keywordLexicon.commitToDisk();
writer.forceWrite();
writer.close();

View File

@@ -8,7 +8,7 @@ import java.util.Iterator;
public class IndexJournalEntryData implements Iterable<IndexJournalEntryData.Record> {
private final int size;
private final long[] underlyingArray;
public final long[] underlyingArray;
public static final int MAX_LENGTH = 1000;
public static final int ENTRY_SIZE = 2;
@@ -23,11 +23,6 @@ public class IndexJournalEntryData implements Iterable<IndexJournalEntryData.Rec
this.underlyingArray = underlyingArray;
}
public void write(DataOutputStream dos) throws IOException {
for (int i = 0; i < size; i++) {
dos.writeLong(underlyingArray[i]);
}
}
public long get(int idx) {
if (idx >= size)
throw new ArrayIndexOutOfBoundsException();
@@ -37,7 +32,6 @@ public class IndexJournalEntryData implements Iterable<IndexJournalEntryData.Rec
public int size() {
return size;
}
public long[] toArray() {
if (size == underlyingArray.length)
return underlyingArray;
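
Since ENTRY_SIZE is 2, each record in underlyingArray spans two consecutive longs; judging from the loader changes further down, these are a word id followed by its word metadata. A hypothetical construction, with made-up values:

// Two longs per record: presumably a word id followed by its metadata (made-up values).
long[] packed = new long[] {
    101L, 0x01L, // record 0
    102L, 0x02L, // record 1
};
var entry = new IndexJournalEntryData(packed);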

View File

@@ -12,9 +12,6 @@ public interface IndexJournalWriter {
put(entry.header(), entry.data());
}
void forceWrite() throws IOException;
void flushWords();
void close() throws IOException;
}

View File

@@ -1,5 +1,6 @@
package nu.marginalia.index.journal.writer;
import com.github.luben.zstd.ZstdDirectBufferCompressingStream;
import com.github.luben.zstd.ZstdOutputStream;
import lombok.SneakyThrows;
import nu.marginalia.index.journal.model.IndexJournalEntryData;
@@ -10,65 +11,105 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class IndexJournalWriterImpl implements IndexJournalWriter {
private final KeywordLexicon lexicon;
private final Path outputFile;
private final DataOutputStream outputStream;
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final int ZSTD_BUFFER_SIZE = 8192;
private static final int DATA_BUFFER_SIZE = 8192;
private final ByteBuffer dataBuffer = ByteBuffer.allocateDirect(DATA_BUFFER_SIZE);
private final ZstdDirectBufferCompressingStream compressingStream;
private int numEntries = 0;
private final FileChannel fileChannel;
public IndexJournalWriterImpl(KeywordLexicon lexicon, Path outputFile) throws IOException {
this.lexicon = lexicon;
this.outputFile = outputFile;
var fileStream = Files.newOutputStream(outputFile, StandardOpenOption.CREATE,
fileChannel = FileChannel.open(outputFile, StandardOpenOption.CREATE,
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
writeHeaderPlaceholder(fileStream);
writeHeaderPlaceholder(fileChannel);
compressingStream = new ZstdDirectBufferCompressingStream(ByteBuffer.allocateDirect(ZSTD_BUFFER_SIZE), 3) {
protected ByteBuffer flushBuffer(ByteBuffer toFlush) throws IOException {
toFlush.flip();
while (toFlush.hasRemaining()) {
fileChannel.write(toFlush);
}
toFlush.clear();
outputStream = new DataOutputStream(new ZstdOutputStream(new BufferedOutputStream(fileStream)));
return toFlush;
}
};
}
private static void writeHeaderPlaceholder(OutputStream fileStream) throws IOException {
fileStream.write(new byte[IndexJournalReader.FILE_HEADER_SIZE_BYTES]);
private static void writeHeaderPlaceholder(FileChannel fileStream) throws IOException {
var buffer = ByteBuffer.allocate(IndexJournalReader.FILE_HEADER_SIZE_BYTES);
buffer.position(0);
buffer.limit(buffer.capacity());
while (buffer.hasRemaining())
fileStream.write(buffer, buffer.position());
fileStream.position(IndexJournalReader.FILE_HEADER_SIZE_BYTES);
}
@Override
@SneakyThrows
public void put(IndexJournalEntryHeader header, IndexJournalEntryData entry) {
outputStream.writeInt(entry.size());
outputStream.writeInt(0);
outputStream.writeLong(header.combinedId());
outputStream.writeLong(header.documentMeta());
entry.write(outputStream);
public synchronized void put(IndexJournalEntryHeader header, IndexJournalEntryData entry) {
if (dataBuffer.capacity() - dataBuffer.position() < 3*8) {
dataBuffer.flip();
compressingStream.compress(dataBuffer);
dataBuffer.clear();
}
dataBuffer.putInt(entry.size());
dataBuffer.putInt(0);
dataBuffer.putLong(header.combinedId());
dataBuffer.putLong(header.documentMeta());
for (int i = 0; i < entry.size(); ) {
int remaining = (dataBuffer.capacity() - dataBuffer.position()) / 8;
if (remaining <= 0) {
dataBuffer.flip();
compressingStream.compress(dataBuffer);
dataBuffer.clear();
}
else while (remaining-- > 0 && i < entry.size()) {
dataBuffer.putLong(entry.underlyingArray[i++]);
}
}
numEntries++;
}
@Override
public void forceWrite() throws IOException {
outputStream.flush();
try (var raf = new RandomAccessFile(outputFile.toFile(), "rws")) {
raf.writeLong(numEntries);
raf.writeLong(lexicon.size());
}
}
@Override
public void flushWords() {
lexicon.commitToDisk();
}
public void close() throws IOException {
forceWrite();
dataBuffer.flip();
compressingStream.compress(dataBuffer);
dataBuffer.clear();
compressingStream.flush();
compressingStream.close();
outputStream.close();
// Finalize the file by writing a header
ByteBuffer header = ByteBuffer.allocate(16);
header.putLong(numEntries);
header.putLong(lexicon.size());
header.flip();
while (header.position() < header.limit()) {
fileChannel.write(header, header.position());
}
fileChannel.close();
}
}
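
Two patterns in this file are worth calling out. First, the writer reserves FILE_HEADER_SIZE_BYTES of placeholder zeros up front and back-patches the real header in close(), using positional FileChannel writes that do not move the channel's append position. A stripped-down sketch of just that pattern (file name and header values are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class HeaderBackpatchSketch {
    static final int HEADER_SIZE = 16; // two longs, matching the journal header

    public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(Path.of("example.dat"), // illustrative
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING))
        {
            // Reserve the header region with zeros, then position the channel past it for the body.
            ByteBuffer placeholder = ByteBuffer.allocate(HEADER_SIZE);
            while (placeholder.hasRemaining())
                channel.write(placeholder, placeholder.position());
            channel.position(HEADER_SIZE);

            // ... write the file body here ...

            // Back-patch the real header at offset 0. Positional write() calls take the file
            // offset explicitly; using the buffer's own position works because both start at 0.
            ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
            header.putLong(42L);   // e.g. entry count (made-up value)
            header.putLong(1337L); // e.g. lexicon size (made-up value)
            header.flip();
            while (header.hasRemaining())
                channel.write(header, header.position());
        }
    }
}

Second, put() drains dataBuffer into the compressor whenever the staging buffer runs out of room, so an entry larger than DATA_BUFFER_SIZE is split across several compress() calls rather than overflowing the buffer.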

View File

@@ -41,7 +41,6 @@ public class IndexJournalTest {
.add(5, 5)
.add(6, 6)
.build());
journalWriter.forceWrite();
journalWriter.close();
reader = new IndexJournalReaderSingleCompressedFile(tempFile);

View File

@@ -76,7 +76,7 @@ class ReverseIndexFullConverterTest2 {
keywordLexicon.commitToDisk();
Thread.sleep(1000);
writer.forceWrite();
writer.close();
var reader = new IndexJournalReaderSingleCompressedFile(indexFile);

View File

@@ -76,7 +76,7 @@ class ReverseIndexPriorityConverterTest2 {
keywordLexicon.commitToDisk();
Thread.sleep(1000);
writer.forceWrite();
writer.close();
var reader = new IndexJournalReaderSingleCompressedFile(indexFile);

View File

@@ -31,7 +31,7 @@ dependencies {
implementation project(':code:features-index:lexicon')
implementation project(':code:features-index:index-journal')
implementation project(':code:libraries:language-processing')
implementation project(':third-party:commons-codec')
testImplementation project(':code:services-core:search-service')
implementation project(':code:process-models:crawling-model')

View File

@@ -16,7 +16,7 @@ public class IndexLoadKeywords implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(IndexLoadKeywords.class);
private final LinkedBlockingQueue<InsertTask> insertQueue = new LinkedBlockingQueue<>(32);
private final LoaderIndexJournalWriter client;
private final LoaderIndexJournalWriter journalWriter;
private record InsertTask(int urlId, int domainId, DocumentMetadata metadata, DocumentKeywords wordSet) {}
@@ -25,8 +25,8 @@ public class IndexLoadKeywords implements Runnable {
private volatile boolean canceled = false;
@Inject
public IndexLoadKeywords(LoaderIndexJournalWriter client) {
this.client = client;
public IndexLoadKeywords(LoaderIndexJournalWriter journalWriter) {
this.journalWriter = journalWriter;
runThread = new Thread(this, getClass().getSimpleName());
runThread.start();
}
@@ -36,7 +36,7 @@ public class IndexLoadKeywords implements Runnable {
while (!canceled) {
var data = insertQueue.poll(1, TimeUnit.SECONDS);
if (data != null) {
client.putWords(new EdgeId<>(data.domainId), new EdgeId<>(data.urlId), data.metadata(), data.wordSet);
journalWriter.putWords(new EdgeId<>(data.domainId), new EdgeId<>(data.urlId), data.metadata(), data.wordSet);
}
}
}
@@ -45,7 +45,7 @@ public class IndexLoadKeywords implements Runnable {
if (!canceled) {
canceled = true;
runThread.join();
client.close();
journalWriter.close();
}
}

View File

@@ -2,6 +2,7 @@ package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.dict.OffHeapDictionaryHashMap;
@@ -25,6 +26,7 @@ import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.*;
@Singleton
public class LoaderIndexJournalWriter {
@@ -51,6 +53,12 @@ public class LoaderIndexJournalWriter {
indexWriter = new IndexJournalWriterImpl(lexicon, indexPath);
}
private final LinkedBlockingQueue<Runnable> keywordInsertTaskQueue =
new LinkedBlockingQueue<>(65536);
private final ExecutorService keywordInsertionExecutor =
new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES, keywordInsertTaskQueue);
@SneakyThrows
public void putWords(EdgeId<EdgeDomain> domain, EdgeId<EdgeUrl> url,
DocumentMetadata metadata,
DocumentKeywords wordSet) {
@@ -62,16 +70,29 @@ public class LoaderIndexJournalWriter {
return;
}
// Due to the very bursty access patterns of this method, doing the actual insertions in separate threads
// with a chonky work queue is a fairly decent improvement
for (var chunk : KeywordListChunker.chopList(wordSet, IndexJournalEntryData.MAX_LENGTH)) {
var entry = new IndexJournalEntryData(getOrInsertWordIds(chunk.keywords(), chunk.metadata()));
var header = new IndexJournalEntryHeader(domain, url, metadata.encode());
indexWriter.put(header, entry);
try {
keywordInsertionExecutor.submit(() -> loadWords(domain, url, metadata, chunk));
}
catch (RejectedExecutionException ex) {
loadWords(domain, url, metadata, chunk);
}
}
}
private void loadWords(EdgeId<EdgeDomain> domain,
EdgeId<EdgeUrl> url,
DocumentMetadata metadata,
DocumentKeywords wordSet) {
var entry = new IndexJournalEntryData(getOrInsertWordIds(wordSet.keywords(), wordSet.metadata()));
var header = new IndexJournalEntryHeader(domain, url, metadata.encode());
indexWriter.put(header, entry);
}
private long[] getOrInsertWordIds(String[] words, long[] meta) {
long[] ids = new long[words.length*2];
int putIdx = 0;
@@ -93,6 +114,10 @@ public class LoaderIndexJournalWriter {
}
public void close() throws Exception {
keywordInsertionExecutor.shutdown();
while (!keywordInsertionExecutor.awaitTermination(1, TimeUnit.DAYS)) {
// keep waiting; all queued insertions must drain before the writer is closed
}
indexWriter.close();
lexicon.close();
}
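
The submit() call in putWords() falls back to running the task on the caller's thread when the bounded queue overflows. For reference, a sketch of the same backpressure idea using the stock CallerRunsPolicy rejection handler instead of catching RejectedExecutionException; the pool and queue sizes mirror the commit, but the class name is hypothetical:

import java.util.concurrent.*;

class BurstyInsertPoolSketch {
    // A deep bounded queue absorbs bursts of work; 8-16 worker threads drain it.
    private final ExecutorService executor = new ThreadPoolExecutor(
            8, 16, 1, TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(65536),
            new ThreadPoolExecutor.CallerRunsPolicy()); // on overflow, run on the submitting thread

    public void submit(Runnable task) {
        executor.execute(task);
    }

    public void close() throws InterruptedException {
        executor.shutdown();
        // Wait for queued tasks to drain before releasing downstream resources.
        while (!executor.awaitTermination(1, TimeUnit.DAYS)) {
            // keep waiting
        }
    }
}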

View File

@@ -1,15 +1,13 @@
package nu.marginalia.loading.loader;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.hash.MurmurHash3_128;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashSet;
@@ -26,6 +24,7 @@ public class SqlLoadUrls {
public SqlLoadUrls(HikariDataSource dataSource) {
this.dataSource = dataSource;
}
private final MurmurHash3_128 murmurHash = new MurmurHash3_128();
public void load(LoaderData data, EdgeUrl[] urls) {
Set<EdgeDomain> affectedDomains = new HashSet<>();
@@ -52,6 +51,7 @@ public class SqlLoadUrls {
for (var url : urls) {
if (data.getUrlId(url) != 0)
continue;
if (url.path.length() >= 255) {
logger.info("Skipping bad URL {}", url);
continue;
@@ -114,16 +114,16 @@
}
}
private static final HashFunction murmur3_128 = Hashing.murmur3_128();
/* We use a uniqueness constraint on DOMAIN_ID and this hash instead of on the PATH and PARAM
* fields, as a uniqueness index over the latter grows absurdly large, possibly due to the
* prevalence of shared leading substrings in paths.
*/
private long hashPath(String path, String queryParam) {
long pathHash = murmur3_128.hashString(path, StandardCharsets.UTF_8).padToLong();
if (queryParam == null) {
return pathHash;
}
else {
return pathHash + murmur3_128.hashString(queryParam, StandardCharsets.UTF_8).padToLong();
long hash = murmurHash.hashNearlyASCII(path);
if (queryParam != null) {
hash ^= murmurHash.hashNearlyASCII(queryParam);
}
return hash;
}
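
A hypothetical caller, just to make the combination rule concrete (URL values are made up). Note the new code combines the path and query-param hashes with XOR, where the old code added the two padToLong() values:

var hash = new MurmurHash3_128();
long pathOnly  = hash.hashNearlyASCII("/wiki/Zstd");      // made-up path
long withParam = hash.hashNearlyASCII("/wiki/Zstd")
               ^ hash.hashNearlyASCII("lang=en");         // made-up query param
// Distinct (path, param) pairs hash apart with overwhelming probability, so the
// (DOMAIN_ID, hash) uniqueness constraint stands in for one over (PATH, PARAM).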
/** Loads urlIDs for the domain into `data` from the database, starting at URL ID minId. */
@@ -131,11 +131,11 @@
try (var conn = dataSource.getConnection();
var queryCall = conn.prepareStatement("SELECT ID, PROTO, PATH, PARAM FROM EC_URL WHERE DOMAIN_ID=? AND ID > ?")) {
queryCall.setFetchSize(1000);
queryCall.setInt(1, data.getDomainId(domain));
queryCall.setInt(2, minId);
var rsp = queryCall.executeQuery();
rsp.setFetchSize(1000);
while (rsp.next()) {
int urlId = rsp.getInt(1);