From 5c952d48f46b14874d45a8bb69613adb4c654494 Mon Sep 17 00:00:00 2001 From: vlofgren Date: Mon, 8 Aug 2022 15:17:49 +0200 Subject: [PATCH] Speed up conversion --- .../wmsa/edge/index/IndexServicesFactory.java | 24 +++++---- .../conversion/SearchIndexPreconverter.java | 54 +++++++++++-------- 2 files changed, 47 insertions(+), 31 deletions(-) diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java index b3b4d45e..869c6f5b 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java @@ -26,6 +26,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Callable; import static nu.marginalia.wmsa.edge.index.EdgeIndexService.DYNAMIC_BUCKET_LENGTH; @@ -39,7 +41,7 @@ public class IndexServicesFactory { private final PartitionedDataFile writerIndexFile; private final RootDataFile keywordLexiconFile; - private final PartitionedDataFile preconverterOutputFile; + private final DoublePartitionedDataFile preconverterOutputFile; private final DoublePartitionedDataFile indexReadWordsFile; private final DoublePartitionedDataFile indexReadUrlsFile; private final DoublePartitionedDataFile indexWriteWordsFile; @@ -75,7 +77,7 @@ public class IndexServicesFactory { this.indexReadUrlsFile = new DoublePartitionedDataFile(partitionRootFast, indexReadUrlsFile); this.indexWriteWordsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteWordsFile); this.indexWriteUrlsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteUrlsFile); - this.preconverterOutputFile = new PartitionedDataFile(partitionRootSlowTmp, "preconverted.dat"); + this.preconverterOutputFile = new DoublePartitionedDataFile(partitionRootSlowTmp, "preconverted.dat"); this.partitioner = partitioner; } @@ -101,7 +103,7 @@ public class IndexServicesFactory { public void convertIndex(int id, IndexBlock block) throws ConversionUnnecessaryException, IOException { var converter = new SearchIndexConverter(block, id, tmpFileDir, - preconverterOutputFile.get(id), + preconverterOutputFile.get(id, block.ordinal()), indexWriteWordsFile.get(id, block.id), indexWriteUrlsFile.get(id, block.id), partitioner, @@ -112,19 +114,23 @@ public class IndexServicesFactory { @SneakyThrows public SearchIndexPreconverter getIndexPreconverter() { - File[] outputFiles = new File[DYNAMIC_BUCKET_LENGTH+1]; - for (int i = 0; i < outputFiles.length; i++) { - outputFiles[i] = getPreconverterOutputFile(i); + Map shards = new HashMap<>(); + + for (int index = 0; index < (DYNAMIC_BUCKET_LENGTH + 1); index++) { + for (IndexBlock block : IndexBlock.values()) { + shards.put(new SearchIndexPreconverter.Shard(index, block.ordinal()), getPreconverterOutputFile(index, block.ordinal())); + } } + return new SearchIndexPreconverter(writerIndexFile.get(0), - outputFiles, + shards, partitioner, domainBlacklist ); } - private File getPreconverterOutputFile(int i) { - return preconverterOutputFile.get(i); + private File getPreconverterOutputFile(int index, int block) { + return preconverterOutputFile.get(index, block); } @SneakyThrows diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java index d096ff0e..37560b61 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java @@ -10,26 +10,42 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; +import java.util.Map; import java.util.Objects; public class SearchIndexPreconverter { private final Logger logger = LoggerFactory.getLogger(getClass()); + public record Shard(int bucket, int block) {} + + private record ShardOutput(Shard shard, RandomAccessFile raf, FileChannel fc) { + public static ShardOutput fromFile(Shard s, File f) { + try { + var v = new RandomAccessFile(f, "rw"); + v.seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES); + return new ShardOutput(s, v, v.getChannel()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + @SneakyThrows @Inject public SearchIndexPreconverter(File inputFile, - File[] outputFiles, + Map outputFiles, SearchIndexPartitioner partitioner, EdgeDomainBlacklist blacklist) { TIntHashSet spamDomains = blacklist.getSpamDomains(); logger.info("Preconverting {}", inputFile); - for (File f : outputFiles) { + for (File f : outputFiles.values()) { if (f.exists()) { Files.deleteIfExists(Objects.requireNonNull(f).toPath()); } @@ -41,15 +57,7 @@ public class SearchIndexPreconverter { logger.info("{}", indexJournalReader.fileHeader); - RandomAccessFile[] randomAccessFiles = new RandomAccessFile[outputFiles.length]; - for (int i = 0; i < randomAccessFiles.length; i++) { - randomAccessFiles[i] = new RandomAccessFile(outputFiles[i], "rw"); - randomAccessFiles[i].seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES); - } - FileChannel[] fileChannels = new FileChannel[outputFiles.length]; - for (int i = 0; i < fileChannels.length; i++) { - fileChannels[i] = randomAccessFiles[i].getChannel(); - } + ShardOutput[] outputs = outputFiles.entrySet().stream().map(entry -> ShardOutput.fromFile(entry.getKey(), entry.getValue())).toArray(ShardOutput[]::new); var lock = partitioner.getReadLock(); try { @@ -65,12 +73,14 @@ public class SearchIndexPreconverter { buffer.clear(); entry.copyToBuffer(buffer); - for (int i = 0; i < randomAccessFiles.length; i++) { - if (partitioner.filterUnsafe(domainId, i)) { + for (int i = 0; i < outputs.length; i++) { + if (outputs[i].shard.block == entry.header.block().id + && partitioner.filterUnsafe(domainId, outputs[i].shard.bucket)) + { buffer.flip(); while (buffer.position() < buffer.limit()) - fileChannels[i].write(buffer); + outputs[i].fc.write(buffer); } } } @@ -80,14 +90,14 @@ public class SearchIndexPreconverter { } logger.info("Finalizing preconversion"); - for (int i = 0; i < randomAccessFiles.length; i++) { - long pos = randomAccessFiles[i].getFilePointer(); - randomAccessFiles[i].seek(0); - randomAccessFiles[i].writeLong(pos); - randomAccessFiles[i].writeLong(wordCountOriginal); - fileChannels[i].force(true); - fileChannels[i].close(); - randomAccessFiles[i].close(); + for (int i = 0; i < outputs.length; i++) { + long pos = outputs[i].raf.getFilePointer(); + outputs[i].raf.seek(0); + outputs[i].raf.writeLong(pos); + outputs[i].raf.writeLong(wordCountOriginal); + outputs[i].fc.force(true); + outputs[i].fc.close(); + outputs[i].raf.close(); } }