Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git, synced 2025-02-23 13:09:00 +00:00
(index-reverse) Parallel construction of the reverse indexes. (#52)
* (index-reverse) Parallel construction of the reverse indexes.
* (array) Remove wasteful calculation of numDistinct before merging two sorted arrays.
* (index-reverse) Force changes to disk on close, reduce logging.
* (index-reverse) Clean up merging process and add back logging.
* (run) Add a conservative default for INDEX_CONSTRUCTION_PROCESS_OPTS's parallelism, as it eats a lot of RAM.
* (index-reverse) Better logging during processing.
* (array) 2GB+ compatible write() function.
* (array) 2GB+ compatible write() function.
* (index-reverse) We are logging like Bolsonaro and I will not have it.
* (reverse-index) Self-diagnostics.
* (btree) Fix bug in btree reader to do with large data sizes.
This commit is contained in:
parent
e498c6907a
commit
8e1abc3f10
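The heart of the change sits in ReverseIndexConstructor below: each index journal file is turned into a preindex independently, and the preindexes are then merged pairwise until one remains, which is finally written out as the docs and words files. Condensed to its shape, and omitting the heartbeat/progress plumbing the real method carries, the new pipeline looks roughly like the sketch below (construct, merge and finalizeIndex are the @SneakyThrows helpers from the diff):

    public void createReverseIndex(Path sourceBaseDir) throws IOException {
        List<Path> inputs = IndexJournalFileNames.findJournalFiles(sourceBaseDir);
        if (inputs.isEmpty())
            return;

        inputs.parallelStream()
              .map(this::construct)            // one preindex per journal file, built concurrently
              .reduce(this::merge)             // pairwise merges until a single preindex remains
              .ifPresent(this::finalizeIndex); // written out as the final docs and words files
    }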
@@ -54,13 +54,14 @@ public class ForwardIndexReader {
    private static TLongIntHashMap loadIds(Path idsFile) throws IOException {
        try (var idsArray = LongArrayFactory.mmapForReadingShared(idsFile)) {
            assert idsArray.size() < Integer.MAX_VALUE;

            var ids = new TLongIntHashMap((int) idsArray.size(), 0.5f, -1, -1);
            // This hash table should be of the same size as the number of documents, so typically less than 1 Gb
            idsArray.forEach(0, idsArray.size(), (pos, val) -> ids.put(val, (int) pos));

            return ids;
        }
    }

    private static LongArray loadData(Path dataFile) throws IOException {

@@ -93,6 +94,13 @@ public class ForwardIndexReader {
    private int idxForDoc(long docId) {
        assert UrlIdCodec.getRank(docId) == 0 : "Forward Index Reader fed dirty reverse index id";

        if (getClass().desiredAssertionStatus()) {
            long offset = idToOffset.get(docId);
            if (offset < 0) { // Ideally we'd always check this, but this is a very hot method
                logger.warn("Could not find offset for doc {}", docId);
            }
        }

        return idToOffset.get(docId);
    }
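Both this class and ReverseIndexReader below gate their extra diagnostics on getClass().desiredAssertionStatus(), which is true exactly when assertions are enabled for the class, i.e. the -ea flag that the process options in run/env/service.env pass. A tiny standalone illustration of the switch (the class name is hypothetical):

    // Prints "diagnostics on" only when run with: java -ea AssertionGateDemo
    public class AssertionGateDemo {
        public static void main(String[] args) {
            if (AssertionGateDemo.class.desiredAssertionStatus()) {
                System.out.println("diagnostics on");
            } else {
                System.out.println("diagnostics off");
            }
        }
    }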
@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;

public class ReverseIndexReader {
    private final LongArray words;

@@ -40,6 +41,73 @@ public class ReverseIndexReader {
        wordsBTreeReader = new BTreeReader(this.words, ReverseIndexParameters.wordsBTreeContext, 0);
        wordsDataOffset = wordsBTreeReader.getHeader().dataOffsetLongs();

        if (getClass().desiredAssertionStatus()) {
            Executors.newSingleThreadExecutor().execute(this::selfTest);
        }
    }

    private void selfTest() {
        logger.info("Running self test program");

        long wordsDataSize = wordsBTreeReader.getHeader().numEntries() * 2L;

        var wordsDataRange = words.range(wordsDataOffset, wordsDataOffset + wordsDataSize);
        if (!wordsDataRange.isSortedN(2, 0, wordsDataSize))
            logger.error("Failed test 1: Words data is not sorted");
        else
            logger.info("Passed test 1");

        boolean failed2 = false;
        for (long i = 1; i < wordsDataRange.size(); i+=2) {
            var docsBTreeReader = new BTreeReader(this.documents, ReverseIndexParameters.docsBTreeContext, wordsDataRange.get(i));
            var header = docsBTreeReader.getHeader();
            var docRange = documents.range(header.dataOffsetLongs(), header.dataOffsetLongs() + header.numEntries() * 2L);
            if (!docRange.isSortedN(2, 0, header.numEntries() * 2L)) {
                logger.error("Failed test 2: numEntries={}, offset={}", header.numEntries(), header.dataOffsetLongs());
                failed2 = true;
                break;
            }
        }
        if (!failed2)
            logger.info("Passed test 2");

        boolean failed3 = false;
        for (long i = 0; i < wordsDataRange.size(); i+=2) {
            if (wordOffset(wordsDataRange.get(i)) < 0) {
                failed3 = true;

                logger.error("Failed test 3");
                if (wordsBTreeReader.findEntry(wordsDataRange.get(i)) < 0) {
                    logger.error("Scenario A");
                }
                else {
                    logger.error("Scenario B");
                }

                break;
            }
        }
        if (!failed3) {
            logger.info("Passed test 3");
        }

        boolean failed4 = false;
        outer:
        for (long i = 1; i < wordsDataRange.size(); i+=2) {
            var docsBTreeReader = new BTreeReader(this.documents, ReverseIndexParameters.docsBTreeContext, wordsDataRange.get(i));
            var header = docsBTreeReader.getHeader();
            var docRange = documents.range(header.dataOffsetLongs(), header.dataOffsetLongs() + header.numEntries() * 2L);
            for (int j = 0; j < docRange.size(); j+=2) {
                if (docsBTreeReader.findEntry(docRange.get(j)) < 0) {
                    failed4 = true;
                    logger.info("Failed test 4");
                    break outer;
                }
            }
        }
        if (!failed4) {
            logger.info("Passed test 4");
        }
    }

@@ -98,6 +166,7 @@ public class ReverseIndexReader {
        long offset = wordOffset(wordId);

        if (offset < 0) {
            logger.warn("Missing offset for word {}", wordId);
            return new long[docIds.length];
        }
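All four checks above lean on the same data layout: both the word index and each document list are flat arrays of (key, value) pairs, so keys sit at even positions, payloads at odd positions, sortedness is checked with a step size of 2, and entry counts are always size/2. A small illustration of what isSortedN(2, ...) accepts (the values are made up):

    // (key, value) pairs laid out flat; only the even positions (keys) must be non-decreasing.
    long[] pairs = {
            10, 900,    // key 10 -> payload 900
            12, 901,    // key 12 -> payload 901
            15, 902     // key 15 -> payload 902
    };
    // isSortedN(2, ...) over this range is true: 10 <= 12 <= 15; the payloads are ignored.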
@ -1,6 +1,6 @@
|
||||
package nu.marginalia.index.construction;
|
||||
|
||||
import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.process.control.ProcessHeartbeat;
|
||||
import nu.marginalia.index.journal.IndexJournalFileNames;
|
||||
import org.slf4j.Logger;
|
||||
@ -8,28 +8,37 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class ReverseIndexConstructor {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ReverseIndexConstructor.class);
|
||||
|
||||
public enum CreateReverseIndexSteps {
|
||||
CREATE_PREINDEXES,
|
||||
MERGE_PREINDEXES,
|
||||
CONSTRUCT,
|
||||
FINALIZE,
|
||||
FINISHED
|
||||
}
|
||||
public static void createReverseIndex(
|
||||
ProcessHeartbeat processHeartbeat,
|
||||
JournalReaderSource readerSource,
|
||||
Path sourceBaseDir,
|
||||
DocIdRewriter docIdRewriter,
|
||||
Path tmpDir,
|
||||
Path outputFileDocs,
|
||||
Path outputFileWords) throws IOException
|
||||
|
||||
private final Path outputFileDocs;
|
||||
private final Path outputFileWords;
|
||||
private final JournalReaderSource readerSource;
|
||||
private final DocIdRewriter docIdRewriter;
|
||||
private final Path tmpDir;
|
||||
|
||||
public ReverseIndexConstructor(Path outputFileDocs,
|
||||
Path outputFileWords,
|
||||
JournalReaderSource readerSource,
|
||||
DocIdRewriter docIdRewriter,
|
||||
Path tmpDir) {
|
||||
this.outputFileDocs = outputFileDocs;
|
||||
this.outputFileWords = outputFileWords;
|
||||
this.readerSource = readerSource;
|
||||
this.docIdRewriter = docIdRewriter;
|
||||
this.tmpDir = tmpDir;
|
||||
}
|
||||
|
||||
public void createReverseIndex(ProcessHeartbeat processHeartbeat, Path sourceBaseDir) throws IOException
|
||||
{
|
||||
var inputs = IndexJournalFileNames.findJournalFiles(sourceBaseDir);
|
||||
if (inputs.isEmpty()) {
|
||||
@ -38,91 +47,59 @@ public class ReverseIndexConstructor {
|
||||
}
|
||||
|
||||
try (var heartbeat = processHeartbeat.createProcessTaskHeartbeat(CreateReverseIndexSteps.class, "createReverseIndex")) {
|
||||
List<ReversePreindexReference> preindexes = new ArrayList<>();
|
||||
|
||||
heartbeat.progress(CreateReverseIndexSteps.CREATE_PREINDEXES);
|
||||
heartbeat.progress(CreateReverseIndexSteps.CONSTRUCT);
|
||||
|
||||
try (var preindexHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("constructPreindexes")) {
|
||||
for (int i = 0; i < inputs.size(); i++) {
|
||||
var input = inputs.get(i);
|
||||
|
||||
preindexHeartbeat.progress(input.toFile().getName(), i, inputs.size());
|
||||
|
||||
preindexes.add(
|
||||
ReversePreindex
|
||||
.constructPreindex(readerSource.construct(input), docIdRewriter, tmpDir)
|
||||
.closeToReference()
|
||||
);
|
||||
}
|
||||
|
||||
preindexHeartbeat.progress("FINISHED", inputs.size(), inputs.size());
|
||||
AtomicInteger progress = new AtomicInteger(0);
|
||||
inputs
|
||||
.parallelStream()
|
||||
.map(in -> {
|
||||
preindexHeartbeat.progress("PREINDEX/MERGE", progress.incrementAndGet(), inputs.size());
|
||||
return construct(in);
|
||||
})
|
||||
.reduce(this::merge)
|
||||
.ifPresent((index) -> {
|
||||
heartbeat.progress(CreateReverseIndexSteps.FINALIZE);
|
||||
finalizeIndex(index);
|
||||
heartbeat.progress(CreateReverseIndexSteps.FINISHED);
|
||||
});
|
||||
}
|
||||
|
||||
heartbeat.progress(CreateReverseIndexSteps.MERGE_PREINDEXES);
|
||||
ReversePreindex finalPreindex = null;
|
||||
|
||||
try (var mergeHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("mergePreindexes")) {
|
||||
finalPreindex = mergePreindexes(tmpDir, mergeHeartbeat, preindexes)
|
||||
.open();
|
||||
|
||||
heartbeat.progress(CreateReverseIndexSteps.FINALIZE);
|
||||
finalPreindex.finalizeIndex(outputFileDocs, outputFileWords);
|
||||
}
|
||||
finally {
|
||||
if (null != finalPreindex)
|
||||
finalPreindex.delete();
|
||||
}
|
||||
|
||||
heartbeat.progress(CreateReverseIndexSteps.FINISHED);
|
||||
}
|
||||
}
|
||||
|
||||
private static ReversePreindexReference mergePreindexes(Path workDir,
|
||||
ProcessAdHocTaskHeartbeat mergeHeartbeat,
|
||||
List<ReversePreindexReference> preindexes) throws IOException {
|
||||
assert !preindexes.isEmpty();
|
||||
|
||||
if (preindexes.size() == 1) {
|
||||
logger.info("Single preindex, no merge necessary");
|
||||
return preindexes.get(0);
|
||||
}
|
||||
|
||||
LinkedList<ReversePreindexReference> toMerge = new LinkedList<>(preindexes);
|
||||
List<ReversePreindexReference> mergedItems = new ArrayList<>(preindexes.size() / 2);
|
||||
|
||||
int pass = 0;
|
||||
while (toMerge.size() > 1) {
|
||||
String stage = String.format("PASS[%d]: %d -> %d", ++pass, toMerge.size(), toMerge.size()/2 + (toMerge.size() % 2));
|
||||
|
||||
int totalToMergeCount = toMerge.size()/2;
|
||||
int toMergeProgress = 0;
|
||||
|
||||
while (toMerge.size() >= 2) {
|
||||
mergeHeartbeat.progress(stage, toMergeProgress++, totalToMergeCount);
|
||||
|
||||
var left = toMerge.removeFirst().open();
|
||||
var right = toMerge.removeFirst().open();
|
||||
|
||||
mergedItems.add(
|
||||
ReversePreindex
|
||||
.merge(workDir, left, right)
|
||||
.closeToReference()
|
||||
);
|
||||
|
||||
left.delete();
|
||||
right.delete();
|
||||
}
|
||||
|
||||
// Pour the merged items back in the toMerge queue
|
||||
// (note, toMerge may still have a single item in it,
|
||||
// in the case where it had an odd population)
|
||||
toMerge.addAll(mergedItems);
|
||||
mergedItems.clear();
|
||||
}
|
||||
|
||||
mergeHeartbeat.progress("FINISHED", 1, 1);
|
||||
|
||||
return toMerge.getFirst();
|
||||
@SneakyThrows
|
||||
private ReversePreindexReference construct(Path input) {
|
||||
return ReversePreindex
|
||||
.constructPreindex(readerSource.construct(input), docIdRewriter, tmpDir)
|
||||
.closeToReference();
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private ReversePreindexReference merge(ReversePreindexReference leftR, ReversePreindexReference rightR) {
|
||||
|
||||
var left = leftR.open();
|
||||
var right = rightR.open();
|
||||
|
||||
try {
|
||||
return ReversePreindex.merge(tmpDir, left, right).closeToReference();
|
||||
}
|
||||
finally {
|
||||
left.delete();
|
||||
right.delete();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private void finalizeIndex(ReversePreindexReference finalPR) {
|
||||
var finalP = finalPR.open();
|
||||
finalP.finalizeIndex(outputFileDocs, outputFileWords);
|
||||
finalP.delete();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
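Taken together, this hunk turns ReverseIndexConstructor into an instance class that owns the output paths, the journal reader source, the doc-id rewriter and the temp directory, with a single createReverseIndex(heartbeat, sourceDir) entry point. The callers updated further down in this commit use it like the sketch below (the paths, staging directory and heartbeat instance are placeholders):

    Path outputFileDocs  = Path.of("/index/rev-index-docs.dat");   // placeholder path
    Path outputFileWords = Path.of("/index/rev-index-words.dat");  // placeholder path
    Path tmpDir          = Path.of("/index/tmp");                  // placeholder path

    new ReverseIndexConstructor(outputFileDocs, outputFileWords,
            IndexJournalReader::singleFile,
            DocIdRewriter.identity(),
            tmpDir)
        .createReverseIndex(new FakeProcessHeartbeat(), indexStagingDir);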
@ -47,11 +47,8 @@ public class ReversePreindex {
|
||||
Path segmentCountsFile = Files.createTempFile(destDir, "segment_counts", ".dat");
|
||||
Path docsFile = Files.createTempFile(destDir, "docs", ".dat");
|
||||
|
||||
logger.info("Segmenting");
|
||||
var segments = ReversePreindexWordSegments.construct(reader, segmentWordsFile, segmentCountsFile);
|
||||
logger.info("Mapping docs");
|
||||
var docs = ReversePreindexDocuments.construct(docsFile, reader, docIdRewriter, segments);
|
||||
logger.info("Done");
|
||||
return new ReversePreindex(segments, docs);
|
||||
}
|
||||
|
||||
@ -64,6 +61,8 @@ public class ReversePreindex {
|
||||
return new ReversePreindexReference(segments, documents);
|
||||
}
|
||||
finally {
|
||||
segments.force();
|
||||
documents.force();
|
||||
segments.close();
|
||||
documents.close();
|
||||
}
|
||||
@ -94,6 +93,9 @@ public class ReversePreindex {
|
||||
LongArray wordIds = segments.wordIds;
|
||||
|
||||
assert offsets.size() == wordIds.size() : "Offsets and word-ids of different size";
|
||||
if (offsets.size() > Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException("offsets.size() too big!");
|
||||
}
|
||||
|
||||
// Estimate the size of the words index data
|
||||
long wordsSize = ReverseIndexParameters.wordsBTreeContext.calculateSize((int) offsets.size());
|
||||
@ -214,7 +216,7 @@ public class ReversePreindex {
|
||||
LongArray wordIdsFile = LongArrayFactory.mmapForWritingConfined(segmentWordsFile, segmentsSize);
|
||||
|
||||
mergeArrays(wordIdsFile, left.wordIds, right.wordIds,
|
||||
0, wordIdsFile.size(),
|
||||
0,
|
||||
0, left.wordIds.size(),
|
||||
0, right.wordIds.size());
|
||||
|
||||
@ -256,20 +258,14 @@ public class ReversePreindex {
|
||||
LongArray dest,
|
||||
ReversePreindexWordSegments.SegmentConstructionIterator destIter)
|
||||
{
|
||||
long distinct = countDistinctElementsN(2,
|
||||
left.documents,
|
||||
right.documents,
|
||||
leftIter.startOffset, leftIter.endOffset,
|
||||
rightIter.startOffset, rightIter.endOffset);
|
||||
|
||||
mergeArrays2(dest,
|
||||
long segSize = mergeArrays2(dest,
|
||||
left.documents,
|
||||
right.documents,
|
||||
destIter.startOffset,
|
||||
destIter.startOffset + 2*distinct,
|
||||
leftIter.startOffset, leftIter.endOffset,
|
||||
rightIter.startOffset, rightIter.endOffset);
|
||||
|
||||
long distinct = segSize / 2;
|
||||
destIter.putNext(distinct);
|
||||
leftIter.next();
|
||||
rightIter.next();
|
||||
|
@ -36,12 +36,9 @@ public class ReversePreindexDocuments {
|
||||
DocIdRewriter docIdRewriter,
|
||||
ReversePreindexWordSegments segments) throws IOException {
|
||||
|
||||
|
||||
logger.info("Transferring data");
|
||||
createUnsortedDocsFile(docsFile, reader, segments, docIdRewriter);
|
||||
|
||||
LongArray docsFileMap = LongArrayFactory.mmapForModifyingShared(docsFile);
|
||||
logger.info("Sorting data");
|
||||
sortDocsFile(docsFileMap, segments);
|
||||
|
||||
return new ReversePreindexDocuments(docsFileMap, docsFile);
|
||||
@ -110,8 +107,6 @@ public class ReversePreindexDocuments {
|
||||
}
|
||||
|
||||
sortingWorkers.shutdown();
|
||||
logger.info("Awaiting shutdown");
|
||||
|
||||
while (!sortingWorkers.awaitTermination(1, TimeUnit.HOURS));
|
||||
|
||||
sortingWorkers.close();
|
||||
@ -125,4 +120,8 @@ public class ReversePreindexDocuments {
|
||||
public void close() {
|
||||
documents.close();
|
||||
}
|
||||
|
||||
public void force() {
|
||||
documents.force();
|
||||
}
|
||||
}
|
||||
|
@ -6,57 +6,18 @@ import nu.marginalia.array.algo.LongArraySearch;
|
||||
import nu.marginalia.array.algo.LongArraySort;
|
||||
import nu.marginalia.array.algo.LongArrayTransformations;
|
||||
import nu.marginalia.array.delegate.ShiftedLongArray;
|
||||
import nu.marginalia.array.page.LongArrayPage;
|
||||
import nu.marginalia.array.page.PagingLongArray;
|
||||
import nu.marginalia.array.scheme.ArrayPartitioningScheme;
|
||||
import nu.marginalia.array.page.SegmentLongArray;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.lang.foreign.Arena;
|
||||
|
||||
|
||||
public interface LongArray extends LongArrayBase, LongArrayTransformations, LongArraySearch, LongArraySort, AutoCloseable {
|
||||
int WORD_SIZE = 8;
|
||||
|
||||
ArrayPartitioningScheme DEFAULT_PARTITIONING_SCHEME
|
||||
= ArrayPartitioningScheme.forPartitionSize(Integer.getInteger("wmsa.page-size",1<<30) / WORD_SIZE);
|
||||
|
||||
int MAX_CONTINUOUS_SIZE = Integer.MAX_VALUE/WORD_SIZE - 8;
|
||||
|
||||
@Deprecated
|
||||
static LongArray allocate(long size) {
|
||||
if (size < MAX_CONTINUOUS_SIZE) {
|
||||
return LongArrayPage.onHeap((int) size);
|
||||
}
|
||||
|
||||
return PagingLongArray.newOnHeap(DEFAULT_PARTITIONING_SCHEME, size);
|
||||
}
|
||||
|
||||
static LongArray mmapRead(Path path) throws IOException {
|
||||
long sizeBytes = Files.size(path);
|
||||
|
||||
if (sizeBytes < MAX_CONTINUOUS_SIZE) {
|
||||
return LongArrayPage.fromMmapReadOnly(path, 0, (int) sizeBytes / 8);
|
||||
}
|
||||
|
||||
return PagingLongArray.mapFileReadOnly(DEFAULT_PARTITIONING_SCHEME, path);
|
||||
}
|
||||
|
||||
/** Map an existing file for writing */
|
||||
static LongArray mmapForModifying(Path path) throws IOException {
|
||||
long sizeBytes = Files.size(path);
|
||||
assert sizeBytes % WORD_SIZE == 0;
|
||||
|
||||
long size = sizeBytes / WORD_SIZE;
|
||||
|
||||
return mmapForWriting(path, size);
|
||||
}
|
||||
|
||||
static LongArray mmapForWriting(Path path, long size) throws IOException {
|
||||
if (size < MAX_CONTINUOUS_SIZE) {
|
||||
return LongArrayPage.fromMmapReadWrite(path, 0, (int) size);
|
||||
}
|
||||
|
||||
return PagingLongArray.mapFileReadWrite(DEFAULT_PARTITIONING_SCHEME, path, size);
|
||||
return SegmentLongArray.onHeap(Arena.ofShared(), size);
|
||||
}
|
||||
|
||||
default LongArray shifted(long offset) {
|
||||
|
@ -12,106 +12,13 @@ public class TwoArrayOperations {
|
||||
/**
|
||||
* Merge two sorted arrays into a third array, removing duplicates.
|
||||
*/
|
||||
public static long mergeArrays(LongArray out, LongArray a, LongArray b, long outStart, long outEnd, long aStart, long aEnd, long bStart, long bEnd) {
|
||||
|
||||
public static long mergeArrays(LongArray out, LongArray a, LongArray b, long outStart, long aStart, long aEnd, long bStart, long bEnd) {
|
||||
// Ensure that the arrays are sorted and that the output array is large enough
|
||||
if (TwoArrayOperations.class.desiredAssertionStatus()) {
|
||||
assert (a.isSorted(aStart, aEnd));
|
||||
assert (b.isSorted(bStart, bEnd));
|
||||
assert ((outEnd - outStart) >= countDistinctElements(a, b, aStart, aEnd, bStart, bEnd));
|
||||
}
|
||||
|
||||
// Try to get direct access to the arrays if possible, this is an order of magnitude faster
|
||||
var directRangeA = a.directRangeIfPossible(aStart, aEnd);
|
||||
var directRangeB = b.directRangeIfPossible(bStart, bEnd);
|
||||
var directRangeOut = out.directRangeIfPossible(outStart, outEnd);
|
||||
|
||||
return mergeArraysDirect(directRangeOut.array(), directRangeA.array(), directRangeB.array(),
|
||||
directRangeOut.start(), directRangeA.start(), directRangeA.end(), directRangeB.start(), directRangeB.end());
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge two sorted arrays into a third array, removing duplicates.
|
||||
* <p>
|
||||
* The operation is performed with a step size of 2. For each pair of values,
|
||||
* only the first is considered to signify a key. The second value is retained along
|
||||
* with the first. In the case of a duplicate, the value associated with array 'a'
|
||||
* is retained, the other is discarded.
|
||||
*
|
||||
*/
|
||||
public static void mergeArrays2(LongArray out, LongArray a, LongArray b,
|
||||
long outStart, long outEnd,
|
||||
long aStart, long aEnd,
|
||||
long bStart, long bEnd)
|
||||
{
|
||||
// Ensure that the arrays are sorted and that the output array is large enough
|
||||
if (TwoArrayOperations.class.desiredAssertionStatus()) {
|
||||
assert (a.isSortedN(2, aStart, aEnd));
|
||||
assert (b.isSortedN(2, bStart, bEnd));
|
||||
assert ((outEnd - outStart) == 2 * countDistinctElementsN(2, a, b, aStart, aEnd, bStart, bEnd));
|
||||
}
|
||||
|
||||
// Try to get direct access to the arrays if possible, this is an order of magnitude faster
|
||||
var directRangeA = a.directRangeIfPossible(aStart, aEnd);
|
||||
var directRangeB = b.directRangeIfPossible(bStart, bEnd);
|
||||
var directRangeOut = out.directRangeIfPossible(outStart, outEnd);
|
||||
|
||||
mergeArraysDirect2(directRangeOut.array(), directRangeA.array(), directRangeB.array(),
|
||||
directRangeOut.start(),
|
||||
directRangeA.start(), directRangeA.end(),
|
||||
directRangeB.start(), directRangeB.end());
|
||||
}
|
||||
|
||||
/** For each value in the source array, merge it with the corresponding value in the destination array.
|
||||
*
|
||||
*/
|
||||
public static void mergeArrayValues(LongArray dest, LongArray source, LongBinaryOperator mergeFunction, long destStart, long destEnd, long sourceStart, long sourceEnd) {
|
||||
|
||||
if (TwoArrayOperations.class.desiredAssertionStatus()) {
|
||||
assert (dest.isSortedN(2, destStart, destEnd));
|
||||
assert (source.isSortedN(2, sourceStart, sourceEnd));
|
||||
}
|
||||
|
||||
// Try to get direct access to the arrays if possible, this is an order of magnitude faster
|
||||
var destRange = dest.directRangeIfPossible(destStart, destEnd);
|
||||
var sourceRange = source.directRangeIfPossible(sourceStart, sourceEnd);
|
||||
|
||||
mergeArrayValuesDirect(
|
||||
destRange.array(), sourceRange.array(),
|
||||
mergeFunction,
|
||||
destRange.start(), destRange.end(),
|
||||
sourceRange.start(), sourceRange.end());
|
||||
}
|
||||
|
||||
private static void mergeArrayValuesDirect(LongArray dest, LongArray source, LongBinaryOperator mergeFunction, long destStart, long destEnd, long sourceStart, long sourceEnd) {
|
||||
|
||||
long destPos = destStart;
|
||||
long sourcePos = sourceStart;
|
||||
|
||||
while (destPos < destEnd && sourcePos < sourceEnd) {
|
||||
long destVal = dest.get(destPos);
|
||||
long sourceVal = source.get(sourcePos);
|
||||
|
||||
if (destVal < sourceVal) {
|
||||
destPos += 2;
|
||||
} else if (sourceVal < destVal) {
|
||||
sourcePos += 2;
|
||||
} else {
|
||||
long mergedVal = mergeFunction.applyAsLong(dest.get(destPos + 1), source.get(sourcePos + 1));
|
||||
dest.set(destPos + 1, mergedVal);
|
||||
|
||||
destPos += 2;
|
||||
sourcePos += 2;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static long mergeArraysDirect(LongArray out,
|
||||
LongArray a, LongArray b,
|
||||
long outStart,
|
||||
long aStart, long aEnd,
|
||||
long bStart, long bEnd) {
|
||||
long aPos = aStart;
|
||||
long bPos = bStart;
|
||||
long outPos = outStart;
|
||||
@ -166,11 +73,24 @@ public class TwoArrayOperations {
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge two sorted arrays into a third array, step size 2, removing duplicates.
|
||||
* Merge two sorted arrays into a third array, removing duplicates.
|
||||
* <p>
|
||||
* It will prefer the first array if there are duplicates.
|
||||
* The operation is performed with a step size of 2. For each pair of values,
|
||||
* only the first is considered to signify a key. The second value is retained along
|
||||
* with the first. In the case of a duplicate, the value associated with array 'a'
|
||||
* is retained, the other is discarded.
|
||||
*
|
||||
*/
|
||||
private static void mergeArraysDirect2(LongArray out, LongArray a, LongArray b, long outStart, long aStart, long aEnd, long bStart, long bEnd) {
|
||||
public static long mergeArrays2(LongArray out, LongArray a, LongArray b,
|
||||
long outStart,
|
||||
long aStart, long aEnd,
|
||||
long bStart, long bEnd)
|
||||
{
|
||||
if (TwoArrayOperations.class.desiredAssertionStatus()) {
|
||||
assert (a.isSortedN(2, aStart, aEnd));
|
||||
assert (b.isSortedN(2, bStart, bEnd));
|
||||
}
|
||||
|
||||
long aPos = aStart;
|
||||
long bPos = bStart;
|
||||
long outPos = outStart;
|
||||
@ -232,10 +152,10 @@ public class TwoArrayOperations {
|
||||
lastValue = val;
|
||||
}
|
||||
}
|
||||
|
||||
return outPos - outStart;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Count the number of distinct elements in two sorted arrays.
|
||||
*/
|
||||
|
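With the signature change above, mergeArrays2 keeps the step-2 semantics described in its Javadoc (even positions are keys, array 'a' wins on duplicates) but drops the outEnd parameter and instead returns the number of longs written, i.e. twice the number of distinct keys. The updated tests later in this commit call it in this form; the concrete values here are illustrative:

    LongArray left  = LongArrayFactory.onHeapShared(4);
    LongArray right = LongArrayFactory.onHeapShared(2);
    LongArray out   = LongArrayFactory.onHeapShared(4);

    left.set(0, 1, 10, 2, 20);   // keys 1 and 2, with payloads 10 and 20
    right.set(0, 2, 99);         // key 2 again; the value from 'left' wins on merge

    long written = TwoArrayOperations.mergeArrays2(out, left, right, 0, 0, 4, 0, 2);
    // written == 4: two distinct keys, so 'out' holds (1, 10, 2, 20)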
@@ -133,8 +133,11 @@ public class SegmentLongArray implements PartitionPage, LongArray {
    @Override
    public void write(Path filename) throws IOException {
-       try (var channel = (FileChannel) Files.newByteChannel(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-           write(channel);
+       try (var arena = Arena.ofConfined()) {
+           var destSegment = SegmentLongArray.fromMmapReadWrite(arena, filename, 0, segment.byteSize());
+
+           destSegment.segment.copyFrom(segment);
+           destSegment.force();
        }
    }
@ -1,111 +0,0 @@
|
||||
package nu.marginalia.array;
|
||||
|
||||
import nu.marginalia.array.algo.SortingContext;
|
||||
import nu.marginalia.array.page.PagingIntArray;
|
||||
import nu.marginalia.array.page.PagingLongArray;
|
||||
import nu.marginalia.array.scheme.SequentialPartitioningScheme;
|
||||
import nu.marginalia.util.test.TestUtil;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
class PagingIntArrayTest {
|
||||
Path tempDir;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws IOException {
|
||||
tempDir = Files.createTempDirectory(getClass().getSimpleName());
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
TestUtil.clearTempDir(tempDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demo() throws IOException {
|
||||
var array =
|
||||
LongArray.mmapForWriting(Path.of("/tmp/test"), 1<<16);
|
||||
|
||||
array.transformEach(50, 1000, (pos, val) -> Long.hashCode(pos));
|
||||
array.quickSort(50, 1000);
|
||||
if (array.binarySearch(array.get(100), 50, 1000) >= 0) {
|
||||
System.out.println("Nevermind, I found it!");
|
||||
}
|
||||
array.range(50, 1000).fill(0, 950, 1);
|
||||
array.forEach(0, 100, (pos, val) -> {
|
||||
System.out.println(pos + ":" + val);
|
||||
});
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testReadLoad() throws IOException {
|
||||
SequentialPartitioningScheme partitioningScheme = new SequentialPartitioningScheme(7);
|
||||
Path file = Files.createTempFile(tempDir, "test", "dat");
|
||||
|
||||
var heapArray = PagingIntArray.newOnHeap(partitioningScheme, 51);
|
||||
for (int i = 0; i < 51; i++) {
|
||||
heapArray.set(i, 2 * i);
|
||||
}
|
||||
heapArray.write(file);
|
||||
|
||||
|
||||
var diskArray = PagingIntArray.mapFileReadOnly(partitioningScheme, file);
|
||||
for (int i = 0; i < 51; i++) {
|
||||
assertEquals(2 * i, diskArray.get(i));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadLoadLong() throws IOException {
|
||||
SequentialPartitioningScheme partitioningScheme = new SequentialPartitioningScheme(7);
|
||||
Path file = Files.createTempFile(tempDir, "test", "dat");
|
||||
|
||||
var heapArray = PagingLongArray.newOnHeap(partitioningScheme, 51);
|
||||
for (int i = 0; i < 51; i++) {
|
||||
heapArray.set(i, 2 * i);
|
||||
}
|
||||
heapArray.write(file);
|
||||
|
||||
|
||||
var diskArray = PagingLongArray.mapFileReadOnly(partitioningScheme, file);
|
||||
for (int i = 0; i < 51; i++) {
|
||||
assertEquals(2 * i, diskArray.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFromFileChannel() throws IOException {
|
||||
SequentialPartitioningScheme partitioningScheme = new SequentialPartitioningScheme(7);
|
||||
Path file = Files.createTempFile(tempDir, "test", "dat");
|
||||
|
||||
var heapArray = PagingLongArray.newOnHeap(partitioningScheme, 51);
|
||||
for (int i = 0; i < 51; i++) {
|
||||
heapArray.set(i, 2 * i);
|
||||
}
|
||||
heapArray.write(file);
|
||||
|
||||
try (var channel = (FileChannel) Files.newByteChannel(file, StandardOpenOption.READ)) {
|
||||
|
||||
var heapArray2 = PagingLongArray.newOnHeap(partitioningScheme, 51);
|
||||
heapArray2.transferFrom(channel, 10, 7, 20);
|
||||
|
||||
var heapArray3 = PagingLongArray.newPartitionedOnHeap(partitioningScheme, 51);
|
||||
heapArray3.transferFrom(channel, 10, 7, 20);
|
||||
|
||||
for (int i = 0; i < 51; i++) {
|
||||
System.out.println(i + ":" + heapArray2.get(i));
|
||||
assertEquals(heapArray3.get(i), heapArray2.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -3,11 +3,9 @@ package nu.marginalia.array.algo;
|
||||
import com.google.common.collect.Sets;
|
||||
import nu.marginalia.array.LongArray;
|
||||
import nu.marginalia.array.LongArrayFactory;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
@ -24,7 +22,7 @@ class TwoArrayOperationsTest {
|
||||
b.set(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30);
|
||||
|
||||
LongArray out = LongArrayFactory.onHeapShared(TwoArrayOperations.countDistinctElements(a, b, 0, 10, 0, 15));
|
||||
TwoArrayOperations.mergeArrays(out, a, b, 0, out.size(), 0, 10, 0, 15);
|
||||
assertEquals(out.size(), TwoArrayOperations.mergeArrays(out, a, b, 0, 0, 10, 0, 15));
|
||||
|
||||
long[] values = new long[15];
|
||||
out.get(0, 15, values);
|
||||
@ -65,45 +63,6 @@ class TwoArrayOperationsTest {
|
||||
assertEquals(expected, TwoArrayOperations.countDistinctElements(a, b, 5, 5, 0, 15));
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeArrayValues() {
|
||||
// create two arrays with associated values
|
||||
// these must be sorted in the odd positions
|
||||
|
||||
long[] aVals = new long[] { 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10 };
|
||||
long[] bVals = new long[] { 2, 1, 4, 2, 6, 3, 8, 4, 10, 5, 12, 6, 14, 7, 16, 8, 18, 9, 20, 10, 22, 11, 24, 12, 26, 13, 28, 14, 30, 15 };
|
||||
|
||||
LongArray a = LongArrayFactory.onHeapShared(20);
|
||||
LongArray b = LongArrayFactory.onHeapShared(30);
|
||||
|
||||
a.set(0, aVals);
|
||||
b.set(0, bVals);
|
||||
|
||||
// merge b's associated values into a
|
||||
TwoArrayOperations.mergeArrayValues(a, b, Long::sum, 0, 20, 0, 30);
|
||||
|
||||
// fetch the values back into aVals
|
||||
a.get(0, 20, aVals);
|
||||
|
||||
var map = new HashMap<Long, Long>();
|
||||
for (int i = 0; i < aVals.length; i+=2) {
|
||||
map.put(aVals[i], aVals[i+1]);
|
||||
}
|
||||
|
||||
// aVals contained the keys 1..10, and bVals contained the keys 2..30
|
||||
// aVals' values were the same as the keys, but bVals' values were half the keys' values
|
||||
// the merged values should be the sum of the two values in even positions,
|
||||
// and the same as the keys in odd positions
|
||||
map.forEach((k,v) -> {
|
||||
if (k % 2 == 0) {
|
||||
Assertions.assertEquals(2 * v, 3*k);
|
||||
}
|
||||
else {
|
||||
Assertions.assertEquals(v, k);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountMerge() {
|
||||
LongArray a = LongArrayFactory.onHeapShared(1024);
|
||||
@ -115,7 +74,7 @@ class TwoArrayOperationsTest {
|
||||
|
||||
long distinctSize = TwoArrayOperations.countDistinctElements(a, b, 0, 1024, 0, 512);
|
||||
|
||||
long mergedSize = TwoArrayOperations.mergeArrays(c, a, b, 0, 1024+512, 0, 1024, 0, 512);
|
||||
long mergedSize = TwoArrayOperations.mergeArrays(c, a, b, 0, 0, 1024, 0, 512);
|
||||
|
||||
assertEquals(distinctSize, mergedSize);
|
||||
|
||||
@ -136,7 +95,7 @@ class TwoArrayOperationsTest {
|
||||
System.out.println(numDistinct);
|
||||
System.out.println(numDistinct);
|
||||
|
||||
TwoArrayOperations.mergeArrays2(out, left, right, 0, 4, 0, 4, 0, 2);
|
||||
assertEquals(out.size(), TwoArrayOperations.mergeArrays2(out, left, right, 0, 0, 4, 0, 2));
|
||||
|
||||
System.out.println(Arrays.toString(longArrayToJavaArray(out)));
|
||||
|
||||
|
@ -1,89 +0,0 @@
|
||||
package nu.marginalia.btree;
|
||||
|
||||
import nu.marginalia.array.algo.TwoArrayOperations;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.LongBinaryOperator;
|
||||
|
||||
/** Functions for merging btrees.
|
||||
*
|
||||
*/
|
||||
public class BTreeMerger {
|
||||
|
||||
/** Merge two BTrees into a new BTree. The two BTrees must have an entry size of 1.
|
||||
*
|
||||
* @return the size of the written data
|
||||
*/
|
||||
public static long merge1(BTreeReader left,
|
||||
BTreeReader right,
|
||||
BTreeWriter writer,
|
||||
long writeOffset) throws IOException
|
||||
{
|
||||
assert left.ctx.entrySize == 1;
|
||||
assert right.ctx.entrySize == 1;
|
||||
|
||||
final long size = TwoArrayOperations.countDistinctElements(
|
||||
left.data(),
|
||||
right.data(),
|
||||
0, left.numEntries(),
|
||||
0, right.numEntries()
|
||||
);
|
||||
|
||||
int numEntries = (int) size;
|
||||
|
||||
return writer.write(writeOffset, numEntries, slice -> {
|
||||
long end = TwoArrayOperations.mergeArrays(slice, left.data(), right.data(),
|
||||
0, numEntries,
|
||||
0, left.numEntries(),
|
||||
0, right.numEntries()
|
||||
);
|
||||
assert end == numEntries;
|
||||
});
|
||||
}
|
||||
|
||||
/** Merge two BTrees into a new BTree. The two BTrees must have an entry size of 2.
|
||||
* The merge function is applied to the values of the two BTrees.
|
||||
*
|
||||
* Caveat: This function merges the common values into the left tree before merging the two trees.
|
||||
*
|
||||
* @return the size of the written data
|
||||
*/
|
||||
public static long merge2(BTreeReader left,
|
||||
BTreeReader right,
|
||||
BTreeWriter writer,
|
||||
LongBinaryOperator mergeFunction,
|
||||
long writeOffset) throws IOException
|
||||
{
|
||||
assert left.ctx.entrySize == 2;
|
||||
assert right.ctx.entrySize == 2;
|
||||
|
||||
final long size = TwoArrayOperations.countDistinctElementsN(2,
|
||||
left.data(), right.data(),
|
||||
0, left.data().size(),
|
||||
0, right.data().size()
|
||||
);
|
||||
|
||||
int numEntries = (int) size;
|
||||
|
||||
long leftSize = left.data().size();
|
||||
long rightSize = right.data().size();
|
||||
|
||||
// Merge the common values into the left tree
|
||||
TwoArrayOperations.mergeArrayValues(
|
||||
left.data(),
|
||||
right.data(),
|
||||
mergeFunction,
|
||||
0, leftSize,
|
||||
0, rightSize);
|
||||
|
||||
return writer.write(writeOffset, numEntries, slice -> {
|
||||
TwoArrayOperations.mergeArrays2(slice,
|
||||
left.data(),
|
||||
right.data(),
|
||||
0, 2 * size,
|
||||
0, leftSize,
|
||||
0, rightSize);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
@ -27,6 +27,7 @@ public class BTreeReader {
|
||||
index = file.range(header.indexOffsetLongs(), header.dataOffsetLongs());
|
||||
data = file.range(header.dataOffsetLongs(), header.dataOffsetLongs() + dataBlockEnd);
|
||||
|
||||
assert file.size() >= header.dataOffsetLongs() + dataBlockEnd;
|
||||
}
|
||||
|
||||
LongArray data() {
|
||||
@ -213,7 +214,7 @@ public class BTreeReader {
|
||||
|
||||
final long searchStart = layerOffsets[layer] + offset;
|
||||
|
||||
final long nextLayerOffset = (int) index.binarySearchUpperBound(key, searchStart, searchStart + ctx.pageSize()) - searchStart;
|
||||
final long nextLayerOffset = index.binarySearchUpperBound(key, searchStart, searchStart + ctx.pageSize()) - searchStart;
|
||||
|
||||
layer --;
|
||||
boundary = index.get(searchStart + nextLayerOffset);
|
||||
@ -253,7 +254,7 @@ public class BTreeReader {
|
||||
? remainingTotal
|
||||
: (long) ctx.pageSize() * ctx.entrySize;
|
||||
|
||||
long searchEnd = searchStart + (int) min(remainingTotal, remainingBlock);
|
||||
long searchEnd = searchStart + min(remainingTotal, remainingBlock);
|
||||
|
||||
return data.binarySearchN(ctx.entrySize, key, searchStart, searchEnd);
|
||||
}
|
||||
@ -271,7 +272,7 @@ public class BTreeReader {
|
||||
long remainingTotal = dataBlockEnd - dataOffset;
|
||||
long remainingBlock = ctx.pageSize() - relOffset;
|
||||
|
||||
long searchEnd = dataOffset + (int) min(remainingTotal, remainingBlock);
|
||||
long searchEnd = dataOffset + min(remainingTotal, remainingBlock);
|
||||
|
||||
data.retainN(buffer, ctx.entrySize, boundary, dataOffset, searchEnd);
|
||||
}
|
||||
@ -295,7 +296,7 @@ public class BTreeReader {
|
||||
long remainingTotal = dataBlockEnd - dataOffset;
|
||||
long remainingBlock = ctx.pageSize() - relOffset;
|
||||
|
||||
long searchEnd = dataOffset + (int) min(remainingTotal, remainingBlock);
|
||||
long searchEnd = dataOffset + min(remainingTotal, remainingBlock);
|
||||
|
||||
data.rejectN(buffer, ctx.entrySize, boundary, dataOffset, searchEnd);
|
||||
}
|
||||
|
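These small-looking changes are the "(btree) Fix bug in btree reader to do with large data sizes" item: binarySearchUpperBound and min(...) return long offsets, and the removed (int) casts truncate once an offset exceeds Integer.MAX_VALUE, which the gigabyte-scale indexes built by this commit can now reach. A worked example of the truncation, with an assumed large offset:

    long searchStart = 3_000_000_000L;                    // an offset past Integer.MAX_VALUE
    long upperBound  = searchStart + 42;                  // what binarySearchUpperBound might return

    long correct   = upperBound - searchStart;            // 42, as intended
    long truncated = (int) upperBound - searchStart;      // the cast wraps the long down to a negative int
    // truncated is a huge negative number, so the reader would look in entirely the wrong place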
@ -1,214 +0,0 @@
|
||||
package nu.marginalia.btree;
|
||||
|
||||
import nu.marginalia.array.LongArray;
|
||||
import nu.marginalia.array.algo.TwoArrayOperations;
|
||||
import nu.marginalia.array.delegate.ShiftedLongArray;
|
||||
import nu.marginalia.btree.model.BTreeBlockSize;
|
||||
import nu.marginalia.btree.model.BTreeContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.function.LongUnaryOperator;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
class BTreeMergerTest {
|
||||
|
||||
@Test
|
||||
void merge1Vanilla() throws IOException {
|
||||
BTreeContext ctx = new BTreeContext(4, 1, BTreeBlockSize.BS_64);
|
||||
|
||||
LongArray a = LongArray.allocate(ctx.calculateSize(1024));
|
||||
LongArray b = LongArray.allocate(ctx.calculateSize(512));
|
||||
|
||||
new BTreeWriter(a, ctx).write(0, 1024, generate(i -> 4*i));
|
||||
new BTreeWriter(b, ctx).write(0, 512, generate(i -> 3*i));
|
||||
|
||||
var aReader = new BTreeReader(a, ctx, 0);
|
||||
var bReader = new BTreeReader(b, ctx, 0);
|
||||
long cSize = ctx.calculateSize(1024 + 512);
|
||||
LongArray c = LongArray.allocate(cSize);
|
||||
|
||||
long written = BTreeMerger.merge1(aReader, bReader, new BTreeWriter(c, ctx), 0);
|
||||
|
||||
assertTrue(cSize >= written);
|
||||
|
||||
BTreeReader cReader = new BTreeReader(c, ctx, 0);
|
||||
|
||||
// Check that the number of entries is correct
|
||||
assertEquals(cReader.numEntries(), TwoArrayOperations.countDistinctElements(
|
||||
aReader.data(), bReader.data(),
|
||||
0, aReader.numEntries(),
|
||||
0, bReader.numEntries()));
|
||||
|
||||
// Check that all values are present
|
||||
for (int i = 0; i < 1024*5; i++) {
|
||||
boolean expectTrue = false;
|
||||
if (i / 4 < 1024 && i % 4 == 0) {
|
||||
expectTrue = true;
|
||||
}
|
||||
if (i / 3 < 512 && i % 3 == 0) {
|
||||
expectTrue = true;
|
||||
}
|
||||
|
||||
assertEquals(expectTrue, cReader.findEntry(i) >= 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void merge1OneEmpty() throws IOException {
|
||||
BTreeContext ctx = new BTreeContext(4, 1, BTreeBlockSize.BS_64);
|
||||
|
||||
LongArray a = LongArray.allocate(ctx.calculateSize(1024));
|
||||
LongArray b = LongArray.allocate(ctx.calculateSize(10));
|
||||
|
||||
new BTreeWriter(a, ctx).write(0, 1024, generate((i -> 4*i)));
|
||||
new BTreeWriter(b, ctx).write(0, 0, generate((i -> 3*i)));
|
||||
|
||||
var aReader = new BTreeReader(a, ctx, 0);
|
||||
var bReader = new BTreeReader(b, ctx, 0);
|
||||
long cSize = ctx.calculateSize(1024 + 512);
|
||||
LongArray c = LongArray.allocate(cSize);
|
||||
|
||||
long written = BTreeMerger.merge1(aReader, bReader, new BTreeWriter(c, ctx), 0);
|
||||
|
||||
assertTrue(cSize >= written);
|
||||
|
||||
BTreeReader cReader = new BTreeReader(c, ctx, 0);
|
||||
|
||||
// Check that the number of entries is correct
|
||||
assertEquals(cReader.numEntries(), TwoArrayOperations.countDistinctElements(
|
||||
aReader.data(), bReader.data(),
|
||||
0, aReader.numEntries(),
|
||||
0, bReader.numEntries()));
|
||||
|
||||
// Check that all values are present
|
||||
for (int i = 0; i < 1024*5; i++) {
|
||||
boolean expectTrue = false;
|
||||
if (i / 4 < 1024 && i % 4 == 0) {
|
||||
expectTrue = true;
|
||||
}
|
||||
|
||||
assertEquals(expectTrue, cReader.findEntry(i) >= 0);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void merge2Vanilla() throws IOException {
|
||||
BTreeContext ctx = new BTreeContext(4, 2, BTreeBlockSize.BS_64);
|
||||
|
||||
LongArray a = LongArray.allocate(ctx.calculateSize(1024));
|
||||
LongArray b = LongArray.allocate(ctx.calculateSize(512));
|
||||
|
||||
new BTreeWriter(a, ctx).write(0, 512, generate(i -> i, i -> 2*i));
|
||||
new BTreeWriter(b, ctx).write(0, 256, generate(i -> 2*i, i -> 6*i));
|
||||
|
||||
long cSize = ctx.calculateSize(1024 + 512);
|
||||
var aReader = new BTreeReader(a, ctx, 0);
|
||||
var bReader = new BTreeReader(b, ctx, 0);
|
||||
|
||||
LongArray c = LongArray.allocate(cSize);
|
||||
BTreeMerger.merge2(aReader, bReader, new BTreeWriter(c, ctx), Long::sum, 0);
|
||||
|
||||
BTreeReader cReader = new BTreeReader(c, ctx, 0);
|
||||
|
||||
for (int i = 0; i < 512; i++) {
|
||||
long offset = cReader.findEntry(i);
|
||||
assertTrue(offset >= 0);
|
||||
|
||||
long data = cReader.data().get(offset + 1);
|
||||
|
||||
if (i % 2 == 0) {
|
||||
assertEquals(5*i, data);
|
||||
} else {
|
||||
assertEquals(2*i, data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void merge2LeftEmpty() throws IOException {
|
||||
BTreeContext ctx = new BTreeContext(4, 2, BTreeBlockSize.BS_64);
|
||||
|
||||
LongArray a = LongArray.allocate(ctx.calculateSize(0));
|
||||
LongArray b = LongArray.allocate(ctx.calculateSize(512));
|
||||
|
||||
new BTreeWriter(a, ctx).write(0, 0, generate(i -> i, i -> 2*i));
|
||||
new BTreeWriter(b, ctx).write(0, 256, generate(i -> 2*i, i -> 6*i));
|
||||
|
||||
long cSize = ctx.calculateSize(256);
|
||||
var aReader = new BTreeReader(a, ctx, 0);
|
||||
var bReader = new BTreeReader(b, ctx, 0);
|
||||
|
||||
LongArray c = LongArray.allocate(cSize);
|
||||
long mergedSize = BTreeMerger.merge2(aReader, bReader, new BTreeWriter(c, ctx), Long::sum, 0);
|
||||
assertEquals(cSize, mergedSize);
|
||||
|
||||
BTreeReader cReader = new BTreeReader(c, ctx, 0);
|
||||
System.out.println(Arrays.toString(((ShiftedLongArray) cReader.data()).toArray()));
|
||||
for (int i = 0; i < 256; i++) {
|
||||
long offset = cReader.findEntry(2 * i);
|
||||
assertTrue(offset >= 0);
|
||||
|
||||
long data = cReader.data().get(offset + 1);
|
||||
|
||||
assertEquals(6*i, data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void merge2RightEmpty() throws IOException {
|
||||
BTreeContext ctx = new BTreeContext(4, 2, BTreeBlockSize.BS_64);
|
||||
|
||||
LongArray a = LongArray.allocate(ctx.calculateSize(0));
|
||||
LongArray b = LongArray.allocate(ctx.calculateSize(512));
|
||||
|
||||
new BTreeWriter(a, ctx).write(0, 0, generate(i -> i, i -> 2*i));
|
||||
|
||||
new BTreeWriter(b, ctx).write(0, 256, generate(i -> 2*i, i -> 6*i));
|
||||
|
||||
long cSize = ctx.calculateSize(256);
|
||||
var aReader = new BTreeReader(a, ctx, 0);
|
||||
var bReader = new BTreeReader(b, ctx, 0);
|
||||
|
||||
LongArray c = LongArray.allocate(cSize);
|
||||
|
||||
|
||||
// v-- swapped --v
|
||||
long mergedSize = BTreeMerger.merge2(bReader, aReader, new BTreeWriter(c, ctx), Long::sum, 0);
|
||||
assertEquals(cSize, mergedSize);
|
||||
|
||||
BTreeReader cReader = new BTreeReader(c, ctx, 0);
|
||||
for (int i = 0; i < 256; i++) {
|
||||
long offset = cReader.findEntry(2 * i);
|
||||
assertTrue(offset >= 0);
|
||||
|
||||
long data = cReader.data().get(offset + 1);
|
||||
assertEquals(6*i, data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Generate a BTree callback that will populate the slice with the values generated by the given generator.
|
||||
*/
|
||||
BTreeWriteCallback generate(LongUnaryOperator generator) {
|
||||
return slice -> slice.transformEach(0, slice.size(), (i, v) -> generator.applyAsLong(i));
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a BTree callback that will populate the slice with the keys and values generated by the given generators.
|
||||
*/
|
||||
BTreeWriteCallback generate(LongUnaryOperator keyGen, LongUnaryOperator valGen) {
|
||||
return slice -> {
|
||||
for (int i = 0; i < slice.size(); i+=2) {
|
||||
slice.set(i, keyGen.applyAsLong(i/2));
|
||||
slice.set(i+1, valGen.applyAsLong(i/2));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
package nu.marginalia.btree;
|
||||
|
||||
import nu.marginalia.array.LongArray;
|
||||
import nu.marginalia.array.LongArrayFactory;
|
||||
import nu.marginalia.btree.model.BTreeBlockSize;
|
||||
import nu.marginalia.btree.model.BTreeContext;
|
||||
import nu.marginalia.btree.model.BTreeHeader;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -132,6 +134,31 @@ class BTreeWriterTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled // This test creates a 16 GB file in tmp
|
||||
public void veryLargeBTreeTest() throws IOException {
|
||||
var wordsBTreeContext = new BTreeContext(5, 2, BTreeBlockSize.BS_2048);
|
||||
Path file = Path.of("/tmp/large.dat");
|
||||
try (var la = LongArrayFactory.mmapForWritingConfined(file, wordsBTreeContext.calculateSize(1024*1024*1024))) {
|
||||
new BTreeWriter(la, wordsBTreeContext)
|
||||
.write(0, 1024*1024*1024, wc -> {
|
||||
for (long i = 0; i < 1024*1024*1024; i++) {
|
||||
wc.set(2*i, i);
|
||||
wc.set(2*i + 1, -i);
|
||||
}
|
||||
});
|
||||
System.out.println("Wrote");
|
||||
var reader = new BTreeReader(la, wordsBTreeContext, 0);
|
||||
|
||||
for (int i = 0; i < 1024*1024*1024; i++) {
|
||||
long offset = reader.findEntry(i);
|
||||
assertEquals(2L*i, offset);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
Files.delete(file);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteEqualityNotMasked() throws IOException {
|
||||
|
@ -104,15 +104,11 @@ public class IndexConstructorMain {
|
||||
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
|
||||
|
||||
|
||||
ReverseIndexConstructor.
|
||||
createReverseIndex(
|
||||
heartbeat,
|
||||
IndexJournalReader::singleFile,
|
||||
indexStaging.asPath(),
|
||||
this::addRankToIdEncoding,
|
||||
tmpDir,
|
||||
outputFileDocs,
|
||||
outputFileWords);
|
||||
new ReverseIndexConstructor(outputFileDocs, outputFileWords,
|
||||
IndexJournalReader::singleFile,
|
||||
this::addRankToIdEncoding, tmpDir)
|
||||
.createReverseIndex(heartbeat, indexStaging.asPath());
|
||||
|
||||
}
|
||||
|
||||
private void createPrioReverseIndex() throws SQLException, IOException {
|
||||
@ -130,13 +126,10 @@ public class IndexConstructorMain {
|
||||
// important to the document. This filter will act on the encoded {@see WordMetadata}
|
||||
LongPredicate wordMetaFilter = getPriorityIndexWordMetaFilter();
|
||||
|
||||
ReverseIndexConstructor.
|
||||
createReverseIndex(heartbeat,
|
||||
(path) -> IndexJournalReader
|
||||
.singleFile(path)
|
||||
.filtering(wordMetaFilter),
|
||||
indexStaging.asPath(),
|
||||
this::addRankToIdEncoding, tmpDir, outputFileDocs, outputFileWords);
|
||||
new ReverseIndexConstructor(outputFileDocs, outputFileWords,
|
||||
(path) -> IndexJournalReader.singleFile(path).filtering(wordMetaFilter),
|
||||
this::addRankToIdEncoding, tmpDir)
|
||||
.createReverseIndex(heartbeat, indexStaging.asPath());
|
||||
}
|
||||
|
||||
private static LongPredicate getPriorityIndexWordMetaFilter() {
|
||||
|
@ -13,15 +13,17 @@ import nu.marginalia.index.index.SearchIndex;
|
||||
import nu.marginalia.index.svc.SearchTermsService;
|
||||
import nu.marginalia.model.idx.WordMetadata;
|
||||
import nu.marginalia.ranking.ResultValuator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.OptionalInt;
|
||||
|
||||
public class IndexMetadataService {
|
||||
private final SearchIndex index;
|
||||
private final SearchTermsService searchTermsService;
|
||||
private final ResultValuator searchResultValuator;
|
||||
private static final Logger logger = LoggerFactory.getLogger(IndexMetadataService.class);
|
||||
|
||||
@Inject
|
||||
public IndexMetadataService(SearchIndex index,
|
||||
@ -123,8 +125,10 @@ public class IndexMetadataService {
|
||||
public long getTermMetadata(long termId, long docId) {
|
||||
var docsForTerm = termdocToMeta.get(termId);
|
||||
if (docsForTerm == null) {
|
||||
logger.warn("Missing meta for term {}", termId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return docsForTerm.getOrDefault(docId, 0);
|
||||
}
|
||||
|
||||
|
@ -221,9 +221,8 @@ public class IndexQueryServiceIntegrationSmokeTest {
|
||||
Path tmpDir = indexStaging.asPath().resolve("tmp");
|
||||
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
|
||||
|
||||
|
||||
ReverseIndexConstructor.
|
||||
createReverseIndex(new FakeProcessHeartbeat(), IndexJournalReader::singleFile, indexStaging.asPath(), DocIdRewriter.identity(), tmpDir, outputFileDocs, outputFileWords);
|
||||
new ReverseIndexConstructor(outputFileDocs, outputFileWords, IndexJournalReader::singleFile, DocIdRewriter.identity(), tmpDir)
|
||||
.createReverseIndex(new FakeProcessHeartbeat(), indexStaging.asPath());
|
||||
}
|
||||
|
||||
private void createPrioReverseIndex() throws SQLException, IOException {
|
||||
@ -237,8 +236,8 @@ public class IndexQueryServiceIntegrationSmokeTest {
|
||||
Path tmpDir = indexStaging.asPath().resolve("tmp");
|
||||
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
|
||||
|
||||
ReverseIndexConstructor.
|
||||
createReverseIndex(new FakeProcessHeartbeat(), IndexJournalReader::singleFile, indexStaging.asPath(), DocIdRewriter.identity(), tmpDir, outputFileDocs, outputFileWords);
|
||||
new ReverseIndexConstructor(outputFileDocs, outputFileWords, IndexJournalReader::singleFile, DocIdRewriter.identity(), tmpDir)
|
||||
.createReverseIndex(new FakeProcessHeartbeat(), indexStaging.asPath());
|
||||
}
|
||||
|
||||
private void createForwardIndex() throws SQLException, IOException {
|
||||
|
@ -490,9 +490,8 @@ public class IndexQueryServiceIntegrationTest {
|
||||
Path tmpDir = indexStaging.asPath().resolve("tmp");
|
||||
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
|
||||
|
||||
|
||||
ReverseIndexConstructor.
|
||||
createReverseIndex(new FakeProcessHeartbeat(), IndexJournalReader::singleFile, indexStaging.asPath(), DocIdRewriter.identity(), tmpDir, outputFileDocs, outputFileWords);
|
||||
new ReverseIndexConstructor(outputFileDocs, outputFileWords, IndexJournalReader::singleFile, DocIdRewriter.identity(), tmpDir)
|
||||
.createReverseIndex(new FakeProcessHeartbeat(), indexStaging.asPath());
|
||||
}
|
||||
|
||||
private void createPrioReverseIndex() throws SQLException, IOException {
|
||||
@ -506,8 +505,8 @@ public class IndexQueryServiceIntegrationTest {
|
||||
Path tmpDir = indexStaging.asPath().resolve("tmp");
|
||||
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
|
||||
|
||||
ReverseIndexConstructor.
|
||||
createReverseIndex(new FakeProcessHeartbeat(), IndexJournalReader::singleFile, indexStaging.asPath(), DocIdRewriter.identity(), tmpDir, outputFileDocs, outputFileWords);
|
||||
new ReverseIndexConstructor(outputFileDocs, outputFileWords, IndexJournalReader::singleFile, DocIdRewriter.identity(), tmpDir)
|
||||
.createReverseIndex(new FakeProcessHeartbeat(), indexStaging.asPath());
|
||||
}
|
||||
|
||||
private void createForwardIndex() throws SQLException, IOException {
|
||||
|
run/env/service.env
@@ -3,4 +3,4 @@ CONTROL_SERVICE_OPTS="-DdistPath=/dist"
CONVERTER_PROCESS_OPTS="-ea -Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/converter.jfr"
CRAWLER_PROCESS_OPTS="-Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/crawler.jfr"
LOADER_PROCESS_OPTS="-Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/loader.jfr"
-INDEX_CONSTRUCTION_PROCESS_OPTS="-ea"
+INDEX_CONSTRUCTION_PROCESS_OPTS="-ea -Djava.util.concurrent.ForkJoinPool.common.parallelism=4"
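The added flag is the "conservative default" from the commit message: the preindexes are built with parallelStream(), which runs on the common ForkJoinPool, so this property caps how many journal files are being preindexed, and held in memory, at the same time. On a machine with more RAM the cap can be raised in the same place, e.g. (eight workers is only an example value):

INDEX_CONSTRUCTION_PROCESS_OPTS="-ea -Djava.util.concurrent.ForkJoinPool.common.parallelism=8"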