Clean up preconverter code

This commit is contained in:
vlofgren 2022-08-08 15:29:44 +02:00
parent 31b5742280
commit 2af2c50f34

View File

@ -23,18 +23,6 @@ public class SearchIndexPreconverter {
public record Shard(int bucket, int block) {} public record Shard(int bucket, int block) {}
private record ShardOutput(Shard shard, RandomAccessFile raf, FileChannel fc) {
public static ShardOutput fromFile(Shard s, File f) {
try {
var v = new RandomAccessFile(f, "rw");
v.seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES);
return new ShardOutput(s, v, v.getChannel());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
@SneakyThrows @SneakyThrows
@Inject @Inject
public SearchIndexPreconverter(File inputFile, public SearchIndexPreconverter(File inputFile,
@ -57,7 +45,9 @@ public class SearchIndexPreconverter {
logger.info("{}", indexJournalReader.fileHeader); logger.info("{}", indexJournalReader.fileHeader);
ShardOutput[] outputs = outputFiles.entrySet().stream().map(entry -> ShardOutput.fromFile(entry.getKey(), entry.getValue())).toArray(ShardOutput[]::new); ShardOutput[] outputs = outputFiles.entrySet().stream()
.map(entry -> ShardOutput.fromFile(entry.getKey(), entry.getValue()))
.toArray(ShardOutput[]::new);
var lock = partitioner.getReadLock(); var lock = partitioner.getReadLock();
try { try {
@ -69,18 +59,14 @@ public class SearchIndexPreconverter {
continue; continue;
} }
int domainId = entry.domainId();
buffer.clear(); buffer.clear();
entry.copyToBuffer(buffer); entry.copyToBuffer(buffer);
for (int i = 0; i < outputs.length; i++) { for (ShardOutput output : outputs) {
if (outputs[i].shard.block == entry.header.block().id if (output.shouldWrite(partitioner, entry)) {
&& partitioner.filterUnsafe(domainId, outputs[i].shard.bucket))
{
buffer.flip(); buffer.flip();
while (buffer.position() < buffer.limit()) output.write(buffer);
outputs[i].fc.write(buffer);
} }
} }
} }
@ -90,16 +76,42 @@ public class SearchIndexPreconverter {
} }
logger.info("Finalizing preconversion"); logger.info("Finalizing preconversion");
for (int i = 0; i < outputs.length; i++) { for (ShardOutput output : outputs) {
long pos = outputs[i].raf.getFilePointer(); output.finish(wordCountOriginal);
outputs[i].raf.seek(0);
outputs[i].raf.writeLong(pos);
outputs[i].raf.writeLong(wordCountOriginal);
outputs[i].fc.force(true);
outputs[i].fc.close();
outputs[i].raf.close();
} }
} }
private record ShardOutput(Shard shard, RandomAccessFile raf, FileChannel fc) {
public static ShardOutput fromFile(Shard s, File f) {
try {
var v = new RandomAccessFile(f, "rw");
v.seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES);
return new ShardOutput(s, v, v.getChannel());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public boolean shouldWrite(SearchIndexPartitioner partitioner, SearchIndexJournalReader.JournalEntry entry) {
return shard.block == entry.header.block().id
&& partitioner.filterUnsafe(entry.domainId(), shard.bucket);
}
public void finish(long wordCountOriginal) throws IOException {
long pos = raf.getFilePointer();
raf.seek(0);
raf.writeLong(pos);
raf.writeLong(wordCountOriginal);
fc.force(true);
fc.close();
raf.close();
}
public void write(ByteBuffer buffer) throws IOException {
while (buffer.position() < buffer.limit())
fc.write(buffer);
}
};
} }