diff --git a/code/features-index/result-ranking/src/main/java/nu/marginalia/ranking/ResultValuator.java b/code/features-index/result-ranking/src/main/java/nu/marginalia/ranking/ResultValuator.java index 2a856258..6322c09c 100644 --- a/code/features-index/result-ranking/src/main/java/nu/marginalia/ranking/ResultValuator.java +++ b/code/features-index/result-ranking/src/main/java/nu/marginalia/ranking/ResultValuator.java @@ -80,6 +80,14 @@ public class ResultValuator { temporalBias = 0; } + logger.info("averageSentenceLengthPenalty: " + averageSentenceLengthPenalty); + logger.info("documentLengthPenalty: " + documentLengthPenalty); + logger.info("qualityPenalty: " + qualityPenalty); + logger.info("rankingBonus: " + rankingBonus); + logger.info("topologyBonus: " + topologyBonus); + logger.info("temporalBias: " + temporalBias); + logger.info("flagsPenalty: " + flagsPenalty); + double overallPart = averageSentenceLengthPenalty + documentLengthPenalty + qualityPenalty @@ -112,9 +120,22 @@ public class ResultValuator { double overallPartPositive = Math.max(0, overallPart); double overallPartNegative = -Math.min(0, overallPart); + + logger.info("bestTcf: " + bestTcf); + logger.info("bestBM25F: " + bestBM25F); + logger.info("bestBM25P: " + bestBM25P); + logger.info("bestBM25PN: " + bestBM25PN); + logger.info("overallPartPositive: " + overallPartPositive); + logger.info("overallPartNegative: " + overallPartNegative); + // Renormalize to 0...15, where 0 is the best possible score; // this is a historical artifact of the original ranking function - return normalize(1.5 * bestTcf + bestBM25F + bestBM25P + 0.25 * bestBM25PN + overallPartPositive, overallPartNegative); + double ret = normalize(1.5 * bestTcf + bestBM25F + bestBM25P + 0.25 * bestBM25PN + overallPartPositive, overallPartNegative); + + logger.info("ret: " + ret); + + return ret; + } private double calculateQualityPenalty(int size, int quality, ResultRankingParameters rankingParams) { diff --git a/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java index 523381fa..c143f88a 100644 --- a/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java +++ b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java @@ -19,21 +19,16 @@ public class ProcessingIterator implements Iterator { private final LinkedBlockingQueue queue; private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final ExecutorService executorService; - private final Semaphore sem; + private final SimpleBlockingThreadPool pool; private T next = null; - private final int parallelism; - - ProcessingIterator(ExecutorService executorService, int queueSize, int parallelism, ProcessingJob task) { - this.parallelism = parallelism; - + @SneakyThrows + ProcessingIterator(SimpleBlockingThreadPool pool, int queueSize, ProcessingJob task) { queue = new LinkedBlockingQueue<>(queueSize); - this.executorService = executorService; - sem = new Semaphore(parallelism); + this.pool = pool; - executorService.submit(() -> executeJob(task)); + pool.submit(() -> executeJob(task)); } public static Factory factory(int queueSize, int parallelism) { @@ -50,20 +45,13 @@ public class ProcessingIterator implements Iterator { } } + @SneakyThrows private void executeTask(Task task) { - try { - sem.acquire(); - } catch (InterruptedException e) { - return; - } - - executorService.submit(() -> { + pool.submit(() -> { try { queue.put(task.get()); } catch (Exception e) { logger.warn("Exception while processing", e); - } finally { - sem.release(); } }); } @@ -97,7 +85,7 @@ public class ProcessingIterator implements Iterator { private boolean expectMore() { return !isFinished.get() // we are still reading from the database || !queue.isEmpty() // ... or we have documents in the queue - || sem.availablePermits() < parallelism; // ... or we are still processing documents + || pool.getActiveCount() > 0; // ... or we are still processing documents } /** Returns the next document to be processed. @@ -142,24 +130,17 @@ public class ProcessingIterator implements Iterator { public static class Factory { private final int queueSize; - private final int parallelism; - private final ExecutorService executorService; + private final SimpleBlockingThreadPool pool; Factory(int queueSize, int parallelism) { this.queueSize = queueSize; - this.parallelism = parallelism; - this.executorService = Executors.newFixedThreadPool(parallelism); + this.pool = new SimpleBlockingThreadPool("sideload", parallelism, 4); } public ProcessingIterator create(ProcessingJob task) { - return new ProcessingIterator<>(executorService, queueSize, parallelism, task); + return new ProcessingIterator<>(pool, queueSize, task); } - public void stop() { - if (!executorService.isShutdown()) { - executorService.shutdown(); - } - } } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java index 630f97f7..a7f62aca 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -86,7 +86,9 @@ public class DomainProcessor { private final Set processedUrls = new HashSet<>(); private final DomainLinks externalDomainLinks; private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator(); - private static ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(24, 16); + private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8, + Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors()) + ); SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException { this.dataStream = dataStream;