diff --git a/code/functions/search-query/api/java/nu/marginalia/api/searchquery/model/results/SearchResultSet.java b/code/functions/search-query/api/java/nu/marginalia/api/searchquery/model/results/SearchResultSet.java deleted file mode 100644 index 09468162..00000000 --- a/code/functions/search-query/api/java/nu/marginalia/api/searchquery/model/results/SearchResultSet.java +++ /dev/null @@ -1,22 +0,0 @@ -package nu.marginalia.api.searchquery.model.results; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.ToString; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; - -@AllArgsConstructor @Getter @ToString -public class SearchResultSet { - public SearchResultSet() { - results = new ArrayList<>(); - } - - public List results; - public int size() { - return results.size(); - } - -} diff --git a/code/index/java/nu/marginalia/index/IndexGrpcService.java b/code/index/java/nu/marginalia/index/IndexGrpcService.java index b16b456d..68e077a4 100644 --- a/code/index/java/nu/marginalia/index/IndexGrpcService.java +++ b/code/index/java/nu/marginalia/index/IndexGrpcService.java @@ -8,14 +8,15 @@ import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; import it.unimi.dsi.fastutil.longs.LongArrayList; import lombok.SneakyThrows; -import nu.marginalia.api.searchquery.*; +import nu.marginalia.api.searchquery.IndexApiGrpc; +import nu.marginalia.api.searchquery.RpcDecoratedResultItem; +import nu.marginalia.api.searchquery.RpcIndexQuery; import nu.marginalia.api.searchquery.model.compiled.CompiledQuery; import nu.marginalia.api.searchquery.model.compiled.CompiledQueryLong; import nu.marginalia.api.searchquery.model.compiled.CqDataInt; import nu.marginalia.api.searchquery.model.query.SearchSpecification; import nu.marginalia.api.searchquery.model.results.ResultRankingContext; import nu.marginalia.api.searchquery.model.results.ResultRankingParameters; -import 
nu.marginalia.api.searchquery.model.results.SearchResultSet; import nu.marginalia.array.page.LongQueryBuffer; import nu.marginalia.index.index.StatefulIndex; import nu.marginalia.index.model.SearchParameters; @@ -113,7 +114,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { long endTime = System.currentTimeMillis() + request.getQueryLimits().getTimeoutMs(); - SearchResultSet results = wmsa_query_time + List results = wmsa_query_time .labels(nodeName, "GRPC") .time(() -> { // Perform the search @@ -132,48 +133,8 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { } // Send the results back to the client - for (var result : results.results) { - - var rawResult = result.rawIndexResult; - - var rawItem = RpcRawResultItem.newBuilder(); - rawItem.setCombinedId(rawResult.combinedId); - rawItem.setHtmlFeatures(rawResult.htmlFeatures); - rawItem.setEncodedDocMetadata(rawResult.encodedDocMetadata); - rawItem.setHasPriorityTerms(rawResult.hasPrioTerm); - - for (var score : rawResult.keywordScores) { - rawItem.addKeywordScores( - RpcResultKeywordScore.newBuilder() - .setFlags(score.flags) - .setPositions(score.positionCount) - .setKeyword(score.keyword) - ); - } - - var decoratedBuilder = RpcDecoratedResultItem.newBuilder() - .setDataHash(result.dataHash) - .setDescription(result.description) - .setFeatures(result.features) - .setFormat(result.format) - .setRankingScore(result.rankingScore) - .setTitle(result.title) - .setUrl(result.url.toString()) - .setUrlQuality(result.urlQuality) - .setWordsTotal(result.wordsTotal) - .setBestPositions(result.bestPositions) - .setResultsFromDomain(result.resultsFromDomain) - .setRawItem(rawItem); - - var rankingDetails = IndexProtobufCodec.convertRankingDetails(result.rankingDetails); - if (rankingDetails != null) { - decoratedBuilder.setRankingDetails(rankingDetails); - } - - if (result.pubYear != null) { - decoratedBuilder.setPubYear(result.pubYear); - } - 
responseObserver.onNext(decoratedBuilder.build()); + for (var result : results) { + responseObserver.onNext(result); } responseObserver.onCompleted(); @@ -187,7 +148,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { // exists for test access @SneakyThrows - SearchResultSet justQuery(SearchSpecification specsSet) { + List justQuery(SearchSpecification specsSet) { return executeSearch(new SearchParameters(specsSet, getSearchSet(specsSet))); } @@ -210,11 +171,11 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { } // accessible for tests - public SearchResultSet executeSearch(SearchParameters params) throws SQLException, InterruptedException { + public List executeSearch(SearchParameters params) throws SQLException, InterruptedException { if (!statefulIndex.isLoaded()) { // Short-circuit if the index is not loaded, as we trivially know that there can be no results - return new SearchResultSet(List.of()); + return List.of(); } ResultRankingContext rankingContext = createRankingContext(params.rankingParams, @@ -223,7 +184,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { var queryExecution = new QueryExecution(rankingContext, params.fetchSize); - var ret = queryExecution.run(params); + List ret = queryExecution.run(params); wmsa_index_query_exec_block_time .labels(nodeName) @@ -235,30 +196,69 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { return ret; } + /** Builds the ranking context for a query: for each term id, records the numHits and numHitsPrio counts reported by the + * current index, and marks the term in the n-gram mask if it contains an underscore, otherwise in the regular mask + */ + private ResultRankingContext createRankingContext(ResultRankingParameters rankingParams, + CompiledQuery compiledQuery, + CompiledQueryLong compiledQueryIds) + { + + int[] full = new int[compiledQueryIds.size()]; + int[] prio = new int[compiledQueryIds.size()]; + + BitSet ngramsMask = new BitSet(compiledQuery.size()); + BitSet regularMask 
= new BitSet(compiledQuery.size()); + + var currentIndex = statefulIndex.get(); + + for (int idx = 0; idx < compiledQueryIds.size(); idx++) { + long id = compiledQueryIds.at(idx); + full[idx] = currentIndex.numHits(id); + prio[idx] = currentIndex.numHitsPrio(id); + + if (compiledQuery.at(idx).contains("_")) { + ngramsMask.set(idx); + } + else { + regularMask.set(idx); + } + } + + return new ResultRankingContext(currentIndex.totalDocCount(), + rankingParams, + ngramsMask, + regularMask, + new CqDataInt(full), + new CqDataInt(prio)); + } + /** This class is responsible for executing a search query. It uses a thread pool to * execute the subqueries and their valuation in parallel. The results are then combined * into a bounded priority queue, and finally the best results are returned. */ private class QueryExecution { + private static final Executor workerPool = Executors.newWorkStealingPool(indexValuationThreads*4); /** The queue where the results from the index lookup threads are placed, * pending ranking by the result ranker threads */ private final ArrayBlockingQueue resultCandidateQueue = new ArrayBlockingQueue<>(8); - private final ResultPriorityQueue resultHeap; + private final ResultRankingContext resultRankingContext; - private final AtomicInteger remainingIndexTasks = new AtomicInteger(0); - private final AtomicInteger remainingValuationTasks = new AtomicInteger(0); + private final AtomicInteger remainingValuationTasks = new AtomicInteger(0); private final AtomicLong blockTime = new AtomicLong(0); + private final AtomicLong stallTime = new AtomicLong(0); public long getStallTime() { return stallTime.get(); } + public long getBlockTime() { return blockTime.get(); } @@ -269,7 +269,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { } /** Execute a search query */ - public SearchResultSet run(SearchParameters parameters) throws SQLException, InterruptedException { + public List run(SearchParameters parameters) throws SQLException, 
InterruptedException { var terms = new SearchTerms(parameters.query, parameters.compiledQueryIds); @@ -286,7 +286,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { awaitCompletion(); // Return the best results - return new SearchResultSet(resultValuator.selectBestResults(parameters, resultHeap)); + return resultValuator.selectBestResults(parameters, resultHeap); } /** Wait for all tasks to complete */ @@ -297,12 +297,12 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { } } } - /** This class is responsible for executing a subquery and adding the results to the * resultCandidateQueue, which depending on the state of the valuator threads may * or may not block */ class IndexLookup implements Runnable { private final IndexQuery query; + private final IndexSearchBudget budget; IndexLookup(IndexQuery query, @@ -344,7 +344,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { results.add(buffer.data.get(i)); } - if (results.size() < 512) { + if (results.size() >= 512) { enqueueResults(new CombinedDocIdList(results)); results.clear(); } @@ -371,13 +371,11 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { logger.warn("Interrupted while waiting to offer resultIds to queue", e); } } - } - /** This class is responsible for ranking the results and adding the best results to the - * resultHeap, which depending on the state of the indexLookup threads may or may not block - */ + } class ResultRanker implements Runnable { private final SearchParameters parameters; + private final ResultRankingContext rankingContext; ResultRanker(SearchParameters parameters, ResultRankingContext rankingContext) { @@ -401,7 +399,6 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { } } } - private boolean execute() throws InterruptedException { long start = System.currentTimeMillis(); @@ -426,43 +423,10 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase { return true; 
// keep going } + } } - private ResultRankingContext createRankingContext(ResultRankingParameters rankingParams, - CompiledQuery compiledQuery, - CompiledQueryLong compiledQueryIds) - { - - int[] full = new int[compiledQueryIds.size()]; - int[] prio = new int[compiledQueryIds.size()]; - - BitSet ngramsMask = new BitSet(compiledQuery.size()); - BitSet regularMask = new BitSet(compiledQuery.size()); - - var currentIndex = statefulIndex.get(); - - for (int idx = 0; idx < compiledQueryIds.size(); idx++) { - long id = compiledQueryIds.at(idx); - full[idx] = currentIndex.numHits(id); - prio[idx] = currentIndex.numHitsPrio(id); - - if (compiledQuery.at(idx).contains("_")) { - ngramsMask.set(idx); - } - else { - regularMask.set(idx); - } - } - - return new ResultRankingContext(currentIndex.totalDocCount(), - rankingParams, - ngramsMask, - regularMask, - new CqDataInt(full), - new CqDataInt(prio)); - } - } diff --git a/code/index/java/nu/marginalia/index/results/IndexResultRankingService.java b/code/index/java/nu/marginalia/index/results/IndexResultRankingService.java index 90331d14..8c94cefd 100644 --- a/code/index/java/nu/marginalia/index/results/IndexResultRankingService.java +++ b/code/index/java/nu/marginalia/index/results/IndexResultRankingService.java @@ -6,9 +6,11 @@ import gnu.trove.list.TLongList; import gnu.trove.list.array.TLongArrayList; import gnu.trove.map.hash.TObjectLongHashMap; import it.unimi.dsi.fastutil.longs.LongArrayList; +import nu.marginalia.api.searchquery.RpcDecoratedResultItem; +import nu.marginalia.api.searchquery.RpcRawResultItem; +import nu.marginalia.api.searchquery.RpcResultKeywordScore; import nu.marginalia.api.searchquery.model.compiled.CompiledQuery; import nu.marginalia.api.searchquery.model.query.SearchQuery; -import nu.marginalia.api.searchquery.model.results.DecoratedSearchResultItem; import nu.marginalia.api.searchquery.model.results.ResultRankingContext; import nu.marginalia.api.searchquery.model.results.SearchResultItem; import 
nu.marginalia.index.index.CombinedIndexReader; @@ -109,8 +111,8 @@ public class IndexResultRankingService { } - public List<DecoratedSearchResultItem> selectBestResults(SearchParameters params, - Collection<SearchResultItem> results) throws SQLException { + public List<RpcDecoratedResultItem> selectBestResults(SearchParameters params, + Collection<SearchResultItem> results) throws SQLException { var domainCountFilter = new IndexResultDomainDeduplicator(params.limitByDomain); @@ -141,7 +143,7 @@ public class IndexResultRankingService { detailsById.put(item.urlId(), item); } - List<DecoratedSearchResultItem> resultItems = new ArrayList<>(resultsList.size()); + List<RpcDecoratedResultItem> resultItems = new ArrayList<>(resultsList.size()); // Decorate the results with the document details for (var result : resultsList) { @@ -153,23 +155,49 @@ public class IndexResultRankingService { continue; } - // Create a decorated search result item from the result and the document data - resultItems.add(new DecoratedSearchResultItem( - result, - docData.url(), - docData.title(), - docData.description(), - docData.urlQuality(), - docData.format(), - docData.features(), - docData.pubYear(), - docData.dataHash(), - docData.wordsTotal(), - 0L, //bestPositions(wordMetas), - result.getScore(), - domainCountFilter.getCount(result), - null - )); + // Convert the result and its document data directly into the protobuf response item + var rawItem = RpcRawResultItem.newBuilder(); + + rawItem.setCombinedId(result.combinedId); + rawItem.setHtmlFeatures(result.htmlFeatures); + rawItem.setEncodedDocMetadata(result.encodedDocMetadata); + rawItem.setHasPriorityTerms(result.hasPrioTerm); + + for (var score : result.keywordScores) { + rawItem.addKeywordScores( + RpcResultKeywordScore.newBuilder() + .setFlags(score.flags) + .setPositions(score.positionCount) + .setKeyword(score.keyword) + ); + } + + var decoratedBuilder = RpcDecoratedResultItem.newBuilder() + .setDataHash(docData.dataHash()) + .setDescription(docData.description()) + .setFeatures(docData.features()) + .setFormat(docData.format()) + .setRankingScore(result.getScore()) + .setTitle(docData.title()) + .setUrl(docData.url().toString()) + 
.setUrlQuality(docData.urlQuality()) + .setWordsTotal(docData.wordsTotal()) + .setBestPositions(0 /* FIXME */) + .setResultsFromDomain(domainCountFilter.getCount(result)) + .setRawItem(rawItem); + + if (docData.pubYear() != null) { + decoratedBuilder.setPubYear(docData.pubYear()); + } + + /* FIXME + var rankingDetails = IndexProtobufCodec.convertRankingDetails(result.rankingDetails); + if (rankingDetails != null) { + decoratedBuilder.setRankingDetails(rankingDetails); + }*/ + + // Without this the method always returns an empty list: the built item must be added to the output + resultItems.add(decoratedBuilder.build()); } return resultItems; diff --git a/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationSmokeTest.java b/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationSmokeTest.java index 60501571..5021f2ee 100644 --- a/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationSmokeTest.java +++ b/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationSmokeTest.java @@ -130,9 +130,9 @@ public class IndexQueryServiceIntegrationSmokeTest { int[] idxes = new int[] { 30, 510, 90, 150, 210, 270, 330, 390, 450 }; long[] ids = IntStream.of(idxes).mapToLong(this::fullId).toArray(); - long[] actual = rsp.results + long[] actual = rsp .stream() - .mapToLong(i -> i.rawIndexResult.getDocumentId()) + .mapToLong(i -> i.getRawItem().getCombinedId()) .toArray(); System.out.println(Arrays.toString(actual)); @@ -177,9 +177,9 @@ public class IndexQueryServiceIntegrationSmokeTest { int[] idxes = new int[] { 504, 360, 420, 480, 240, 180, 300, 120, 280, 440 }; long[] ids = IntStream.of(idxes).mapToLong(Long::valueOf).toArray(); - long[] actual = rsp.results + long[] actual = rsp .stream() - .mapToLong(i -> i.rawIndexResult.getDocumentId()) + .mapToLong(i -> i.getRawItem().getCombinedId()) .map(UrlIdCodec::getDocumentOrdinal) .toArray(); @@ -224,7 +224,7 @@ public class IndexQueryServiceIntegrationSmokeTest { Collections.emptyList())).build()); int[] idxes = new int[] { 210, 270 }; long[] ids = IntStream.of(idxes).mapToLong(id -> UrlIdCodec.encodeId(id/100, id)).toArray(); - long[] actual = 
rsp.results.stream().mapToLong(i -> i.rawIndexResult.getDocumentId()).toArray(); + long[] actual = rsp.stream().mapToLong(i -> i.getRawItem().getCombinedId()).toArray(); Assertions.assertArrayEquals(ids, actual); } @@ -262,12 +262,13 @@ public class IndexQueryServiceIntegrationSmokeTest { Set years = new HashSet<>(); - for (var res : rsp.results) { - years.add(DocumentMetadata.decodeYear(res.rawIndexResult.encodedDocMetadata)); + for (var res : rsp) { + // The year is decoded from the encoded document metadata, not from the combined document id + years.add(DocumentMetadata.decodeYear(res.getRawItem().getEncodedDocMetadata())); } assertEquals(Set.of(1998), years); - assertEquals(rsp.results.size(), 10); + assertEquals(10, rsp.size()); } diff --git a/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTest.java b/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTest.java index eb83f714..569b7937 100644 --- a/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTest.java +++ b/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTest.java @@ -347,8 +347,8 @@ public class IndexQueryServiceIntegrationTest { System.out.println(rsp); - for (var result : rsp.results) { - long docId = result.rawIndexResult.getDocumentId(); + for (var result : rsp) { + long docId = result.getRawItem().getCombinedId(); actual.add(new MockDataDocument(UrlIdCodec.getDomainId(docId), UrlIdCodec.getDocumentOrdinal(docId))); } @@ -382,9 +382,9 @@ public class IndexQueryServiceIntegrationTest { includeAndCohere("hello", "world") ))); - assertEquals(1, rsp.results.size()); + assertEquals(1, rsp.size()); assertEquals(d(2,2).docId(), - rsp.results.get(0).rawIndexResult.getDocumentId()); + rsp.get(0).getRawItem().getCombinedId()); } SearchSpecification basicQuery(Function mutator)