(index) Observability for query execution queues

This commit is contained in:
Viktor Lofgren 2024-02-25 11:54:31 +01:00
parent b8e336e809
commit 7fc0d4d786

View File

@ -33,6 +33,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Singleton @Singleton
public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
@ -60,6 +61,17 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
.help("Index-side query time") .help("Index-side query time")
.register(); .register();
private static final Gauge wmsa_index_query_exec_stall_time = Gauge.build()
.name("wmsa_index_query_exec_stall_time")
.help("Execution stall time")
.labelNames("node")
.register();
private static final Gauge wmsa_index_query_exec_block_time = Gauge.build()
.name("wmsa_index_query_exec_block_time")
.help("Execution stall time")
.labelNames("node")
.register();
private final StatefulIndex index; private final StatefulIndex index;
private final SearchSetsService searchSetsService; private final SearchSetsService searchSetsService;
@ -68,7 +80,9 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
private final IndexResultValuatorService resultValuator; private final IndexResultValuatorService resultValuator;
private final int nodeId; private final int nodeId;
private final String nodeName;
private final int indexValuationThreads = Integer.getInteger("index.valuationThreads", 8);
@Inject @Inject
public IndexGrpcService(ServiceConfiguration serviceConfiguration, public IndexGrpcService(ServiceConfiguration serviceConfiguration,
@ -78,6 +92,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
IndexResultValuatorService resultValuator) IndexResultValuatorService resultValuator)
{ {
this.nodeId = serviceConfiguration.node(); this.nodeId = serviceConfiguration.node();
this.nodeName = Integer.toString(nodeId);
this.index = index; this.index = index;
this.searchSetsService = searchSetsService; this.searchSetsService = searchSetsService;
this.resultValuator = resultValuator; this.resultValuator = resultValuator;
@ -91,7 +106,6 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
try { try {
var params = new SearchParameters(request, getSearchSet(request)); var params = new SearchParameters(request, getSearchSet(request));
final String nodeName = Integer.toString(nodeId);
SearchResultSet results = wmsa_query_time SearchResultSet results = wmsa_query_time
.labels(nodeName, "GRPC") .labels(nodeName, "GRPC")
@ -180,7 +194,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
return searchSetsService.getSearchSetByName(request.getSearchSetIdentifier()); return searchSetsService.getSearchSetByName(request.getSearchSetIdentifier());
} }
private SearchResultSet executeSearch(SearchParameters params) throws SQLException { private SearchResultSet executeSearch(SearchParameters params) throws SQLException, InterruptedException {
if (!index.isLoaded()) { if (!index.isLoaded()) {
// Short-circuit if the index is not loaded, as we trivially know that there can be no results // Short-circuit if the index is not loaded, as we trivially know that there can be no results
@ -189,51 +203,84 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
ResultRankingContext rankingContext = createRankingContext(params.rankingParams, params.subqueries); ResultRankingContext rankingContext = createRankingContext(params.rankingParams, params.subqueries);
// logger.info(queryMarker, "{}", params.queryParams); var queryExecution = new QueryExecution(rankingContext, params.fetchSize);
return new QueryExecution(rankingContext, params.fetchSize) var ret = queryExecution.run(params);
.run(params);
wmsa_index_query_exec_block_time
.labels(nodeName)
.set(queryExecution.getBlockTime() / 1000.);
wmsa_index_query_exec_stall_time
.labels(nodeName)
.set(queryExecution.getStallTime() / 1000.);
return ret;
} }
/** This class is responsible for executing a search query. It uses a thread pool to
* execute the subqueries in parallel, and then uses another thread pool to rank the
* results in parallel. The results are then combined into a bounded priority queue,
* and finally the best results are returned.
*/
private class QueryExecution { private class QueryExecution {
private static final Executor queryExecutor = Executors.newCachedThreadPool(); private static final Executor workerPool = Executors.newCachedThreadPool();
private static final Executor rankingExecutor = Executors.newCachedThreadPool();
private final ArrayBlockingQueue<CombinedDocIdList> resultQueue = new ArrayBlockingQueue<>(8); private final ArrayBlockingQueue<CombinedDocIdList> resultCandidateQueue
= new ArrayBlockingQueue<>(8);
private final ResultPriorityQueue resultHeap; private final ResultPriorityQueue resultHeap;
private final ResultRankingContext resultRankingContext; private final ResultRankingContext resultRankingContext;
private final AtomicInteger remainingIndexTasks = new AtomicInteger(0); private final AtomicInteger remainingIndexTasks = new AtomicInteger(0);
private final AtomicInteger remainingValuationTasks = new AtomicInteger(0); private final AtomicInteger remainingValuationTasks = new AtomicInteger(0);
private final AtomicLong blockTime = new AtomicLong(0);
private final AtomicLong stallTime = new AtomicLong(0);
public long getStallTime() {
return stallTime.get();
}
public long getBlockTime() {
return blockTime.get();
}
private QueryExecution(ResultRankingContext resultRankingContext, int maxResults) { private QueryExecution(ResultRankingContext resultRankingContext, int maxResults) {
this.resultRankingContext = resultRankingContext; this.resultRankingContext = resultRankingContext;
this.resultHeap = new ResultPriorityQueue(maxResults); this.resultHeap = new ResultPriorityQueue(maxResults);
} }
public SearchResultSet run(SearchParameters parameters) throws SQLException { /** Execute a search query */
public SearchResultSet run(SearchParameters parameters) throws SQLException, InterruptedException {
for (var subquery : parameters.subqueries) { for (var subquery : parameters.subqueries) {
queryExecutor.execute(new IndexLookup(subquery, parameters)); workerPool.execute(new IndexLookup(subquery, parameters));
} }
for (int i = 0; i < 16; i++) { for (int i = 0; i < indexValuationThreads; i++) {
rankingExecutor.execute(new ResultRanker(parameters, resultRankingContext)); workerPool.execute(new ResultRanker(parameters, resultRankingContext));
} }
// Wait for all tasks to complete // Wait for all tasks to complete
synchronized (remainingValuationTasks) { awaitCompletion();
while (remainingValuationTasks.get() > 0) {
try {
remainingValuationTasks.wait(20);
}
catch (InterruptedException e) {
logger.warn("Interrupted while waiting for tasks to complete", e);
}
}
}
return new SearchResultSet(resultValuator.selectBestResults(parameters, resultRankingContext, resultHeap)); // Return the best results
return new SearchResultSet(
resultValuator.selectBestResults(parameters,
resultRankingContext,
resultHeap));
} }
/** Wait for all tasks to complete */
private void awaitCompletion() throws InterruptedException {
synchronized (remainingValuationTasks) {
while (remainingValuationTasks.get() > 0) {
remainingValuationTasks.wait(20);
}
}
}
/** This class is responsible for executing a subquery and adding the results to the
* resultCandidateQueue, which depending on the state of the valuator threads may
* or may not block*/
class IndexLookup implements Runnable { class IndexLookup implements Runnable {
private final SearchSubquery subquery; private final SearchSubquery subquery;
private final SearchParameters parameters; private final SearchParameters parameters;
@ -267,7 +314,11 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
long remainingTime = parameters.budget.timeLeft(); long remainingTime = parameters.budget.timeLeft();
try { try {
resultQueue.offer(resultIds, remainingTime, TimeUnit.MILLISECONDS); if (!resultCandidateQueue.offer(resultIds)) {
long start = System.currentTimeMillis();
resultCandidateQueue.offer(resultIds, remainingTime, TimeUnit.MILLISECONDS);
blockTime.addAndGet(System.currentTimeMillis() - start);
}
} }
catch (InterruptedException e) { catch (InterruptedException e) {
logger.warn("Interrupted while waiting to offer resultIds to queue", e); logger.warn("Interrupted while waiting to offer resultIds to queue", e);
@ -275,6 +326,9 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
} }
} }
/** This class is responsible for ranking the results and adding the best results to the
* resultHeap, which depending on the state of the indexLookup threads may or may not block
*/
class ResultRanker implements Runnable { class ResultRanker implements Runnable {
private final SearchParameters parameters; private final SearchParameters parameters;
private final ResultRankingContext rankingContext; private final ResultRankingContext rankingContext;
@ -290,16 +344,22 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
try { try {
while (parameters.budget.timeLeft() > 0) { while (parameters.budget.timeLeft() > 0) {
CombinedDocIdList resultIds = resultQueue.poll( long start = System.currentTimeMillis();
CombinedDocIdList resultIds = resultCandidateQueue.poll(
Math.clamp(parameters.budget.timeLeft(), 1, 25), Math.clamp(parameters.budget.timeLeft(), 1, 25),
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
if (resultIds == null) { if (resultIds == null) {
if (remainingIndexTasks.get() == 0 && resultQueue.isEmpty()) if (remainingIndexTasks.get() == 0
&& resultCandidateQueue.isEmpty())
break; break;
else else
continue; continue;
} }
stallTime.addAndGet(System.currentTimeMillis() - start);
var bestResults = resultValuator.rankResults(parameters, rankingContext, resultIds); var bestResults = resultValuator.rankResults(parameters, rankingContext, resultIds);
resultHeap.addAll(bestResults); resultHeap.addAll(bestResults);