diff --git a/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java b/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java index edba7da6..a1b59b07 100644 --- a/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java +++ b/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java @@ -4,7 +4,7 @@ import com.upserve.uppend.blobs.NativeIO; import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; import nu.marginalia.index.journal.reader.IndexJournalReader; import nu.marginalia.array.LongArray; -import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; +import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.idx.DocumentMetadata; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.ranking.DomainRankings; @@ -13,7 +13,6 @@ import org.roaringbitmap.longlong.Roaring64Bitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -74,15 +73,19 @@ public class ForwardIndexConverter { LongArray docFileData = LongArray.mmapForWriting(outputFileDocsData, ForwardIndexParameters.ENTRY_SIZE * docsFileId.size()); - journalReader.forEach(entry -> { - long entryOffset = (long) ForwardIndexParameters.ENTRY_SIZE * docIdToIdx.get(entry.docId()); + var pointer = journalReader.newPointer(); + while (pointer.nextDocument()) { + long docId = pointer.documentId(); + int domainId = UrlIdCodec.getDomainId(docId); - int ranking = domainRankings.getRanking(entry.domainId()); - long meta = DocumentMetadata.encodeRank(entry.docMeta(), ranking); + long entryOffset = (long) ForwardIndexParameters.ENTRY_SIZE * docIdToIdx.get(docId); + + int ranking = domainRankings.getRanking(domainId); + long meta = 
DocumentMetadata.encodeRank(pointer.documentMeta(), ranking); docFileData.set(entryOffset + ForwardIndexParameters.METADATA_OFFSET, meta); - docFileData.set(entryOffset + ForwardIndexParameters.FEATURES_OFFSET, entry.header.documentFeatures()); - }); + docFileData.set(entryOffset + ForwardIndexParameters.FEATURES_OFFSET, pointer.documentFeatures()); + } progress.progress(TaskSteps.FORCE); diff --git a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java index b3485475..062d3716 100644 --- a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java +++ b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java @@ -2,19 +2,16 @@ package nu.marginalia.index.forward; import lombok.SneakyThrows; import nu.marginalia.index.journal.model.IndexJournalEntry; -import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; +import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile; import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl; import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.process.control.FakeProcessHeartbeat; -import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.process.control.ProcessTaskHeartbeatImpl; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.test.TestUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +21,6 @@ import java.nio.file.Path; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; -import static 
org.mockito.Mockito.when; class ForwardIndexConverterTest { @@ -96,7 +92,7 @@ class ForwardIndexConverterTest { @Test void testForwardIndex() throws IOException { - new ForwardIndexConverter(new FakeProcessHeartbeat(), new IndexJournalReaderSingleCompressedFile(indexFile), docsFileId, docsFileData, new DomainRankings()).convert(); + new ForwardIndexConverter(new FakeProcessHeartbeat(), new IndexJournalReaderSingleFile(indexFile), docsFileId, docsFileData, new DomainRankings()).convert(); var forwardReader = new ForwardIndexReader(docsFileId, docsFileData); diff --git a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReader.java b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReader.java index 7e574dbe..7f06c588 100644 --- a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReader.java +++ b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReader.java @@ -1,65 +1,70 @@ package nu.marginalia.index.journal.reader; -import nu.marginalia.index.journal.model.IndexJournalEntryData; +import nu.marginalia.index.journal.reader.pointer.IndexJournalPointer; import nu.marginalia.model.idx.WordFlags; -import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.file.Path; -import java.util.Iterator; import java.util.function.LongConsumer; -import java.util.function.Predicate; +import java.util.function.LongPredicate; -public interface IndexJournalReader extends Iterable { +public interface IndexJournalReader { int FILE_HEADER_SIZE_LONGS = 2; int FILE_HEADER_SIZE_BYTES = 8 * FILE_HEADER_SIZE_LONGS; static IndexJournalReader singleFile(Path fileName) throws IOException { - return new IndexJournalReaderSingleCompressedFile(fileName); + return new IndexJournalReaderSingleFile(fileName); } static IndexJournalReader paging(Path baseDir) throws IOException { return new 
IndexJournalReaderPagingImpl(baseDir); } - - static IndexJournalReader singleFileWithPriorityFilters(Path path) throws IOException { - - long highPriorityFlags = - WordFlags.Title.asBit() - | WordFlags.Subjects.asBit() - | WordFlags.TfIdfHigh.asBit() - | WordFlags.NamesWords.asBit() - | WordFlags.UrlDomain.asBit() - | WordFlags.UrlPath.asBit() - | WordFlags.Site.asBit() - | WordFlags.SiteAdjacent.asBit(); - - return new IndexJournalReaderSingleCompressedFile(path, null, - r -> (r & highPriorityFlags) != 0); + static IndexJournalReader filteringSingleFile(Path path, LongPredicate wordMetaFilter) throws IOException { + return new IndexJournalReaderSingleFile(path) + .filtering(wordMetaFilter); } - void forEachWordId(LongConsumer consumer); + default void forEachWordId(LongConsumer consumer) { + var ptr = this.newPointer(); + while (ptr.nextDocument()) { + while (ptr.nextRecord()) { + consumer.accept(ptr.wordId()); + } + } + } + default void forEachDocId(LongConsumer consumer) { + var ptr = this.newPointer(); + while (ptr.nextDocument()) { + consumer.accept(ptr.documentId()); + } + } - void forEachDocIdRecord(LongObjectConsumer consumer); - - void forEachDocId(LongConsumer consumer); - - @NotNull - @Override - Iterator iterator(); - - boolean filter(IndexJournalReadEntry entry); - - boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record); - - boolean filter(IndexJournalReadEntry entry, long metadata); - - void close() throws IOException; + IndexJournalPointer newPointer(); + default IndexJournalReader filtering(LongPredicate termMetaFilter) { + return new FilteringIndexJournalReader(this, termMetaFilter); + } interface LongObjectConsumer { void accept(long left, T right); } } + +class FilteringIndexJournalReader implements IndexJournalReader { + private final IndexJournalReader base; + private final LongPredicate termMetaFilter; + + FilteringIndexJournalReader(IndexJournalReader base, LongPredicate termMetaFilter) { + this.base = base; + 
this.termMetaFilter = termMetaFilter; + } + + @Override + public IndexJournalPointer newPointer() { + return base + .newPointer() + .filterWordMeta(termMetaFilter); + } +} \ No newline at end of file diff --git a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderPagingImpl.java b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderPagingImpl.java index 8a80753d..37db0b70 100644 --- a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderPagingImpl.java +++ b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderPagingImpl.java @@ -1,17 +1,12 @@ package nu.marginalia.index.journal.reader; -import com.google.common.collect.Iterators; -import nu.marginalia.index.journal.model.IndexJournalEntryData; -import nu.marginalia.index.journal.model.IndexJournalStatistics; +import nu.marginalia.index.journal.reader.pointer.IndexJournalPointer; import nu.marginallia.index.journal.IndexJournalFileNames; -import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.function.LongConsumer; public class IndexJournalReaderPagingImpl implements IndexJournalReader { @@ -22,55 +17,16 @@ public class IndexJournalReaderPagingImpl implements IndexJournalReader { this.readers = new ArrayList<>(inputFiles.size()); for (var inputFile : inputFiles) { - readers.add(new IndexJournalReaderSingleCompressedFile(inputFile)); + readers.add(new IndexJournalReaderSingleFile(inputFile)); } } @Override - public void forEachWordId(LongConsumer consumer) { - for (var reader : readers) { - reader.forEachWordId(consumer); - } - } - - @Override - public void forEachDocIdRecord(LongObjectConsumer consumer) { - for (var reader : readers) { - reader.forEachDocIdRecord(consumer); - } - } - - 
@Override - public void forEachDocId(LongConsumer consumer) { - for (var reader : readers) { - reader.forEachDocId(consumer); - } - } - - @Override - public @NotNull Iterator iterator() { - return Iterators.concat(readers.stream().map(IndexJournalReader::iterator).iterator()); - } - - @Override - public boolean filter(IndexJournalReadEntry entry) { - return readers.get(0).filter(entry); - } - - @Override - public boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record) { - return readers.get(0).filter(entry, record); - } - - @Override - public boolean filter(IndexJournalReadEntry entry, long metadata) { - return readers.get(0).filter(entry, metadata); - } - - @Override - public void close() throws IOException { - for (var reader : readers) { - reader.close(); - } + public IndexJournalPointer newPointer() { + return IndexJournalPointer.concatenate( + readers.stream() + .map(IndexJournalReader::newPointer) + .toArray(IndexJournalPointer[]::new) + ); } } diff --git a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderSingleCompressedFile.java b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderSingleCompressedFile.java deleted file mode 100644 index 27739274..00000000 --- a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderSingleCompressedFile.java +++ /dev/null @@ -1,169 +0,0 @@ -package nu.marginalia.index.journal.reader; - -import com.github.luben.zstd.ZstdInputStream; -import lombok.SneakyThrows; -import nu.marginalia.index.journal.model.IndexJournalEntryData; -import nu.marginalia.index.journal.model.IndexJournalFileHeader; -import org.jetbrains.annotations.NotNull; - -import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.Iterator; -import java.util.function.LongConsumer; -import java.util.function.Predicate; - 
-public class IndexJournalReaderSingleCompressedFile implements IndexJournalReader { - - private Path journalFile; - public final IndexJournalFileHeader fileHeader; - - @Override - public String toString() { - return "IndexJournalReaderSingleCompressedFile{" + journalFile + " }"; - } - - private DataInputStream dataInputStream = null; - - final Predicate entryPredicate; - final Predicate metadataPredicate; - - public IndexJournalReaderSingleCompressedFile(Path file) throws IOException { - this.journalFile = file; - - fileHeader = readHeader(file); - - this.metadataPredicate = null; - this.entryPredicate = null; - } - - public IndexJournalReaderSingleCompressedFile(Path file, Predicate entryPredicate, Predicate metadataPredicate) throws IOException { - this.journalFile = file; - - fileHeader = readHeader(file); - - this.metadataPredicate = metadataPredicate; - this.entryPredicate = entryPredicate; - } - - private static IndexJournalFileHeader readHeader(Path file) throws IOException { - try (var raf = new RandomAccessFile(file.toFile(), "r")) { - long unused = raf.readLong(); - long wordCount = raf.readLong(); - - return new IndexJournalFileHeader(unused, wordCount); - } - } - - private static DataInputStream createInputStream(Path file) throws IOException { - var fileInputStream = Files.newInputStream(file, StandardOpenOption.READ); - - // skip the header - fileInputStream.skipNBytes(16); - - return new DataInputStream(new ZstdInputStream(new BufferedInputStream(fileInputStream))); - } - - @Override - public boolean filter(IndexJournalReadEntry entry) { - return entryPredicate == null || entryPredicate.test(entry); - } - - @Override - public boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record) { - return (entryPredicate == null || entryPredicate.test(entry)) - && (metadataPredicate == null || metadataPredicate.test(record.metadata())); - } - - @Override - public boolean filter(IndexJournalReadEntry entry, long metadata) { - return 
(entryPredicate == null || entryPredicate.test(entry)) - && (metadataPredicate == null || metadataPredicate.test(metadata)); - } - - public void close() throws IOException { - dataInputStream.close(); - } - - - @Override - public void forEachWordId(LongConsumer consumer) { - for (var entry : this) { - var data = entry.readEntry(); - for (var post : data) { - if (filter(entry, post)) { - consumer.accept(post.wordId()); - } - } - } - } - - @Override - public void forEachDocIdRecord(LongObjectConsumer consumer) { - for (var entry : this) { - var data = entry.readEntry(); - - for (var post : data) { - if (filter(entry, post)) { - consumer.accept(entry.docId(), post); - } - } - } - } - @Override - public void forEachDocId(LongConsumer consumer) { - for (var entry : this) { - if (filter(entry)) { - consumer.accept(entry.docId()); - } - } - } - - @SneakyThrows - @NotNull - @Override - public Iterator iterator() { - if (dataInputStream != null) { - dataInputStream.close(); - } - dataInputStream = createInputStream(journalFile); - - return new JournalEntryIterator(); - } - - private class JournalEntryIterator implements Iterator { - private int i = -1; - private IndexJournalReadEntry next; - - @Override - @SneakyThrows - public boolean hasNext() { - if (next != null) - return true; - - while (++i < fileHeader.fileSize()) { - var entry = IndexJournalReadEntry.read(dataInputStream); - if (filter(entry)) { - next = entry; - return true; - } - } - - return false; - } - - @SneakyThrows - @Override - public IndexJournalReadEntry next() { - if (hasNext()) { - var ret = next; - next = null; - return ret; - } - throw new IllegalStateException(); - } - - } - -} diff --git a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderSingleFile.java b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderSingleFile.java new file mode 100644 index 00000000..378ab826 --- /dev/null +++ 
b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/reader/IndexJournalReaderSingleFile.java @@ -0,0 +1,130 @@ +package nu.marginalia.index.journal.reader; + +import com.github.luben.zstd.ZstdInputStream; +import lombok.SneakyThrows; +import nu.marginalia.index.journal.model.IndexJournalEntryData; +import nu.marginalia.index.journal.model.IndexJournalFileHeader; +import nu.marginalia.index.journal.reader.pointer.IndexJournalPointer; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class IndexJournalReaderSingleFile implements IndexJournalReader { + + private Path journalFile; + public final IndexJournalFileHeader fileHeader; + + @Override + public String toString() { + return "IndexJournalReaderSingleCompressedFile{" + journalFile + " }"; + } + + public IndexJournalReaderSingleFile(Path file) throws IOException { + this.journalFile = file; + + fileHeader = readHeader(file); + } + + private static IndexJournalFileHeader readHeader(Path file) throws IOException { + try (var raf = new RandomAccessFile(file.toFile(), "r")) { + long unused = raf.readLong(); + long wordCount = raf.readLong(); + + return new IndexJournalFileHeader(unused, wordCount); + } + } + + private static DataInputStream createInputStream(Path file) throws IOException { + var fileInputStream = Files.newInputStream(file, StandardOpenOption.READ); + + // skip the header + fileInputStream.skipNBytes(16); + + return new DataInputStream(new ZstdInputStream(new BufferedInputStream(fileInputStream))); + } + + @SneakyThrows + @Override + public IndexJournalPointer newPointer() { + return new SingleFileJournalPointer(fileHeader, createInputStream(journalFile)); + } + +} + +class SingleFileJournalPointer implements IndexJournalPointer { + + private final IndexJournalFileHeader fileHeader; + private final DataInputStream dataInputStream; + private IndexJournalReadEntry entry; + private 
IndexJournalEntryData entryData; + private int recordIdx = -2; + private int docIdx = -1; + + public SingleFileJournalPointer( + IndexJournalFileHeader fileHeader, + DataInputStream dataInputStream) + { + this.fileHeader = fileHeader; + this.dataInputStream = dataInputStream; + } + + @SneakyThrows + @Override + public boolean nextDocument() { + recordIdx = -2; + entryData = null; + + if (++docIdx < fileHeader.fileSize()) { + entry = IndexJournalReadEntry.read(dataInputStream); + return true; + } + + dataInputStream.close(); + + return false; + } + + @Override + public boolean nextRecord() { + if (entryData == null) { + entryData = entry.readEntry(); + } + + recordIdx += 2; + if (recordIdx < entryData.size()) { + return true; + } + return false; + } + + @Override + public long documentId() { + return entry.docId(); + } + + @Override + public long documentMeta() { + return entry.docMeta(); + } + + @Override + public long wordId() { + return entryData.get(recordIdx); + } + + @Override + public long wordMeta() { + return entryData.get(recordIdx + 1); + } + + @Override + public int documentFeatures() { + if (entryData == null) { + entryData = entry.readEntry(); + } + + return entry.header.documentFeatures(); + } +} \ No newline at end of file diff --git a/code/features-index/index-journal/src/main/java/nu/marginalia/index/journal/reader/pointer/IndexJournalPointer.java b/code/features-index/index-journal/src/main/java/nu/marginalia/index/journal/reader/pointer/IndexJournalPointer.java new file mode 100644 index 00000000..37100335 --- /dev/null +++ b/code/features-index/index-journal/src/main/java/nu/marginalia/index/journal/reader/pointer/IndexJournalPointer.java @@ -0,0 +1,167 @@ +package nu.marginalia.index.journal.reader.pointer; + +import java.util.function.LongPredicate; + +/** + * This is something like a double iterator. The Index Journal consists of + * blocks of words and word-metadata for each document and document metadata. + *
+ * + * Perhaps best conceptualized as something like + * + * <pre>[doc1: word1 word2 word3 word4] [doc2: word1 word2 word3 ]</pre>
+ * nextDocument() will move the pointer from doc1 to doc2;<br>
+ * nextRecord() will move the pointer from word1 to word2...<br>
+ */ +public interface IndexJournalPointer { + /** + * Advance to the next document in the journal, + * returning true if such a document exists. + * Resets the record index to before the first + * record (if it exists). + */ + boolean nextDocument(); + + /** + * Advance to the next record in the journal + */ + boolean nextRecord(); + + /** + * Get the id associated with the current document + */ + long documentId(); + + /** + * Get the metadata associated with the current document + */ + long documentMeta(); + + /** + * Get the wordId associated with the current record + */ + long wordId(); + + /** + * Get the termMeta associated with the current record + */ + long wordMeta(); + + /** + * Get the documentFeatures associated with the current record + */ + int documentFeatures(); + + /** Concatenate a number of journal pointers */ + static IndexJournalPointer concatenate(IndexJournalPointer... pointers) { + if (pointers.length == 1) + return pointers[0]; + + return new JoiningJournalPointer(pointers); + } + + /** Add a filter on word metadata to the pointer */ + default IndexJournalPointer filterWordMeta(LongPredicate filter) { + return new FilteringJournalPointer(this, filter); + } +} + +class JoiningJournalPointer implements IndexJournalPointer { + private final IndexJournalPointer[] pointers; + private int pIndex = 0; + + JoiningJournalPointer(IndexJournalPointer[] pointers) { + this.pointers = pointers; + } + + @Override + public boolean nextDocument() { + + while (pIndex < pointers.length) { + if (pointers[pIndex].nextDocument()) + return true; + else pIndex++; + } + + return false; + } + + @Override + public boolean nextRecord() { + return pointers[pIndex].nextRecord(); + } + + @Override + public long documentId() { + return pointers[pIndex].documentId(); + } + + @Override + public long documentMeta() { + return pointers[pIndex].documentMeta(); + } + + @Override + public long wordId() { + return pointers[pIndex].wordId(); + } + + @Override + public long 
wordMeta() { + return pointers[pIndex].wordMeta(); + } + + @Override + public int documentFeatures() { + return pointers[pIndex].documentFeatures(); + } +} + +class FilteringJournalPointer implements IndexJournalPointer { + private final IndexJournalPointer base; + private final LongPredicate filter; + + FilteringJournalPointer(IndexJournalPointer base, LongPredicate filter) { + this.base = base; + this.filter = filter; + } + + @Override + public boolean nextDocument() { + return base.nextDocument(); + } + + @Override + public boolean nextRecord() { + while (base.nextRecord()) { + if (filter.test(wordMeta())) + return true; + } + return false; + } + + @Override + public long documentId() { + return base.documentId(); + } + + @Override + public long documentMeta() { + return base.documentMeta(); + } + + @Override + public long wordId() { + return base.wordId(); + } + + @Override + public long wordMeta() { + return base.wordMeta(); + } + + @Override + public int documentFeatures() { + return base.documentFeatures(); + } +} \ No newline at end of file diff --git a/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java b/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java index 23814556..47e8ac7f 100644 --- a/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java +++ b/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/IndexJournalTest.java @@ -1,12 +1,10 @@ package nu.marginalia.index.journal; import nu.marginalia.index.journal.model.IndexJournalEntry; -import nu.marginalia.index.journal.model.IndexJournalEntryData; import nu.marginalia.index.journal.reader.IndexJournalReader; -import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; +import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile; import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl; import 
nu.marginalia.model.id.UrlIdCodec; -import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -43,27 +41,13 @@ public class IndexJournalTest { .build()); journalWriter.close(); - reader = new IndexJournalReaderSingleCompressedFile(tempFile); + reader = new IndexJournalReaderSingleFile(tempFile); } @AfterEach public void tearDown() throws IOException { - reader.close(); Files.delete(tempFile); } - @Test - public void reiterable() { - // Verifies that the reader can be run twice to the same effect - - int cnt = 0; - int cnt2 = 0; - - for (var item : reader) cnt++; - for (var item : reader) cnt2++; - - assertEquals(cnt2, cnt); - } - @Test public void forEachDocId() { List expected = List.of(firstDocId, secondDocId); @@ -82,20 +66,4 @@ public class IndexJournalTest { assertEquals(expected, actual); } - @Test - public void forEachDocIdRecord() { - List> expected = List.of( - Pair.of(firstDocId, new IndexJournalEntryData.Record(1, 2)), - Pair.of(firstDocId, new IndexJournalEntryData.Record(2, 3)), - Pair.of(firstDocId, new IndexJournalEntryData.Record(3, 4)), - Pair.of(firstDocId, new IndexJournalEntryData.Record(5, 6)), - Pair.of(secondDocId, new IndexJournalEntryData.Record(5, 5)), - Pair.of(secondDocId, new IndexJournalEntryData.Record(6, 6)) - ); - List> actual = new ArrayList<>(); - - reader.forEachDocIdRecord((url, word) -> actual.add(Pair.of(url, word))); - assertEquals(expected, actual); - } - } diff --git a/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/reader/pointer/IndexJournalPointerTest.java b/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/reader/pointer/IndexJournalPointerTest.java new file mode 100644 index 00000000..202a229c --- /dev/null +++ b/code/features-index/index-journal/src/test/java/nu/marginalia/index/journal/reader/pointer/IndexJournalPointerTest.java @@ -0,0 +1,133 @@ +package 
nu.marginalia.index.journal.reader.pointer; + +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.List; +import java.util.ArrayList; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class IndexJournalPointerTest { + + @Test + public void concatenate() { + MockPointer left = new MockPointer( + List.of(new MockDocument(1, 2, 3, List.of( + new MockRecord(4, 5), + new MockRecord(6, 7)) + )) + ); + + MockPointer right = new MockPointer( + List.of(new MockDocument(8, 9, 10, List.of( + new MockRecord(11, 12), + new MockRecord(13, 14)) + )) + ); + + IndexJournalPointer concatenated = IndexJournalPointer.concatenate(left, right); + List docIdsSeq = new ArrayList<>(); + List wordIdsSeq = new ArrayList<>(); + while (concatenated.nextDocument()) { + docIdsSeq.add(concatenated.documentId()); + while (concatenated.nextRecord()) { + wordIdsSeq.add(concatenated.wordId()); + } + } + + assertEquals(docIdsSeq, List.of(1L, 8L)); + assertEquals(wordIdsSeq, List.of(4L, 6L, 11L, 13L)); + } + + @Test + public void filter() { + MockPointer left = new MockPointer( + List.of(new MockDocument(1, 2, 3, List.of( + new MockRecord(1, 1), + new MockRecord(2, 2), + new MockRecord(3, 3), + new MockRecord(4, 4), + new MockRecord(5, 5) + ) + ), new MockDocument(2, 2, 3, List.of( + new MockRecord(1, 1), + new MockRecord(3, 3), + new MockRecord(5, 5) + ) + )) + + ); + var filtered = left.filterWordMeta(meta -> (meta % 2) == 0); + + List docIdsSeq = new ArrayList<>(); + List wordIdsSeq = new ArrayList<>(); + while (filtered.nextDocument()) { + docIdsSeq.add(filtered.documentId()); + while (filtered.nextRecord()) { + wordIdsSeq.add(filtered.wordId()); + } + } + + assertEquals(docIdsSeq, List.of(1L, 2L)); + assertEquals(wordIdsSeq, List.of(2L, 4L)); + } + + class MockPointer implements IndexJournalPointer { + private final List documents; + + int di = -1; + int ri; + + public MockPointer(Collection documents) { + this.documents = new 
ArrayList<>(documents); + } + + @Override + public boolean nextDocument() { + if (++di < documents.size()) { + ri = -1; + return true; + } + + return false; + } + + @Override + public boolean nextRecord() { + if (++ri < documents.get(di).records.size()) { + return true; + } + + return false; + } + + @Override + public long documentId() { + return documents.get(di).docId; + } + + @Override + public long documentMeta() { + return documents.get(di).docMeta; + } + + @Override + public long wordId() { + return documents.get(di).records.get(ri).wordId; + } + + @Override + public long wordMeta() { + return documents.get(di).records.get(ri).wordMeta; + } + + @Override + public int documentFeatures() { + return documents.get(di).docFeatures; + } + } + + record MockDocument(long docId, long docMeta, int docFeatures, List records) {} + record MockRecord(long wordId, long wordMeta) {} +} \ No newline at end of file diff --git a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/construction/ReversePreindexDocuments.java b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/construction/ReversePreindexDocuments.java index 537b45df..378f40b6 100644 --- a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/construction/ReversePreindexDocuments.java +++ b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/construction/ReversePreindexDocuments.java @@ -2,7 +2,6 @@ package nu.marginalia.index.construction; import lombok.SneakyThrows; import nu.marginalia.array.LongArray; -import nu.marginalia.array.algo.SortingContext; import nu.marginalia.index.journal.reader.IndexJournalReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,22 +69,17 @@ public class ReversePreindexDocuments { var offsetMap = segments.asMap(RECORD_SIZE_LONGS); offsetMap.defaultReturnValue(0); - for (var entry : reader) { - long rankEncodedId = docIdRewriter.rewriteDocId(entry.docId()); - - var data = entry.readEntry(); - for (int i = 
0; i + 1 < data.size(); i+=2) { - long wordId = data.get(i); - long meta = data.get(i+1); - - if (!reader.filter(entry, meta)) { - continue; - } + var pointer = reader.newPointer(); + while (pointer.nextDocument()) { + long rankEncodedId = docIdRewriter.rewriteDocId(pointer.documentId()); + while (pointer.nextRecord()) { + long wordId = pointer.wordId(); + long wordMeta = pointer.wordMeta(); long offset = offsetMap.addTo(wordId, RECORD_SIZE_LONGS); outArray.set(offset + 0, rankEncodedId); - outArray.set(offset + 1, meta); + outArray.set(offset + 1, wordMeta); } } diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/construction/TestJournalFactory.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/construction/TestJournalFactory.java index 5fdb0ac1..b122921b 100644 --- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/construction/TestJournalFactory.java +++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/construction/TestJournalFactory.java @@ -3,7 +3,7 @@ package nu.marginalia.index.construction; import nu.marginalia.index.journal.model.IndexJournalEntryData; import nu.marginalia.index.journal.model.IndexJournalEntryHeader; import nu.marginalia.index.journal.reader.IndexJournalReader; -import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; +import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile; import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl; import java.io.IOException; @@ -68,7 +68,7 @@ public class TestJournalFactory { new IndexJournalEntryData(data)); } writer.close(); - var ret = new IndexJournalReaderSingleCompressedFile(jf); + var ret = new IndexJournalReaderSingleFile(jf); return ret; } @@ -87,7 +87,7 @@ public class TestJournalFactory { new IndexJournalEntryData(data)); } writer.close(); - var ret = new IndexJournalReaderSingleCompressedFile(jf); + var ret = new 
IndexJournalReaderSingleFile(jf); return ret; } } diff --git a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java index 8e7be9d9..67bdefe1 100644 --- a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java +++ b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java @@ -12,6 +12,7 @@ import nu.marginalia.index.forward.ForwardIndexFileNames; import nu.marginalia.index.journal.reader.IndexJournalReader; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.id.UrlIdCodec; +import nu.marginalia.model.idx.WordFlags; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.inbox.MqInboxResponse; @@ -31,6 +32,7 @@ import java.sql.SQLException; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.LongPredicate; import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX; @@ -123,12 +125,29 @@ public class IndexConstructorMain { Path tmpDir = indexStaging.asPath().resolve("tmp"); if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir); + LongPredicate wordMetaFilter = getPriorityIndexWordMetaFilter(); + ReverseIndexConstructor. 
createReverseIndex(heartbeat, - IndexJournalReader::singleFileWithPriorityFilters, + (path) -> IndexJournalReader.filteringSingleFile(path, wordMetaFilter), indexStaging.asPath(), this::addRank, tmpDir, outputFileDocs, outputFileWords); } + private static LongPredicate getPriorityIndexWordMetaFilter() { + + long highPriorityFlags = + WordFlags.Title.asBit() + | WordFlags.Subjects.asBit() + | WordFlags.TfIdfHigh.asBit() + | WordFlags.NamesWords.asBit() + | WordFlags.UrlDomain.asBit() + | WordFlags.UrlPath.asBit() + | WordFlags.Site.asBit() + | WordFlags.SiteAdjacent.asBit(); + + return r -> (r & highPriorityFlags) != 0; + } + private void createForwardIndex() throws SQLException, IOException { FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE); diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java index 0fe79093..f6958431 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java @@ -3,14 +3,13 @@ package nu.marginalia.loading.loader; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageType; -import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; +import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile; import nu.marginalia.keyword.model.DocumentKeywords; import nu.marginalia.model.idx.DocumentMetadata; import nu.marginallia.index.journal.IndexJournalFileNames; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.mockito.Mockito; 
import java.io.IOException; @@ -18,7 +17,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.stream.LongStream; @@ -64,19 +62,18 @@ class LoaderIndexJournalWriterTest { List journalFiles =IndexJournalFileNames.findJournalFiles(tempDir); assertEquals(1, journalFiles.size()); - var reader = new IndexJournalReaderSingleCompressedFile(journalFiles.get(0)); + var reader = new IndexJournalReaderSingleFile(journalFiles.get(0)); List docIds = new ArrayList<>(); reader.forEachDocId(docIds::add); assertEquals(List.of(1L, 1L), docIds); List metas = new ArrayList(); - reader.forEach(r -> { - var entry = r.readEntry(); - for (int i = 0; i + 1 < entry.size(); i+=2) { - entry.get(i); - metas.add(entry.get(i+1)); + var ptr = reader.newPointer(); + while (ptr.nextDocument()) { + while (ptr.nextRecord()) { + metas.add(ptr.wordMeta()); } - }); + } assertEquals(LongStream.of(metadata).boxed().toList(), metas); }