mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 21:18:58 +00:00
(index journal) Fix leaky abstraction in IndexJournalReader.
The caller shouldn't be required to know the on-disk layout of the file to make use of the data in a performant way.
This commit is contained in:
parent
88ac72c8eb
commit
320dad7f1a
@ -4,7 +4,7 @@ import com.upserve.uppend.blobs.NativeIO;
|
||||
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReader;
|
||||
import nu.marginalia.array.LongArray;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
import nu.marginalia.model.idx.DocumentMetadata;
|
||||
import nu.marginalia.process.control.ProcessHeartbeat;
|
||||
import nu.marginalia.ranking.DomainRankings;
|
||||
@ -13,7 +13,6 @@ import org.roaringbitmap.longlong.Roaring64Bitmap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@ -74,15 +73,19 @@ public class ForwardIndexConverter {
|
||||
|
||||
LongArray docFileData = LongArray.mmapForWriting(outputFileDocsData, ForwardIndexParameters.ENTRY_SIZE * docsFileId.size());
|
||||
|
||||
journalReader.forEach(entry -> {
|
||||
long entryOffset = (long) ForwardIndexParameters.ENTRY_SIZE * docIdToIdx.get(entry.docId());
|
||||
var pointer = journalReader.newPointer();
|
||||
while (pointer.nextDocument()) {
|
||||
long docId = pointer.documentId();
|
||||
int domainId = UrlIdCodec.getDomainId(docId);
|
||||
|
||||
int ranking = domainRankings.getRanking(entry.domainId());
|
||||
long meta = DocumentMetadata.encodeRank(entry.docMeta(), ranking);
|
||||
long entryOffset = (long) ForwardIndexParameters.ENTRY_SIZE * docIdToIdx.get(docId);
|
||||
|
||||
int ranking = domainRankings.getRanking(domainId);
|
||||
long meta = DocumentMetadata.encodeRank(pointer.documentMeta(), ranking);
|
||||
|
||||
docFileData.set(entryOffset + ForwardIndexParameters.METADATA_OFFSET, meta);
|
||||
docFileData.set(entryOffset + ForwardIndexParameters.FEATURES_OFFSET, entry.header.documentFeatures());
|
||||
});
|
||||
docFileData.set(entryOffset + ForwardIndexParameters.FEATURES_OFFSET, pointer.documentFeatures());
|
||||
}
|
||||
|
||||
progress.progress(TaskSteps.FORCE);
|
||||
|
||||
|
@ -2,19 +2,16 @@ package nu.marginalia.index.forward;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntry;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile;
|
||||
import nu.marginalia.index.journal.writer.IndexJournalWriter;
|
||||
import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
import nu.marginalia.process.control.FakeProcessHeartbeat;
|
||||
import nu.marginalia.process.control.ProcessHeartbeatImpl;
|
||||
import nu.marginalia.process.control.ProcessTaskHeartbeatImpl;
|
||||
import nu.marginalia.ranking.DomainRankings;
|
||||
import nu.marginalia.test.TestUtil;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -24,7 +21,6 @@ import java.nio.file.Path;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
class ForwardIndexConverterTest {
|
||||
|
||||
@ -96,7 +92,7 @@ class ForwardIndexConverterTest {
|
||||
@Test
|
||||
void testForwardIndex() throws IOException {
|
||||
|
||||
new ForwardIndexConverter(new FakeProcessHeartbeat(), new IndexJournalReaderSingleCompressedFile(indexFile), docsFileId, docsFileData, new DomainRankings()).convert();
|
||||
new ForwardIndexConverter(new FakeProcessHeartbeat(), new IndexJournalReaderSingleFile(indexFile), docsFileId, docsFileData, new DomainRankings()).convert();
|
||||
|
||||
var forwardReader = new ForwardIndexReader(docsFileId, docsFileData);
|
||||
|
||||
|
@ -1,65 +1,70 @@
|
||||
package nu.marginalia.index.journal.reader;
|
||||
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryData;
|
||||
import nu.marginalia.index.journal.reader.pointer.IndexJournalPointer;
|
||||
import nu.marginalia.model.idx.WordFlags;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Iterator;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.LongPredicate;
|
||||
|
||||
public interface IndexJournalReader extends Iterable<IndexJournalReadEntry> {
|
||||
public interface IndexJournalReader {
|
||||
int FILE_HEADER_SIZE_LONGS = 2;
|
||||
int FILE_HEADER_SIZE_BYTES = 8 * FILE_HEADER_SIZE_LONGS;
|
||||
|
||||
static IndexJournalReader singleFile(Path fileName) throws IOException {
|
||||
return new IndexJournalReaderSingleCompressedFile(fileName);
|
||||
return new IndexJournalReaderSingleFile(fileName);
|
||||
}
|
||||
|
||||
static IndexJournalReader paging(Path baseDir) throws IOException {
|
||||
return new IndexJournalReaderPagingImpl(baseDir);
|
||||
}
|
||||
|
||||
static IndexJournalReader singleFileWithPriorityFilters(Path path) throws IOException {
|
||||
|
||||
long highPriorityFlags =
|
||||
WordFlags.Title.asBit()
|
||||
| WordFlags.Subjects.asBit()
|
||||
| WordFlags.TfIdfHigh.asBit()
|
||||
| WordFlags.NamesWords.asBit()
|
||||
| WordFlags.UrlDomain.asBit()
|
||||
| WordFlags.UrlPath.asBit()
|
||||
| WordFlags.Site.asBit()
|
||||
| WordFlags.SiteAdjacent.asBit();
|
||||
|
||||
return new IndexJournalReaderSingleCompressedFile(path, null,
|
||||
r -> (r & highPriorityFlags) != 0);
|
||||
static IndexJournalReader filteringSingleFile(Path path, LongPredicate wordMetaFilter) throws IOException {
|
||||
return new IndexJournalReaderSingleFile(path)
|
||||
.filtering(wordMetaFilter);
|
||||
}
|
||||
|
||||
void forEachWordId(LongConsumer consumer);
|
||||
default void forEachWordId(LongConsumer consumer) {
|
||||
var ptr = this.newPointer();
|
||||
while (ptr.nextDocument()) {
|
||||
while (ptr.nextRecord()) {
|
||||
consumer.accept(ptr.wordId());
|
||||
}
|
||||
}
|
||||
}
|
||||
default void forEachDocId(LongConsumer consumer) {
|
||||
var ptr = this.newPointer();
|
||||
while (ptr.nextDocument()) {
|
||||
consumer.accept(ptr.documentId());
|
||||
}
|
||||
}
|
||||
|
||||
void forEachDocIdRecord(LongObjectConsumer<IndexJournalEntryData.Record> consumer);
|
||||
|
||||
void forEachDocId(LongConsumer consumer);
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
Iterator<IndexJournalReadEntry> iterator();
|
||||
|
||||
boolean filter(IndexJournalReadEntry entry);
|
||||
|
||||
boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record);
|
||||
|
||||
boolean filter(IndexJournalReadEntry entry, long metadata);
|
||||
|
||||
void close() throws IOException;
|
||||
IndexJournalPointer newPointer();
|
||||
|
||||
|
||||
default IndexJournalReader filtering(LongPredicate termMetaFilter) {
|
||||
return new FilteringIndexJournalReader(this, termMetaFilter);
|
||||
}
|
||||
|
||||
interface LongObjectConsumer<T> {
|
||||
void accept(long left, T right);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class FilteringIndexJournalReader implements IndexJournalReader {
|
||||
private final IndexJournalReader base;
|
||||
private final LongPredicate termMetaFilter;
|
||||
|
||||
FilteringIndexJournalReader(IndexJournalReader base, LongPredicate termMetaFilter) {
|
||||
this.base = base;
|
||||
this.termMetaFilter = termMetaFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexJournalPointer newPointer() {
|
||||
return base
|
||||
.newPointer()
|
||||
.filterWordMeta(termMetaFilter);
|
||||
}
|
||||
}
|
@ -1,17 +1,12 @@
|
||||
package nu.marginalia.index.journal.reader;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryData;
|
||||
import nu.marginalia.index.journal.model.IndexJournalStatistics;
|
||||
import nu.marginalia.index.journal.reader.pointer.IndexJournalPointer;
|
||||
import nu.marginallia.index.journal.IndexJournalFileNames;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
public class IndexJournalReaderPagingImpl implements IndexJournalReader {
|
||||
|
||||
@ -22,55 +17,16 @@ public class IndexJournalReaderPagingImpl implements IndexJournalReader {
|
||||
this.readers = new ArrayList<>(inputFiles.size());
|
||||
|
||||
for (var inputFile : inputFiles) {
|
||||
readers.add(new IndexJournalReaderSingleCompressedFile(inputFile));
|
||||
readers.add(new IndexJournalReaderSingleFile(inputFile));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachWordId(LongConsumer consumer) {
|
||||
for (var reader : readers) {
|
||||
reader.forEachWordId(consumer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachDocIdRecord(LongObjectConsumer<IndexJournalEntryData.Record> consumer) {
|
||||
for (var reader : readers) {
|
||||
reader.forEachDocIdRecord(consumer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachDocId(LongConsumer consumer) {
|
||||
for (var reader : readers) {
|
||||
reader.forEachDocId(consumer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull Iterator<IndexJournalReadEntry> iterator() {
|
||||
return Iterators.concat(readers.stream().map(IndexJournalReader::iterator).iterator());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry) {
|
||||
return readers.get(0).filter(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record) {
|
||||
return readers.get(0).filter(entry, record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, long metadata) {
|
||||
return readers.get(0).filter(entry, metadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
for (var reader : readers) {
|
||||
reader.close();
|
||||
}
|
||||
public IndexJournalPointer newPointer() {
|
||||
return IndexJournalPointer.concatenate(
|
||||
readers.stream()
|
||||
.map(IndexJournalReader::newPointer)
|
||||
.toArray(IndexJournalPointer[]::new)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -1,169 +0,0 @@
|
||||
package nu.marginalia.index.journal.reader;
|
||||
|
||||
import com.github.luben.zstd.ZstdInputStream;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryData;
|
||||
import nu.marginalia.index.journal.model.IndexJournalFileHeader;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Iterator;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class IndexJournalReaderSingleCompressedFile implements IndexJournalReader {
|
||||
|
||||
private Path journalFile;
|
||||
public final IndexJournalFileHeader fileHeader;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IndexJournalReaderSingleCompressedFile{" + journalFile + " }";
|
||||
}
|
||||
|
||||
private DataInputStream dataInputStream = null;
|
||||
|
||||
final Predicate<IndexJournalReadEntry> entryPredicate;
|
||||
final Predicate<Long> metadataPredicate;
|
||||
|
||||
public IndexJournalReaderSingleCompressedFile(Path file) throws IOException {
|
||||
this.journalFile = file;
|
||||
|
||||
fileHeader = readHeader(file);
|
||||
|
||||
this.metadataPredicate = null;
|
||||
this.entryPredicate = null;
|
||||
}
|
||||
|
||||
public IndexJournalReaderSingleCompressedFile(Path file, Predicate<IndexJournalReadEntry> entryPredicate, Predicate<Long> metadataPredicate) throws IOException {
|
||||
this.journalFile = file;
|
||||
|
||||
fileHeader = readHeader(file);
|
||||
|
||||
this.metadataPredicate = metadataPredicate;
|
||||
this.entryPredicate = entryPredicate;
|
||||
}
|
||||
|
||||
private static IndexJournalFileHeader readHeader(Path file) throws IOException {
|
||||
try (var raf = new RandomAccessFile(file.toFile(), "r")) {
|
||||
long unused = raf.readLong();
|
||||
long wordCount = raf.readLong();
|
||||
|
||||
return new IndexJournalFileHeader(unused, wordCount);
|
||||
}
|
||||
}
|
||||
|
||||
private static DataInputStream createInputStream(Path file) throws IOException {
|
||||
var fileInputStream = Files.newInputStream(file, StandardOpenOption.READ);
|
||||
|
||||
// skip the header
|
||||
fileInputStream.skipNBytes(16);
|
||||
|
||||
return new DataInputStream(new ZstdInputStream(new BufferedInputStream(fileInputStream)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry) {
|
||||
return entryPredicate == null || entryPredicate.test(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record) {
|
||||
return (entryPredicate == null || entryPredicate.test(entry))
|
||||
&& (metadataPredicate == null || metadataPredicate.test(record.metadata()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, long metadata) {
|
||||
return (entryPredicate == null || entryPredicate.test(entry))
|
||||
&& (metadataPredicate == null || metadataPredicate.test(metadata));
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
dataInputStream.close();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void forEachWordId(LongConsumer consumer) {
|
||||
for (var entry : this) {
|
||||
var data = entry.readEntry();
|
||||
for (var post : data) {
|
||||
if (filter(entry, post)) {
|
||||
consumer.accept(post.wordId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachDocIdRecord(LongObjectConsumer<IndexJournalEntryData.Record> consumer) {
|
||||
for (var entry : this) {
|
||||
var data = entry.readEntry();
|
||||
|
||||
for (var post : data) {
|
||||
if (filter(entry, post)) {
|
||||
consumer.accept(entry.docId(), post);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void forEachDocId(LongConsumer consumer) {
|
||||
for (var entry : this) {
|
||||
if (filter(entry)) {
|
||||
consumer.accept(entry.docId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@NotNull
|
||||
@Override
|
||||
public Iterator<IndexJournalReadEntry> iterator() {
|
||||
if (dataInputStream != null) {
|
||||
dataInputStream.close();
|
||||
}
|
||||
dataInputStream = createInputStream(journalFile);
|
||||
|
||||
return new JournalEntryIterator();
|
||||
}
|
||||
|
||||
private class JournalEntryIterator implements Iterator<IndexJournalReadEntry> {
|
||||
private int i = -1;
|
||||
private IndexJournalReadEntry next;
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public boolean hasNext() {
|
||||
if (next != null)
|
||||
return true;
|
||||
|
||||
while (++i < fileHeader.fileSize()) {
|
||||
var entry = IndexJournalReadEntry.read(dataInputStream);
|
||||
if (filter(entry)) {
|
||||
next = entry;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public IndexJournalReadEntry next() {
|
||||
if (hasNext()) {
|
||||
var ret = next;
|
||||
next = null;
|
||||
return ret;
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,130 @@
|
||||
package nu.marginalia.index.journal.reader;
|
||||
|
||||
import com.github.luben.zstd.ZstdInputStream;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryData;
|
||||
import nu.marginalia.index.journal.model.IndexJournalFileHeader;
|
||||
import nu.marginalia.index.journal.reader.pointer.IndexJournalPointer;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
||||
public class IndexJournalReaderSingleFile implements IndexJournalReader {
|
||||
|
||||
private Path journalFile;
|
||||
public final IndexJournalFileHeader fileHeader;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IndexJournalReaderSingleCompressedFile{" + journalFile + " }";
|
||||
}
|
||||
|
||||
public IndexJournalReaderSingleFile(Path file) throws IOException {
|
||||
this.journalFile = file;
|
||||
|
||||
fileHeader = readHeader(file);
|
||||
}
|
||||
|
||||
private static IndexJournalFileHeader readHeader(Path file) throws IOException {
|
||||
try (var raf = new RandomAccessFile(file.toFile(), "r")) {
|
||||
long unused = raf.readLong();
|
||||
long wordCount = raf.readLong();
|
||||
|
||||
return new IndexJournalFileHeader(unused, wordCount);
|
||||
}
|
||||
}
|
||||
|
||||
private static DataInputStream createInputStream(Path file) throws IOException {
|
||||
var fileInputStream = Files.newInputStream(file, StandardOpenOption.READ);
|
||||
|
||||
// skip the header
|
||||
fileInputStream.skipNBytes(16);
|
||||
|
||||
return new DataInputStream(new ZstdInputStream(new BufferedInputStream(fileInputStream)));
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public IndexJournalPointer newPointer() {
|
||||
return new SingleFileJournalPointer(fileHeader, createInputStream(journalFile));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class SingleFileJournalPointer implements IndexJournalPointer {
|
||||
|
||||
private final IndexJournalFileHeader fileHeader;
|
||||
private final DataInputStream dataInputStream;
|
||||
private IndexJournalReadEntry entry;
|
||||
private IndexJournalEntryData entryData;
|
||||
private int recordIdx = -2;
|
||||
private int docIdx = -1;
|
||||
|
||||
public SingleFileJournalPointer(
|
||||
IndexJournalFileHeader fileHeader,
|
||||
DataInputStream dataInputStream)
|
||||
{
|
||||
this.fileHeader = fileHeader;
|
||||
this.dataInputStream = dataInputStream;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public boolean nextDocument() {
|
||||
recordIdx = -2;
|
||||
entryData = null;
|
||||
|
||||
if (++docIdx < fileHeader.fileSize()) {
|
||||
entry = IndexJournalReadEntry.read(dataInputStream);
|
||||
return true;
|
||||
}
|
||||
|
||||
dataInputStream.close();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRecord() {
|
||||
if (entryData == null) {
|
||||
entryData = entry.readEntry();
|
||||
}
|
||||
|
||||
recordIdx += 2;
|
||||
if (recordIdx < entryData.size()) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentId() {
|
||||
return entry.docId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentMeta() {
|
||||
return entry.docMeta();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordId() {
|
||||
return entryData.get(recordIdx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordMeta() {
|
||||
return entryData.get(recordIdx + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int documentFeatures() {
|
||||
if (entryData == null) {
|
||||
entryData = entry.readEntry();
|
||||
}
|
||||
|
||||
return entry.header.documentFeatures();
|
||||
}
|
||||
}
|
@ -0,0 +1,167 @@
|
||||
package nu.marginalia.index.journal.reader.pointer;
|
||||
|
||||
import java.util.function.LongPredicate;
|
||||
|
||||
/**
|
||||
* This is something like a double iterator. The Index Journal consists of
|
||||
* blocks of words and word-metadata for each document and document metadata.
|
||||
* <br>
|
||||
*
|
||||
* Perhaps best conceptualized as something like
|
||||
*
|
||||
* <pre>[doc1: word1 word2 word3 word4] [doc2: word1 word2 word3 ]</pre>
|
||||
* nextDocument() will move the pointer from doc1 to doc2;<br>
|
||||
* nextRecord() will move the pointer from word1 to word2...<br>
|
||||
*/
|
||||
public interface IndexJournalPointer {
|
||||
/**
|
||||
* Advance to the next document in the journal,
|
||||
* returning true if such a document exists.
|
||||
* Resets the record index to before the first
|
||||
* record (if it exists).
|
||||
*/
|
||||
boolean nextDocument();
|
||||
|
||||
/**
|
||||
* Advance to the next record in the journal
|
||||
*/
|
||||
boolean nextRecord();
|
||||
|
||||
/**
|
||||
* Get the id associated with the current document
|
||||
*/
|
||||
long documentId();
|
||||
|
||||
/**
|
||||
* Get the metadata associated with the current document
|
||||
*/
|
||||
long documentMeta();
|
||||
|
||||
/**
|
||||
* Get the wordId associated with the current record
|
||||
*/
|
||||
long wordId();
|
||||
|
||||
/**
|
||||
* Get the termMeta associated with the current record
|
||||
*/
|
||||
long wordMeta();
|
||||
|
||||
/**
|
||||
* Get the documentFeatures associated with the current record
|
||||
*/
|
||||
int documentFeatures();
|
||||
|
||||
/** Concatenate a number of journal pointers */
|
||||
static IndexJournalPointer concatenate(IndexJournalPointer... pointers) {
|
||||
if (pointers.length == 1)
|
||||
return pointers[0];
|
||||
|
||||
return new JoiningJournalPointer(pointers);
|
||||
}
|
||||
|
||||
/** Add a filter on word metadata to the pointer */
|
||||
default IndexJournalPointer filterWordMeta(LongPredicate filter) {
|
||||
return new FilteringJournalPointer(this, filter);
|
||||
}
|
||||
}
|
||||
|
||||
class JoiningJournalPointer implements IndexJournalPointer {
|
||||
private final IndexJournalPointer[] pointers;
|
||||
private int pIndex = 0;
|
||||
|
||||
JoiningJournalPointer(IndexJournalPointer[] pointers) {
|
||||
this.pointers = pointers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextDocument() {
|
||||
|
||||
while (pIndex < pointers.length) {
|
||||
if (pointers[pIndex].nextDocument())
|
||||
return true;
|
||||
else pIndex++;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRecord() {
|
||||
return pointers[pIndex].nextRecord();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentId() {
|
||||
return pointers[pIndex].documentId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentMeta() {
|
||||
return pointers[pIndex].documentMeta();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordId() {
|
||||
return pointers[pIndex].wordId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordMeta() {
|
||||
return pointers[pIndex].wordMeta();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int documentFeatures() {
|
||||
return pointers[pIndex].documentFeatures();
|
||||
}
|
||||
}
|
||||
|
||||
class FilteringJournalPointer implements IndexJournalPointer {
|
||||
private final IndexJournalPointer base;
|
||||
private final LongPredicate filter;
|
||||
|
||||
FilteringJournalPointer(IndexJournalPointer base, LongPredicate filter) {
|
||||
this.base = base;
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextDocument() {
|
||||
return base.nextDocument();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRecord() {
|
||||
while (base.nextRecord()) {
|
||||
if (filter.test(wordMeta()))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentId() {
|
||||
return base.documentId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentMeta() {
|
||||
return base.documentMeta();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordId() {
|
||||
return base.wordId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordMeta() {
|
||||
return base.wordMeta();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int documentFeatures() {
|
||||
return base.documentFeatures();
|
||||
}
|
||||
}
|
@ -1,12 +1,10 @@
|
||||
package nu.marginalia.index.journal;
|
||||
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntry;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryData;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReader;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile;
|
||||
import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@ -43,27 +41,13 @@ public class IndexJournalTest {
|
||||
.build());
|
||||
journalWriter.close();
|
||||
|
||||
reader = new IndexJournalReaderSingleCompressedFile(tempFile);
|
||||
reader = new IndexJournalReaderSingleFile(tempFile);
|
||||
}
|
||||
@AfterEach
|
||||
public void tearDown() throws IOException {
|
||||
reader.close();
|
||||
Files.delete(tempFile);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void reiterable() {
|
||||
// Verifies that the reader can be run twice to the same effect
|
||||
|
||||
int cnt = 0;
|
||||
int cnt2 = 0;
|
||||
|
||||
for (var item : reader) cnt++;
|
||||
for (var item : reader) cnt2++;
|
||||
|
||||
assertEquals(cnt2, cnt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void forEachDocId() {
|
||||
List<Long> expected = List.of(firstDocId, secondDocId);
|
||||
@ -82,20 +66,4 @@ public class IndexJournalTest {
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void forEachDocIdRecord() {
|
||||
List<Pair<Long, IndexJournalEntryData.Record>> expected = List.of(
|
||||
Pair.of(firstDocId, new IndexJournalEntryData.Record(1, 2)),
|
||||
Pair.of(firstDocId, new IndexJournalEntryData.Record(2, 3)),
|
||||
Pair.of(firstDocId, new IndexJournalEntryData.Record(3, 4)),
|
||||
Pair.of(firstDocId, new IndexJournalEntryData.Record(5, 6)),
|
||||
Pair.of(secondDocId, new IndexJournalEntryData.Record(5, 5)),
|
||||
Pair.of(secondDocId, new IndexJournalEntryData.Record(6, 6))
|
||||
);
|
||||
List<Pair<Long, IndexJournalEntryData.Record>> actual = new ArrayList<>();
|
||||
|
||||
reader.forEachDocIdRecord((url, word) -> actual.add(Pair.of(url, word)));
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,133 @@
|
||||
package nu.marginalia.index.journal.reader.pointer;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
class IndexJournalPointerTest {
|
||||
|
||||
@Test
|
||||
public void concatenate() {
|
||||
MockPointer left = new MockPointer(
|
||||
List.of(new MockDocument(1, 2, 3, List.of(
|
||||
new MockRecord(4, 5),
|
||||
new MockRecord(6, 7))
|
||||
))
|
||||
);
|
||||
|
||||
MockPointer right = new MockPointer(
|
||||
List.of(new MockDocument(8, 9, 10, List.of(
|
||||
new MockRecord(11, 12),
|
||||
new MockRecord(13, 14))
|
||||
))
|
||||
);
|
||||
|
||||
IndexJournalPointer concatenated = IndexJournalPointer.concatenate(left, right);
|
||||
List<Long> docIdsSeq = new ArrayList<>();
|
||||
List<Long> wordIdsSeq = new ArrayList<>();
|
||||
while (concatenated.nextDocument()) {
|
||||
docIdsSeq.add(concatenated.documentId());
|
||||
while (concatenated.nextRecord()) {
|
||||
wordIdsSeq.add(concatenated.wordId());
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(docIdsSeq, List.of(1L, 8L));
|
||||
assertEquals(wordIdsSeq, List.of(4L, 6L, 11L, 13L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void filter() {
|
||||
MockPointer left = new MockPointer(
|
||||
List.of(new MockDocument(1, 2, 3, List.of(
|
||||
new MockRecord(1, 1),
|
||||
new MockRecord(2, 2),
|
||||
new MockRecord(3, 3),
|
||||
new MockRecord(4, 4),
|
||||
new MockRecord(5, 5)
|
||||
)
|
||||
), new MockDocument(2, 2, 3, List.of(
|
||||
new MockRecord(1, 1),
|
||||
new MockRecord(3, 3),
|
||||
new MockRecord(5, 5)
|
||||
)
|
||||
))
|
||||
|
||||
);
|
||||
var filtered = left.filterWordMeta(meta -> (meta % 2) == 0);
|
||||
|
||||
List<Long> docIdsSeq = new ArrayList<>();
|
||||
List<Long> wordIdsSeq = new ArrayList<>();
|
||||
while (filtered.nextDocument()) {
|
||||
docIdsSeq.add(filtered.documentId());
|
||||
while (filtered.nextRecord()) {
|
||||
wordIdsSeq.add(filtered.wordId());
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(docIdsSeq, List.of(1L, 2L));
|
||||
assertEquals(wordIdsSeq, List.of(2L, 4L));
|
||||
}
|
||||
|
||||
class MockPointer implements IndexJournalPointer {
|
||||
private final List<MockDocument> documents;
|
||||
|
||||
int di = -1;
|
||||
int ri;
|
||||
|
||||
public MockPointer(Collection<MockDocument> documents) {
|
||||
this.documents = new ArrayList<>(documents);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextDocument() {
|
||||
if (++di < documents.size()) {
|
||||
ri = -1;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRecord() {
|
||||
if (++ri < documents.get(di).records.size()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentId() {
|
||||
return documents.get(di).docId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long documentMeta() {
|
||||
return documents.get(di).docMeta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordId() {
|
||||
return documents.get(di).records.get(ri).wordId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long wordMeta() {
|
||||
return documents.get(di).records.get(ri).wordMeta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int documentFeatures() {
|
||||
return documents.get(di).docFeatures;
|
||||
}
|
||||
}
|
||||
|
||||
record MockDocument(long docId, long docMeta, int docFeatures, List<MockRecord> records) {}
|
||||
record MockRecord(long wordId, long wordMeta) {}
|
||||
}
|
@ -2,7 +2,6 @@ package nu.marginalia.index.construction;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.array.LongArray;
|
||||
import nu.marginalia.array.algo.SortingContext;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -70,22 +69,17 @@ public class ReversePreindexDocuments {
|
||||
var offsetMap = segments.asMap(RECORD_SIZE_LONGS);
|
||||
offsetMap.defaultReturnValue(0);
|
||||
|
||||
for (var entry : reader) {
|
||||
long rankEncodedId = docIdRewriter.rewriteDocId(entry.docId());
|
||||
|
||||
var data = entry.readEntry();
|
||||
for (int i = 0; i + 1 < data.size(); i+=2) {
|
||||
long wordId = data.get(i);
|
||||
long meta = data.get(i+1);
|
||||
|
||||
if (!reader.filter(entry, meta)) {
|
||||
continue;
|
||||
}
|
||||
var pointer = reader.newPointer();
|
||||
while (pointer.nextDocument()) {
|
||||
long rankEncodedId = docIdRewriter.rewriteDocId(pointer.documentId());
|
||||
while (pointer.nextRecord()) {
|
||||
long wordId = pointer.wordId();
|
||||
long wordMeta = pointer.wordMeta();
|
||||
|
||||
long offset = offsetMap.addTo(wordId, RECORD_SIZE_LONGS);
|
||||
|
||||
outArray.set(offset + 0, rankEncodedId);
|
||||
outArray.set(offset + 1, meta);
|
||||
outArray.set(offset + 1, wordMeta);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,7 @@ package nu.marginalia.index.construction;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryData;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryHeader;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReader;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile;
|
||||
import nu.marginalia.index.journal.writer.IndexJournalWriterSingleFileImpl;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -68,7 +68,7 @@ public class TestJournalFactory {
|
||||
new IndexJournalEntryData(data));
|
||||
}
|
||||
writer.close();
|
||||
var ret = new IndexJournalReaderSingleCompressedFile(jf);
|
||||
var ret = new IndexJournalReaderSingleFile(jf);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -87,7 +87,7 @@ public class TestJournalFactory {
|
||||
new IndexJournalEntryData(data));
|
||||
}
|
||||
writer.close();
|
||||
var ret = new IndexJournalReaderSingleCompressedFile(jf);
|
||||
var ret = new IndexJournalReaderSingleFile(jf);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import nu.marginalia.index.forward.ForwardIndexFileNames;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReader;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
import nu.marginalia.model.idx.WordFlags;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
import nu.marginalia.mq.inbox.MqInboxResponse;
|
||||
@ -31,6 +32,7 @@ import java.sql.SQLException;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.LongPredicate;
|
||||
|
||||
import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX;
|
||||
|
||||
@ -123,12 +125,29 @@ public class IndexConstructorMain {
|
||||
Path tmpDir = indexStaging.asPath().resolve("tmp");
|
||||
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
|
||||
|
||||
LongPredicate wordMetaFilter = getPriorityIndexWordMetaFilter();
|
||||
|
||||
ReverseIndexConstructor.
|
||||
createReverseIndex(heartbeat,
|
||||
IndexJournalReader::singleFileWithPriorityFilters,
|
||||
(path) -> IndexJournalReader.filteringSingleFile(path, wordMetaFilter),
|
||||
indexStaging.asPath(), this::addRank, tmpDir, outputFileDocs, outputFileWords);
|
||||
}
|
||||
|
||||
private static LongPredicate getPriorityIndexWordMetaFilter() {
|
||||
|
||||
long highPriorityFlags =
|
||||
WordFlags.Title.asBit()
|
||||
| WordFlags.Subjects.asBit()
|
||||
| WordFlags.TfIdfHigh.asBit()
|
||||
| WordFlags.NamesWords.asBit()
|
||||
| WordFlags.UrlDomain.asBit()
|
||||
| WordFlags.UrlPath.asBit()
|
||||
| WordFlags.Site.asBit()
|
||||
| WordFlags.SiteAdjacent.asBit();
|
||||
|
||||
return r -> (r & highPriorityFlags) != 0;
|
||||
}
|
||||
|
||||
private void createForwardIndex() throws SQLException, IOException {
|
||||
|
||||
FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE);
|
||||
|
@ -3,14 +3,13 @@ package nu.marginalia.loading.loader;
|
||||
import nu.marginalia.db.storage.FileStorageService;
|
||||
import nu.marginalia.db.storage.model.FileStorage;
|
||||
import nu.marginalia.db.storage.model.FileStorageType;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
|
||||
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile;
|
||||
import nu.marginalia.keyword.model.DocumentKeywords;
|
||||
import nu.marginalia.model.idx.DocumentMetadata;
|
||||
import nu.marginallia.index.journal.IndexJournalFileNames;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -18,7 +17,6 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
@ -64,19 +62,18 @@ class LoaderIndexJournalWriterTest {
|
||||
List<Path> journalFiles =IndexJournalFileNames.findJournalFiles(tempDir);
|
||||
assertEquals(1, journalFiles.size());
|
||||
|
||||
var reader = new IndexJournalReaderSingleCompressedFile(journalFiles.get(0));
|
||||
var reader = new IndexJournalReaderSingleFile(journalFiles.get(0));
|
||||
List<Long> docIds = new ArrayList<>();
|
||||
reader.forEachDocId(docIds::add);
|
||||
assertEquals(List.of(1L, 1L), docIds);
|
||||
|
||||
List<Long> metas = new ArrayList<Long>();
|
||||
reader.forEach(r -> {
|
||||
var entry = r.readEntry();
|
||||
for (int i = 0; i + 1 < entry.size(); i+=2) {
|
||||
entry.get(i);
|
||||
metas.add(entry.get(i+1));
|
||||
var ptr = reader.newPointer();
|
||||
while (ptr.nextDocument()) {
|
||||
while (ptr.nextRecord()) {
|
||||
metas.add(ptr.wordMeta());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assertEquals(LongStream.of(metadata).boxed().toList(), metas);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user