Speed up conversion

vlofgren 2022-08-08 15:17:49 +02:00
parent 41e89bf735
commit 5c952d48f4
2 changed files with 47 additions and 31 deletions
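
In brief, judging from the diff: the preconverter's output, previously one file per bucket (PartitionedDataFile), becomes one file per (bucket, block) pair (DoublePartitionedDataFile), keyed by a new SearchIndexPreconverter.Shard record, and each journal entry is written only to shards whose block matches the entry's header. A later conversion pass for a given block then presumably reads just its own shard rather than scanning past every other block's entries, which is where the speed-up would come from.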

IndexServicesFactory.java

@@ -26,6 +26,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Callable;

 import static nu.marginalia.wmsa.edge.index.EdgeIndexService.DYNAMIC_BUCKET_LENGTH;
@@ -39,7 +41,7 @@ public class IndexServicesFactory {
     private final PartitionedDataFile writerIndexFile;
     private final RootDataFile keywordLexiconFile;
-    private final PartitionedDataFile preconverterOutputFile;
+    private final DoublePartitionedDataFile preconverterOutputFile;
     private final DoublePartitionedDataFile indexReadWordsFile;
     private final DoublePartitionedDataFile indexReadUrlsFile;
     private final DoublePartitionedDataFile indexWriteWordsFile;
@@ -75,7 +77,7 @@ public class IndexServicesFactory {
         this.indexReadUrlsFile = new DoublePartitionedDataFile(partitionRootFast, indexReadUrlsFile);
         this.indexWriteWordsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteWordsFile);
         this.indexWriteUrlsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteUrlsFile);
-        this.preconverterOutputFile = new PartitionedDataFile(partitionRootSlowTmp, "preconverted.dat");
+        this.preconverterOutputFile = new DoublePartitionedDataFile(partitionRootSlowTmp, "preconverted.dat");
         this.partitioner = partitioner;
     }
@@ -101,7 +103,7 @@ public class IndexServicesFactory {
     public void convertIndex(int id, IndexBlock block) throws ConversionUnnecessaryException, IOException {
         var converter = new SearchIndexConverter(block, id, tmpFileDir,
-                preconverterOutputFile.get(id),
+                preconverterOutputFile.get(id, block.ordinal()),
                 indexWriteWordsFile.get(id, block.id),
                 indexWriteUrlsFile.get(id, block.id),
                 partitioner,
@@ -112,19 +114,23 @@ public class IndexServicesFactory {
     @SneakyThrows
     public SearchIndexPreconverter getIndexPreconverter() {
-        File[] outputFiles = new File[DYNAMIC_BUCKET_LENGTH+1];
-        for (int i = 0; i < outputFiles.length; i++) {
-            outputFiles[i] = getPreconverterOutputFile(i);
+        Map<SearchIndexPreconverter.Shard, File> shards = new HashMap<>();
+
+        for (int index = 0; index < (DYNAMIC_BUCKET_LENGTH + 1); index++) {
+            for (IndexBlock block : IndexBlock.values()) {
+                shards.put(new SearchIndexPreconverter.Shard(index, block.ordinal()), getPreconverterOutputFile(index, block.ordinal()));
+            }
         }

         return new SearchIndexPreconverter(writerIndexFile.get(0),
-                outputFiles,
+                shards,
                 partitioner,
                 domainBlacklist
         );
     }

-    private File getPreconverterOutputFile(int i) {
-        return preconverterOutputFile.get(i);
+    private File getPreconverterOutputFile(int index, int block) {
+        return preconverterOutputFile.get(index, block);
     }

     @SneakyThrows
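
For orientation, a minimal standalone sketch of what the new shard enumeration amounts to. The Shard record and the nested loop mirror getIndexPreconverter above; the directory layout in shardFile is an assumption, not taken from the commit, standing in for whatever DoublePartitionedDataFile.get(bucket, block) resolves to.

import java.io.File;
import java.util.HashMap;
import java.util.Map;

class ShardLayoutSketch {
    // Stand-in for SearchIndexPreconverter.Shard
    record Shard(int bucket, int block) {}

    // Stand-in for DoublePartitionedDataFile.get(bucket, block);
    // the <root>/<bucket>/<block>/preconverted.dat layout is assumed
    static File shardFile(File root, int bucket, int block) {
        return new File(root, bucket + File.separator + block + File.separator + "preconverted.dat");
    }

    // Mirrors the nested loop in getIndexPreconverter: every bucket crossed
    // with every block ordinal yields one preconverter output file
    static Map<Shard, File> enumerateShards(File root, int buckets, int blocks) {
        Map<Shard, File> shards = new HashMap<>();
        for (int bucket = 0; bucket < buckets; bucket++) {
            for (int block = 0; block < blocks; block++) {
                shards.put(new Shard(bucket, block), shardFile(root, bucket, block));
            }
        }
        return shards;
    }

    public static void main(String[] args) {
        // illustrative sizes; the real code uses DYNAMIC_BUCKET_LENGTH + 1
        // buckets and IndexBlock.values().length blocks
        Map<Shard, File> shards = enumerateShards(new File("/tmp/preconvert"), 8, 6);
        System.out.println(shards.size() + " shard files"); // 48
    }
}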

SearchIndexPreconverter.java

@@ -10,26 +10,42 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
+import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
+import java.util.Map;
 import java.util.Objects;

 public class SearchIndexPreconverter {
     private final Logger logger = LoggerFactory.getLogger(getClass());

+    public record Shard(int bucket, int block) {}
+
+    private record ShardOutput(Shard shard, RandomAccessFile raf, FileChannel fc) {
+        public static ShardOutput fromFile(Shard s, File f) {
+            try {
+                var v = new RandomAccessFile(f, "rw");
+                v.seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES);
+                return new ShardOutput(s, v, v.getChannel());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
     @SneakyThrows
     @Inject
     public SearchIndexPreconverter(File inputFile,
-                                   File[] outputFiles,
+                                   Map<Shard, File> outputFiles,
                                    SearchIndexPartitioner partitioner,
                                    EdgeDomainBlacklist blacklist)
     {
         TIntHashSet spamDomains = blacklist.getSpamDomains();

         logger.info("Preconverting {}", inputFile);

-        for (File f : outputFiles) {
+        for (File f : outputFiles.values()) {
             if (f.exists()) {
                 Files.deleteIfExists(Objects.requireNonNull(f).toPath());
             }
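
A note on ShardOutput.fromFile wrapping the checked IOException in a RuntimeException: Function.apply, which Stream.map expects, declares no checked exceptions, so the wrap is presumably what lets fromFile be called from the stream pipeline in the next hunk. A minimal illustration of the constraint (file name hypothetical):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.stream.Stream;

class CheckedExceptionSketch {
    public static void main(String[] args) {
        // Does NOT compile: RandomAccessFile's constructor throws a checked
        // exception, and Function.apply declares none:
        //   Stream.of("a.dat").map(f -> new RandomAccessFile(f, "rw"));

        // Compiles: the checked exception is rethrown unchecked, as fromFile does
        Stream.of("a.dat")
              .map(f -> {
                  try {
                      return new RandomAccessFile(f, "rw"); // "rw" creates the file if absent
                  } catch (IOException e) {
                      throw new RuntimeException(e);
                  }
              })
              .forEach(raf -> System.out.println("opened " + raf));
    }
}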
@@ -41,15 +57,7 @@ public class SearchIndexPreconverter {
         logger.info("{}", indexJournalReader.fileHeader);

-        RandomAccessFile[] randomAccessFiles = new RandomAccessFile[outputFiles.length];
-        for (int i = 0; i < randomAccessFiles.length; i++) {
-            randomAccessFiles[i] = new RandomAccessFile(outputFiles[i], "rw");
-            randomAccessFiles[i].seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES);
-        }
-
-        FileChannel[] fileChannels = new FileChannel[outputFiles.length];
-        for (int i = 0; i < fileChannels.length; i++) {
-            fileChannels[i] = randomAccessFiles[i].getChannel();
-        }
+        ShardOutput[] outputs = outputFiles.entrySet().stream().map(entry -> ShardOutput.fromFile(entry.getKey(), entry.getValue())).toArray(ShardOutput[]::new);

         var lock = partitioner.getReadLock();
         try {
@@ -65,12 +73,14 @@ public class SearchIndexPreconverter {
                 buffer.clear();
                 entry.copyToBuffer(buffer);

-                for (int i = 0; i < randomAccessFiles.length; i++) {
-                    if (partitioner.filterUnsafe(domainId, i)) {
+                for (int i = 0; i < outputs.length; i++) {
+                    if (outputs[i].shard.block == entry.header.block().id
+                        && partitioner.filterUnsafe(domainId, outputs[i].shard.bucket))
+                    {
                         buffer.flip();

                         while (buffer.position() < buffer.limit())
-                            fileChannels[i].write(buffer);
+                            outputs[i].fc.write(buffer);
                     }
                 }
             }
@@ -80,14 +90,14 @@ public class SearchIndexPreconverter {
         }

         logger.info("Finalizing preconversion");

-        for (int i = 0; i < randomAccessFiles.length; i++) {
-            long pos = randomAccessFiles[i].getFilePointer();
-            randomAccessFiles[i].seek(0);
-            randomAccessFiles[i].writeLong(pos);
-            randomAccessFiles[i].writeLong(wordCountOriginal);
-            fileChannels[i].force(true);
-            fileChannels[i].close();
-            randomAccessFiles[i].close();
+        for (int i = 0; i < outputs.length; i++) {
+            long pos = outputs[i].raf.getFilePointer();
+            outputs[i].raf.seek(0);
+            outputs[i].raf.writeLong(pos);
+            outputs[i].raf.writeLong(wordCountOriginal);
+            outputs[i].fc.force(true);
+            outputs[i].fc.close();
+            outputs[i].raf.close();
         }
     }
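
The finalization logic itself is unchanged: each output file starts with a header that is back-filled once the body has been written. A standalone sketch of that pattern, assuming the header is two longs (16 bytes, presumably what SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES equals): end position first, original word count second.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

class HeaderBackfillSketch {
    // Assumption: two longs of header, i.e. what FILE_HEADER_SIZE_BYTES presumably is
    static final long HEADER_SIZE_BYTES = 16;

    public static void main(String[] args) throws IOException {
        File f = new File("/tmp/shard-sketch.dat");

        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            raf.seek(HEADER_SIZE_BYTES);      // skip past the header, as ShardOutput.fromFile does
            raf.writeLong(0xCAFEL);           // stand-in for the copied journal entries

            long pos = raf.getFilePointer();  // end position, header included
            raf.seek(0);
            raf.writeLong(pos);               // header word 1: end position
            raf.writeLong(1);                 // header word 2: original word count
            raf.getChannel().force(true);     // flush before closing, as the commit does
        }
    }
}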