diff --git a/code/features-convert/data-extractors/build.gradle b/code/features-convert/data-extractors/build.gradle index 73aebd49..69ae1388 100644 --- a/code/features-convert/data-extractors/build.gradle +++ b/code/features-convert/data-extractors/build.gradle @@ -21,6 +21,7 @@ dependencies { implementation project(':code:common:model') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:term-frequency-dict') + implementation project(':code:libraries:blocking-thread-pool') implementation project(':code:features-crawl:link-parser') implementation project(':code:features-convert:anchor-keywords') implementation project(':code:process-models:crawling-model') diff --git a/code/features-convert/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java b/code/features-convert/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java index bdb7362a..1e1a2cd5 100644 --- a/code/features-convert/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java +++ b/code/features-convert/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java @@ -14,6 +14,7 @@ import nu.marginalia.process.log.WorkLog; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.util.SimpleBlockingThreadPool; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.slf4j.Logger; @@ -53,27 +54,23 @@ public class TermFrequencyExporter implements ExporterIf { TLongIntHashMap counts = new TLongIntHashMap(100_000_000, 0.7f, -1, -1); AtomicInteger docCount = new AtomicInteger(); - try (ForkJoinPool fjp = new ForkJoinPool(Math.max(2, Runtime.getRuntime().availableProcessors() / 2))) { + SimpleBlockingThreadPool sjp = new SimpleBlockingThreadPool("exporter", Math.clamp(2, 16, Runtime.getRuntime().availableProcessors() / 2), 4); + Path crawlerLogFile = inputDir.resolve("crawler.log"); - Path crawlerLogFile = inputDir.resolve("crawler.log"); + for (var item : WorkLog.iterable(crawlerLogFile)) { + if (Thread.interrupted()) { + sjp.shutDownNow(); - for (var item : WorkLog.iterable(crawlerLogFile)) { - if (Thread.interrupted()) { - fjp.shutdownNow(); - - throw new InterruptedException(); - } - - Path crawlDataPath = inputDir.resolve(item.relPath()); - fjp.execute(() -> processFile(crawlDataPath, counts, docCount, se.get())); + throw new InterruptedException(); } - while (!fjp.isQuiescent()) { - if (fjp.awaitQuiescence(10, TimeUnit.SECONDS)) - break; - } + Path crawlDataPath = inputDir.resolve(item.relPath()); + sjp.submitQuietly(() -> processFile(crawlDataPath, counts, docCount, se.get())); } + sjp.shutDown(); + sjp.awaitTermination(10, TimeUnit.DAYS); + var tmpFile = Files.createTempFile(destStorage.asPath(), "freqs", ".dat.tmp", PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));