(journal) Fixing journal encoding

Adjusting some bit widths for entry and record sizes to ensure they don't overflow, as an overflow would corrupt the written journal.
Viktor Lofgren 2024-06-24 13:56:27 +02:00
parent fff2ce5721
commit b798f28443
6 changed files with 30 additions and 243 deletions
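The overflow being guarded against is silent truncation: the old writer stored each term's position-buffer size as a single byte and checked the record size against Short.MAX_VALUE, so larger values wrapped around and the reader lost sync with the stream. A standalone illustration of the failure mode (not code from the patch):

public class TruncationDemo {
    public static void main(String[] args) {
        int bufferSize = 300;                     // a position buffer over 255 bytes

        byte oldEncoding = (byte) bufferSize;     // old format: size stored in one byte
        System.out.println(oldEncoding & 0xFF);   // 44 -- wrapped around, reader desyncs

        short newEncoding = (short) bufferSize;   // new format: 16-bit size field
        System.out.println(newEncoding & 0xFFFF); // 300 -- round-trips intact
    }
}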

IndexJournalReadEntry.java

@@ -22,19 +22,14 @@ public class IndexJournalReadEntry implements Iterable<IndexJournalEntryTermData>
         this.initialPos = buffer.position();
     }

-    static ThreadLocal<ByteBuffer> pool = ThreadLocal.withInitial(() -> ByteBuffer.allocate(8*65536));
-
     public static IndexJournalReadEntry read(DataInputStream inputStream) throws IOException {

-        final long sizeBlock = inputStream.readLong();
-        final int entrySize = (int) (sizeBlock >>> 48L);
-        final int docSize = (int) ((sizeBlock >>> 32L) & 0xFFFFL);
-        final int docFeatures = (int) (sizeBlock & 0xFFFF_FFFFL);
+        final int entrySize = (inputStream.readShort() & 0xFFFF);
+        final int docSize = inputStream.readShort();
+        final int docFeatures = inputStream.readInt();
         final long docId = inputStream.readLong();
         final long meta = inputStream.readLong();

         var header = new IndexJournalEntryHeader(
                 entrySize,
                 docFeatures,
@@ -42,12 +37,9 @@ public class IndexJournalReadEntry implements Iterable<IndexJournalEntryTermData>
                 docId,
                 meta);

-        var workArea = pool.get();
-        inputStream.readFully(workArea.array(), 0, header.entrySize());
-        workArea.position(0);
-        workArea.limit(header.entrySize());
-
-        return new IndexJournalReadEntry(header, workArea);
+        byte[] buffer = new byte[entrySize];
+        inputStream.readFully(buffer);
+
+        return new IndexJournalReadEntry(header, ByteBuffer.wrap(buffer));
     }

     public long docId() {
@@ -100,7 +92,7 @@ class TermDataIterator implements Iterator<IndexJournalEntryTermData> {
         long meta = buffer.getShort();

         // read the size of the sequence data
-        int size = buffer.get() & 0xFF;
+        int size = buffer.getShort() & 0xFFFF;

         // slice the buffer to get the sequence data
         var slice = buffer.slice(buffer.position(), size);
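The reader-side mask mirrors the writer: getShort() and readShort() return signed values in Java, so sizes above 32767 must be masked back into the unsigned range. A minimal round-trip sketch (standalone, not the project's classes):

import java.nio.ByteBuffer;

public class UnsignedShortDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(2);
        buf.putShort((short) 40_000);       // writer side: a value above Short.MAX_VALUE
        buf.flip();
        int size = buf.getShort() & 0xFFFF; // reader side: mask recovers 0..65535
        System.out.println(size);           // prints 40000
    }
}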

IndexJournalReader.java

@@ -13,7 +13,7 @@ public interface IndexJournalReader {
     int FILE_HEADER_SIZE_BYTES = 8 * FILE_HEADER_SIZE_LONGS;

     int DOCUMENT_HEADER_SIZE_BYTES = 24;
-    int TERM_HEADER_SIZE_BYTES = 11;
+    int TERM_HEADER_SIZE_BYTES = 12;

     /** Create a reader for a single file. */
     static IndexJournalReader singleFile(Path fileName) throws IOException {
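The new constant follows from the term record layout written in the file below: an 8-byte term id, a 2-byte metadata field, and the size field widened from one byte to two. Worked out (not code from the patch):

int TERM_HEADER_SIZE_BYTES = Long.BYTES   // 8: termId
                           + Short.BYTES  // 2: term metadata
                           + Short.BYTES; // 2: position buffer size (was Byte.BYTES, hence 11)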

IndexJournalWriterSingleFileImpl.java

@@ -2,10 +2,10 @@ package nu.marginalia.index.journal.writer;

 import com.github.luben.zstd.ZstdDirectBufferCompressingStream;
 import lombok.SneakyThrows;
-import nu.marginalia.hash.MurmurHash3_128;
 import nu.marginalia.index.journal.model.IndexJournalEntryHeader;
 import nu.marginalia.index.journal.model.IndexJournalEntryData;
 import nu.marginalia.index.journal.reader.IndexJournalReader;
+import nu.marginalia.sequence.GammaCodedSequence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -20,10 +20,8 @@ import java.nio.file.attribute.PosixFilePermissions;

 /** IndexJournalWriter implementation that creates a single journal file */
 public class IndexJournalWriterSingleFileImpl implements IndexJournalWriter{
-    private static final int ZSTD_BUFFER_SIZE = 8192;
-    private static final int DATA_BUFFER_SIZE = 8192;
-
-    private final MurmurHash3_128 hasher = new MurmurHash3_128();
+    private static final int ZSTD_BUFFER_SIZE = 1<<16;
+    private static final int DATA_BUFFER_SIZE = 1<<16;

     private final ByteBuffer dataBuffer = ByteBuffer.allocateDirect(DATA_BUFFER_SIZE);
@@ -83,51 +81,50 @@ public class IndexJournalWriterSingleFileImpl implements IndexJournalWriter{
     {
         final long[] keywords = data.termIds();
         final long[] metadata = data.metadata();
-        final var positions = data.positions();
+        final GammaCodedSequence[] positions = data.positions();

-        int recordSize = 0; // document header size is 3 longs
-        for (int i = 0; i < keywords.length; i++) {
-            // term header size is 2 longs
-            recordSize += IndexJournalReader.TERM_HEADER_SIZE_BYTES + positions[i].bufferSize();
+        int entrySize = 0;
+        for (var position : positions) {
+            entrySize += IndexJournalReader.TERM_HEADER_SIZE_BYTES + position.bufferSize();
         }
+        int totalSize = IndexJournalReader.DOCUMENT_HEADER_SIZE_BYTES + entrySize;

-        if (recordSize > Short.MAX_VALUE) {
+        if (entrySize > DATA_BUFFER_SIZE) {
             // This should never happen, but if it does, we should log it and deal with it in a way that doesn't corrupt the file
-            // (32 KB is *a lot* of data for a single document, larger than the uncompressed HTML of most documents)
-            logger.error("Omitting entry: Record size {} exceeds maximum representable size of {}", recordSize, Short.MAX_VALUE);
+            // (64 KB is *a lot* of data for a single document, larger than the uncompressed HTML in like the 95%th percentile of web pages)
+            logger.error("Omitting entry: Record size {} exceeds maximum representable size of {}", entrySize, DATA_BUFFER_SIZE);
             return 0;
         }

-        if (dataBuffer.capacity() - dataBuffer.position() < 3*8) {
+        if (dataBuffer.remaining() < totalSize) {
             dataBuffer.flip();
             compressingStream.compress(dataBuffer);
             dataBuffer.clear();
         }

-        dataBuffer.putShort((short) recordSize);
+        if (dataBuffer.remaining() < totalSize) {
+            logger.error("Omitting entry: Record size {} exceeds buffer size of {}", totalSize, dataBuffer.capacity());
+            return 0;
+        }
+
+        assert entrySize < (1 << 16) : "Entry size must not exceed USHORT_MAX";
+
+        dataBuffer.putShort((short) entrySize);
         dataBuffer.putShort((short) Math.clamp(header.documentSize(), 0, Short.MAX_VALUE));
         dataBuffer.putInt(header.documentFeatures());
         dataBuffer.putLong(header.combinedId());
         dataBuffer.putLong(header.documentMeta());

         for (int i = 0; i < keywords.length; i++) {
-            int requiredSize = IndexJournalReader.TERM_HEADER_SIZE_BYTES + positions[i].bufferSize();
-
-            if (dataBuffer.capacity() - dataBuffer.position() < requiredSize) {
-                dataBuffer.flip();
-                compressingStream.compress(dataBuffer);
-                dataBuffer.clear();
-            }
-
             dataBuffer.putLong(keywords[i]);
             dataBuffer.putShort((short) metadata[i]);
-            dataBuffer.put((byte) positions[i].bufferSize());
+            dataBuffer.putShort((short) positions[i].bufferSize());
             dataBuffer.put(positions[i].buffer());
         }

         numEntries++;

-        return recordSize;
+        return totalSize;
     }

     public void close() throws IOException {
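Sizing the whole record up front lets the writer flush once and then verify the record can fit at all, instead of checking buffer space per field. A condensed sketch of that pattern, with a hypothetical flush() standing in for the zstd compress-and-clear step:

import java.nio.ByteBuffer;

class ReserveSketch {
    // Hypothetical stand-in for compressingStream.compress(...) followed by clear().
    static void flush(ByteBuffer b) {
        // ... hand b's filled region to the compressor here ...
        b.clear();
    }

    /** Ensure totalSize bytes are writable; false means the record can never fit. */
    static boolean reserve(ByteBuffer dataBuffer, int totalSize) {
        if (dataBuffer.remaining() < totalSize) {
            dataBuffer.flip();   // expose the filled region for compression
            flush(dataBuffer);
        }
        // After a flush the buffer is empty, so this only fails when the record
        // is larger than the buffer itself -- the case the writer logs and skips.
        return dataBuffer.remaining() >= totalSize;
    }
}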

IndexJournalTest.java (deleted)

@@ -1,68 +0,0 @@
package nu.marginalia.index.journal;
import nu.marginalia.index.journal.reader.IndexJournalReader;
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile;
import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl;
import nu.marginalia.model.id.UrlIdCodec;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class IndexJournalTest {
// Path tempFile;
// IndexJournalReader reader;
//
// long firstDocId = UrlIdCodec.encodeId(44, 10);
// long secondDocId = UrlIdCodec.encodeId(43, 15);
//
// @BeforeEach
// public void setUp() throws IOException {
// tempFile = Files.createTempFile(getClass().getSimpleName(), ".dat");
//
// var journalWriter = new IndexJournalWriterSingleFileImpl( tempFile);
// journalWriter.put(IndexJournalEntry.builder(44, 10, 55)
// .add(1, 2)
// .add(2, 3)
// .add(3, 4)
// .add(5, 6).build());
//
// journalWriter.put(IndexJournalEntry.builder(43, 15, 10)
// .add(5, 5)
// .add(6, 6)
// .build());
// journalWriter.close();
//
// reader = new IndexJournalReaderSingleFile(tempFile);
// }
// @AfterEach
// public void tearDown() throws IOException {
// Files.delete(tempFile);
// }
//
// @Test
// public void forEachDocId() {
// List<Long> expected = List.of(firstDocId, secondDocId);
// List<Long> actual = new ArrayList<>();
//
// reader.forEachDocId(actual::add);
// assertEquals(expected, actual);
// }
//
// @Test
// public void forEachWordId() {
// List<Integer> expected = List.of(1, 2, 3, 5, 5 ,6);
// List<Integer> actual = new ArrayList<>();
//
// reader.forEachWordId(i -> actual.add((int) i));
// assertEquals(expected, actual);
// }
}


@@ -10,7 +10,6 @@ import nu.marginalia.index.journal.model.IndexJournalEntryTermData;
 import nu.marginalia.index.journal.reader.IndexJournalReaderPagingImpl;
 import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl;
 import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile;
-import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.model.id.UrlIdCodec;
 import nu.marginalia.model.idx.DocumentMetadata;
 import nu.marginalia.model.idx.WordFlags;

IndexJournalPointerTest.java (deleted)

@@ -1,133 +0,0 @@
package nu.marginalia.index.journal.reader.pointer;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
import static org.junit.jupiter.api.Assertions.assertEquals;
class IndexJournalPointerTest {
//
// @Test
// public void concatenate() {
// MockPointer left = new MockPointer(
// List.of(new MockDocument(1, 2, 3, List.of(
// new MockRecord(4, 5),
// new MockRecord(6, 7))
// ))
// );
//
// MockPointer right = new MockPointer(
// List.of(new MockDocument(8, 9, 10, List.of(
// new MockRecord(11, 12),
// new MockRecord(13, 14))
// ))
// );
//
// IndexJournalPointer concatenated = IndexJournalPointer.concatenate(left, right);
// List<Long> docIdsSeq = new ArrayList<>();
// List<Long> wordIdsSeq = new ArrayList<>();
// while (concatenated.nextDocument()) {
// docIdsSeq.add(concatenated.documentId());
// while (concatenated.nextRecord()) {
// wordIdsSeq.add(concatenated.termId());
// }
// }
//
// assertEquals(docIdsSeq, List.of(1L, 8L));
// assertEquals(wordIdsSeq, List.of(4L, 6L, 11L, 13L));
// }
//
// @Test
// public void filter() {
// MockPointer left = new MockPointer(
// List.of(new MockDocument(1, 2, 3, List.of(
// new MockRecord(1, 1),
// new MockRecord(2, 2),
// new MockRecord(3, 3),
// new MockRecord(4, 4),
// new MockRecord(5, 5)
// )
// ), new MockDocument(2, 2, 3, List.of(
// new MockRecord(1, 1),
// new MockRecord(3, 3),
// new MockRecord(5, 5)
// )
// ))
//
// );
// var filtered = left.filterWordMeta(meta -> (meta % 2) == 0);
//
// List<Long> docIdsSeq = new ArrayList<>();
// List<Long> wordIdsSeq = new ArrayList<>();
// while (filtered.nextDocument()) {
// docIdsSeq.add(filtered.documentId());
// while (filtered.nextRecord()) {
// wordIdsSeq.add(filtered.termId());
// }
// }
//
// assertEquals(docIdsSeq, List.of(1L, 2L));
// assertEquals(wordIdsSeq, List.of(2L, 4L));
// }
//
// class MockPointer implements IndexJournalPointer {
// private final List<MockDocument> documents;
//
// int di = -1;
// int ri;
//
// public MockPointer(Collection<MockDocument> documents) {
// this.documents = new ArrayList<>(documents);
// }
//
// @Override
// public boolean nextDocument() {
// if (++di < documents.size()) {
// ri = -1;
// return true;
// }
//
// return false;
// }
//
// @Override
// public boolean nextRecord() {
// if (++ri < documents.get(di).records.size()) {
// return true;
// }
//
// return false;
// }
//
// @Override
// public long documentId() {
// return documents.get(di).docId;
// }
//
// @Override
// public long documentMeta() {
// return documents.get(di).docMeta;
// }
//
// @Override
// public long termId() {
// return documents.get(di).records.get(ri).termId;
// }
//
// @Override
// public long wordMeta() {
// return documents.get(di).records.get(ri).wordMeta;
// }
//
// @Override
// public int documentFeatures() {
// return documents.get(di).docFeatures;
// }
// }
//
// record MockDocument(long docId, long docMeta, int docFeatures, List<MockRecord> records) {}
// record MockRecord(long termId, long wordMeta) {}
}