mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 21:18:58 +00:00
(journal/reverse index) Working WIP fix over-allocation of documents
This commit is contained in:
parent
f74b9df0a7
commit
88ac72c8eb
@ -35,7 +35,7 @@ public interface IndexJournalReader extends Iterable<IndexJournalReadEntry> {
|
||||
| WordFlags.SiteAdjacent.asBit();
|
||||
|
||||
return new IndexJournalReaderSingleCompressedFile(path, null,
|
||||
r -> (r.metadata() & highPriorityFlags) != 0);
|
||||
r -> (r & highPriorityFlags) != 0);
|
||||
}
|
||||
|
||||
void forEachWordId(LongConsumer consumer);
|
||||
@ -48,6 +48,12 @@ public interface IndexJournalReader extends Iterable<IndexJournalReadEntry> {
|
||||
@Override
|
||||
Iterator<IndexJournalReadEntry> iterator();
|
||||
|
||||
boolean filter(IndexJournalReadEntry entry);
|
||||
|
||||
boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record);
|
||||
|
||||
boolean filter(IndexJournalReadEntry entry, long metadata);
|
||||
|
||||
void close() throws IOException;
|
||||
|
||||
|
||||
|
@ -52,6 +52,21 @@ public class IndexJournalReaderPagingImpl implements IndexJournalReader {
|
||||
return Iterators.concat(readers.stream().map(IndexJournalReader::iterator).iterator());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry) {
|
||||
return readers.get(0).filter(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record) {
|
||||
return readers.get(0).filter(entry, record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, long metadata) {
|
||||
return readers.get(0).filter(entry, metadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
for (var reader : readers) {
|
||||
|
@ -4,17 +4,13 @@ import com.github.luben.zstd.ZstdInputStream;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.index.journal.model.IndexJournalEntryData;
|
||||
import nu.marginalia.index.journal.model.IndexJournalFileHeader;
|
||||
import nu.marginalia.index.journal.model.IndexJournalStatistics;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.roaringbitmap.longlong.Roaring64Bitmap;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
@ -31,23 +27,23 @@ public class IndexJournalReaderSingleCompressedFile implements IndexJournalReade
|
||||
private DataInputStream dataInputStream = null;
|
||||
|
||||
final Predicate<IndexJournalReadEntry> entryPredicate;
|
||||
final Predicate<IndexJournalEntryData.Record> recordPredicate;
|
||||
final Predicate<Long> metadataPredicate;
|
||||
|
||||
public IndexJournalReaderSingleCompressedFile(Path file) throws IOException {
|
||||
this.journalFile = file;
|
||||
|
||||
fileHeader = readHeader(file);
|
||||
|
||||
this.recordPredicate = null;
|
||||
this.metadataPredicate = null;
|
||||
this.entryPredicate = null;
|
||||
}
|
||||
|
||||
public IndexJournalReaderSingleCompressedFile(Path file, Predicate<IndexJournalReadEntry> entryPredicate, Predicate<IndexJournalEntryData.Record> recordPredicate) throws IOException {
|
||||
public IndexJournalReaderSingleCompressedFile(Path file, Predicate<IndexJournalReadEntry> entryPredicate, Predicate<Long> metadataPredicate) throws IOException {
|
||||
this.journalFile = file;
|
||||
|
||||
fileHeader = readHeader(file);
|
||||
|
||||
this.recordPredicate = recordPredicate;
|
||||
this.metadataPredicate = metadataPredicate;
|
||||
this.entryPredicate = entryPredicate;
|
||||
}
|
||||
|
||||
@ -69,13 +65,21 @@ public class IndexJournalReaderSingleCompressedFile implements IndexJournalReade
|
||||
return new DataInputStream(new ZstdInputStream(new BufferedInputStream(fileInputStream)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry) {
|
||||
return entryPredicate == null || entryPredicate.test(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, IndexJournalEntryData.Record record) {
|
||||
return (entryPredicate == null || entryPredicate.test(entry))
|
||||
&& (recordPredicate == null || recordPredicate.test(record));
|
||||
&& (metadataPredicate == null || metadataPredicate.test(record.metadata()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filter(IndexJournalReadEntry entry, long metadata) {
|
||||
return (entryPredicate == null || entryPredicate.test(entry))
|
||||
&& (metadataPredicate == null || metadataPredicate.test(metadata));
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
@ -129,19 +133,35 @@ public class IndexJournalReaderSingleCompressedFile implements IndexJournalReade
|
||||
}
|
||||
|
||||
private class JournalEntryIterator implements Iterator<IndexJournalReadEntry> {
|
||||
private int i = 0;
|
||||
private int i = -1;
|
||||
private IndexJournalReadEntry next;
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public boolean hasNext() {
|
||||
return i < fileHeader.fileSize();
|
||||
if (next != null)
|
||||
return true;
|
||||
|
||||
while (++i < fileHeader.fileSize()) {
|
||||
var entry = IndexJournalReadEntry.read(dataInputStream);
|
||||
if (filter(entry)) {
|
||||
next = entry;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public IndexJournalReadEntry next() {
|
||||
i++;
|
||||
return IndexJournalReadEntry.read(dataInputStream);
|
||||
if (hasNext()) {
|
||||
var ret = next;
|
||||
next = null;
|
||||
return ret;
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ public class ReversePreindex {
|
||||
|
||||
Path docsFile = Files.createTempFile(destDir, "docs", ".dat");
|
||||
|
||||
LongArray mergedDocuments = LongArray.mmapForWriting(docsFile, 8 * (left.documents.size() + right.documents.size()));
|
||||
LongArray mergedDocuments = LongArray.mmapForWriting(docsFile, 2 * (left.documents.size() + right.documents.size()));
|
||||
|
||||
leftIter.next();
|
||||
rightIter.next();
|
||||
|
@ -40,7 +40,7 @@ public class ReversePreindexDocuments {
|
||||
logger.info("Transferring data");
|
||||
createUnsortedDocsFile(docsFile, reader, segments, docIdRewriter);
|
||||
|
||||
LongArray docsFileMap = LongArray.mmapForWriting(docsFile, 8 * Files.size(docsFile));
|
||||
LongArray docsFileMap = LongArray.mmapForModifying(docsFile);
|
||||
logger.info("Sorting data");
|
||||
sortDocsFile(docsFileMap, segments);
|
||||
|
||||
@ -64,7 +64,7 @@ public class ReversePreindexDocuments {
|
||||
IndexJournalReader reader,
|
||||
ReversePreindexWordSegments segments,
|
||||
DocIdRewriter docIdRewriter) throws IOException {
|
||||
long fileSize = 8 * segments.totalSize();
|
||||
long fileSize = RECORD_SIZE_LONGS * segments.totalSize();
|
||||
LongArray outArray = LongArray.mmapForWriting(docsFile, fileSize);
|
||||
|
||||
var offsetMap = segments.asMap(RECORD_SIZE_LONGS);
|
||||
@ -78,6 +78,10 @@ public class ReversePreindexDocuments {
|
||||
long wordId = data.get(i);
|
||||
long meta = data.get(i+1);
|
||||
|
||||
if (!reader.filter(entry, meta)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
long offset = offsetMap.addTo(wordId, RECORD_SIZE_LONGS);
|
||||
|
||||
outArray.set(offset + 0, rankEncodedId);
|
||||
|
@ -67,8 +67,7 @@ public class ReversePreindexWordSegments {
|
||||
int i = 0;
|
||||
LongIterator iter = countsMap.keySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
words.set(i, iter.nextLong());
|
||||
i++;
|
||||
words.set(i++, iter.nextLong());
|
||||
}
|
||||
|
||||
// Sort the words file
|
||||
|
Loading…
Reference in New Issue
Block a user