diff --git a/code/index/java/nu/marginalia/index/IndexGrpcService.java b/code/index/java/nu/marginalia/index/IndexGrpcService.java index 92440796..b210c851 100644 --- a/code/index/java/nu/marginalia/index/IndexGrpcService.java +++ b/code/index/java/nu/marginalia/index/IndexGrpcService.java @@ -33,6 +33,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; @Singleton public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { @@ -60,6 +61,17 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { .help("Index-side query time") .register(); + private static final Gauge wmsa_index_query_exec_stall_time = Gauge.build() + .name("wmsa_index_query_exec_stall_time") + .help("Execution stall time") + .labelNames("node") + .register(); + + private static final Gauge wmsa_index_query_exec_block_time = Gauge.build() + .name("wmsa_index_query_exec_block_time") + .help("Execution stall time") + .labelNames("node") + .register(); private final StatefulIndex index; private final SearchSetsService searchSetsService; @@ -68,7 +80,9 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { private final IndexResultValuatorService resultValuator; private final int nodeId; + private final String nodeName; + private final int indexValuationThreads = Integer.getInteger("index.valuationThreads", 8); @Inject public IndexGrpcService(ServiceConfiguration serviceConfiguration, @@ -78,6 +92,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { IndexResultValuatorService resultValuator) { this.nodeId = serviceConfiguration.node(); + this.nodeName = Integer.toString(nodeId); this.index = index; this.searchSetsService = searchSetsService; this.resultValuator = resultValuator; @@ -91,7 +106,6 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { try { var params = new SearchParameters(request, getSearchSet(request)); - final String nodeName = Integer.toString(nodeId); SearchResultSet results = wmsa_query_time .labels(nodeName, "GRPC") @@ -180,7 +194,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { return searchSetsService.getSearchSetByName(request.getSearchSetIdentifier()); } - private SearchResultSet executeSearch(SearchParameters params) throws SQLException { + private SearchResultSet executeSearch(SearchParameters params) throws SQLException, InterruptedException { if (!index.isLoaded()) { // Short-circuit if the index is not loaded, as we trivially know that there can be no results @@ -189,51 +203,84 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { ResultRankingContext rankingContext = createRankingContext(params.rankingParams, params.subqueries); -// logger.info(queryMarker, "{}", params.queryParams); + var queryExecution = new QueryExecution(rankingContext, params.fetchSize); - return new QueryExecution(rankingContext, params.fetchSize) - .run(params); + var ret = queryExecution.run(params); + + wmsa_index_query_exec_block_time + .labels(nodeName) + .set(queryExecution.getBlockTime() / 1000.); + wmsa_index_query_exec_stall_time + .labels(nodeName) + .set(queryExecution.getStallTime() / 1000.); + + return ret; } + /** This class is responsible for executing a search query. It uses a thread pool to + * execute the subqueries in parallel, and then uses another thread pool to rank the + * results in parallel. The results are then combined into a bounded priority queue, + * and finally the best results are returned. + */ private class QueryExecution { - private static final Executor queryExecutor = Executors.newCachedThreadPool(); - private static final Executor rankingExecutor = Executors.newCachedThreadPool(); - private final ArrayBlockingQueue resultQueue = new ArrayBlockingQueue<>(8); + private static final Executor workerPool = Executors.newCachedThreadPool(); + + private final ArrayBlockingQueue resultCandidateQueue + = new ArrayBlockingQueue<>(8); + private final ResultPriorityQueue resultHeap; private final ResultRankingContext resultRankingContext; private final AtomicInteger remainingIndexTasks = new AtomicInteger(0); private final AtomicInteger remainingValuationTasks = new AtomicInteger(0); + private final AtomicLong blockTime = new AtomicLong(0); + private final AtomicLong stallTime = new AtomicLong(0); + + public long getStallTime() { + return stallTime.get(); + } + public long getBlockTime() { + return blockTime.get(); + } + private QueryExecution(ResultRankingContext resultRankingContext, int maxResults) { this.resultRankingContext = resultRankingContext; this.resultHeap = new ResultPriorityQueue(maxResults); } - public SearchResultSet run(SearchParameters parameters) throws SQLException { + /** Execute a search query */ + public SearchResultSet run(SearchParameters parameters) throws SQLException, InterruptedException { for (var subquery : parameters.subqueries) { - queryExecutor.execute(new IndexLookup(subquery, parameters)); + workerPool.execute(new IndexLookup(subquery, parameters)); } - for (int i = 0; i < 16; i++) { - rankingExecutor.execute(new ResultRanker(parameters, resultRankingContext)); + for (int i = 0; i < indexValuationThreads; i++) { + workerPool.execute(new ResultRanker(parameters, resultRankingContext)); } // Wait for all tasks to complete - synchronized (remainingValuationTasks) { - while (remainingValuationTasks.get() > 0) { - try { - remainingValuationTasks.wait(20); - } - catch (InterruptedException e) { - logger.warn("Interrupted while waiting for tasks to complete", e); - } - } - } + awaitCompletion(); - return new SearchResultSet(resultValuator.selectBestResults(parameters, resultRankingContext, resultHeap)); + // Return the best results + return new SearchResultSet( + resultValuator.selectBestResults(parameters, + resultRankingContext, + resultHeap)); } + /** Wait for all tasks to complete */ + private void awaitCompletion() throws InterruptedException { + synchronized (remainingValuationTasks) { + while (remainingValuationTasks.get() > 0) { + remainingValuationTasks.wait(20); + } + } + } + + /** This class is responsible for executing a subquery and adding the results to the + * resultCandidateQueue, which depending on the state of the valuator threads may + * or may not block*/ class IndexLookup implements Runnable { private final SearchSubquery subquery; private final SearchParameters parameters; @@ -267,7 +314,11 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { long remainingTime = parameters.budget.timeLeft(); try { - resultQueue.offer(resultIds, remainingTime, TimeUnit.MILLISECONDS); + if (!resultCandidateQueue.offer(resultIds)) { + long start = System.currentTimeMillis(); + resultCandidateQueue.offer(resultIds, remainingTime, TimeUnit.MILLISECONDS); + blockTime.addAndGet(System.currentTimeMillis() - start); + } } catch (InterruptedException e) { logger.warn("Interrupted while waiting to offer resultIds to queue", e); @@ -275,6 +326,9 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { } } + /** This class is responsible for ranking the results and adding the best results to the + * resultHeap, which depending on the state of the indexLookup threads may or may not block + */ class ResultRanker implements Runnable { private final SearchParameters parameters; private final ResultRankingContext rankingContext; @@ -290,16 +344,22 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { try { while (parameters.budget.timeLeft() > 0) { - CombinedDocIdList resultIds = resultQueue.poll( + long start = System.currentTimeMillis(); + + CombinedDocIdList resultIds = resultCandidateQueue.poll( Math.clamp(parameters.budget.timeLeft(), 1, 25), TimeUnit.MILLISECONDS); + if (resultIds == null) { - if (remainingIndexTasks.get() == 0 && resultQueue.isEmpty()) + if (remainingIndexTasks.get() == 0 + && resultCandidateQueue.isEmpty()) break; else continue; } + stallTime.addAndGet(System.currentTimeMillis() - start); + var bestResults = resultValuator.rankResults(parameters, rankingContext, resultIds); resultHeap.addAll(bestResults);