(index) Remove intermediate models

This commit is contained in:
Viktor Lofgren 2024-08-07 10:10:44 +02:00
parent 680ad19c7d
commit 7babdb87d5
5 changed files with 116 additions and 150 deletions

View File

@ -1,22 +0,0 @@
package nu.marginalia.api.searchquery.model.results;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@AllArgsConstructor @Getter @ToString
public class SearchResultSet {
public SearchResultSet() {
results = new ArrayList<>();
}
public List<DecoratedSearchResultItem> results;
public int size() {
return results.size();
}
}

View File

@ -8,14 +8,15 @@ import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import lombok.SneakyThrows;
import nu.marginalia.api.searchquery.*;
import nu.marginalia.api.searchquery.IndexApiGrpc;
import nu.marginalia.api.searchquery.RpcDecoratedResultItem;
import nu.marginalia.api.searchquery.RpcIndexQuery;
import nu.marginalia.api.searchquery.model.compiled.CompiledQuery;
import nu.marginalia.api.searchquery.model.compiled.CompiledQueryLong;
import nu.marginalia.api.searchquery.model.compiled.CqDataInt;
import nu.marginalia.api.searchquery.model.query.SearchSpecification;
import nu.marginalia.api.searchquery.model.results.ResultRankingContext;
import nu.marginalia.api.searchquery.model.results.ResultRankingParameters;
import nu.marginalia.api.searchquery.model.results.SearchResultSet;
import nu.marginalia.array.page.LongQueryBuffer;
import nu.marginalia.index.index.StatefulIndex;
import nu.marginalia.index.model.SearchParameters;
@ -113,7 +114,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
long endTime = System.currentTimeMillis() + request.getQueryLimits().getTimeoutMs();
SearchResultSet results = wmsa_query_time
List<RpcDecoratedResultItem> results = wmsa_query_time
.labels(nodeName, "GRPC")
.time(() -> {
// Perform the search
@ -132,48 +133,8 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
}
// Send the results back to the client
for (var result : results.results) {
var rawResult = result.rawIndexResult;
var rawItem = RpcRawResultItem.newBuilder();
rawItem.setCombinedId(rawResult.combinedId);
rawItem.setHtmlFeatures(rawResult.htmlFeatures);
rawItem.setEncodedDocMetadata(rawResult.encodedDocMetadata);
rawItem.setHasPriorityTerms(rawResult.hasPrioTerm);
for (var score : rawResult.keywordScores) {
rawItem.addKeywordScores(
RpcResultKeywordScore.newBuilder()
.setFlags(score.flags)
.setPositions(score.positionCount)
.setKeyword(score.keyword)
);
}
var decoratedBuilder = RpcDecoratedResultItem.newBuilder()
.setDataHash(result.dataHash)
.setDescription(result.description)
.setFeatures(result.features)
.setFormat(result.format)
.setRankingScore(result.rankingScore)
.setTitle(result.title)
.setUrl(result.url.toString())
.setUrlQuality(result.urlQuality)
.setWordsTotal(result.wordsTotal)
.setBestPositions(result.bestPositions)
.setResultsFromDomain(result.resultsFromDomain)
.setRawItem(rawItem);
var rankingDetails = IndexProtobufCodec.convertRankingDetails(result.rankingDetails);
if (rankingDetails != null) {
decoratedBuilder.setRankingDetails(rankingDetails);
}
if (result.pubYear != null) {
decoratedBuilder.setPubYear(result.pubYear);
}
responseObserver.onNext(decoratedBuilder.build());
for (var result : results) {
responseObserver.onNext(result);
}
responseObserver.onCompleted();
@ -187,7 +148,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
// exists for test access
@SneakyThrows
SearchResultSet justQuery(SearchSpecification specsSet) {
List<RpcDecoratedResultItem> justQuery(SearchSpecification specsSet) {
return executeSearch(new SearchParameters(specsSet, getSearchSet(specsSet)));
}
@ -210,11 +171,11 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
}
// accessible for tests
public SearchResultSet executeSearch(SearchParameters params) throws SQLException, InterruptedException {
public List<RpcDecoratedResultItem> executeSearch(SearchParameters params) throws SQLException, InterruptedException {
if (!statefulIndex.isLoaded()) {
// Short-circuit if the index is not loaded, as we trivially know that there can be no results
return new SearchResultSet(List.of());
return List.of();
}
ResultRankingContext rankingContext = createRankingContext(params.rankingParams,
@ -223,7 +184,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
var queryExecution = new QueryExecution(rankingContext, params.fetchSize);
var ret = queryExecution.run(params);
List<RpcDecoratedResultItem> ret = queryExecution.run(params);
wmsa_index_query_exec_block_time
.labels(nodeName)
@ -235,30 +196,69 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
return ret;
}
/** This class is responsible for ranking the results and adding the best results to the
* resultHeap, which depending on the state of the indexLookup threads may or may not block
*/
private ResultRankingContext createRankingContext(ResultRankingParameters rankingParams,
CompiledQuery<String> compiledQuery,
CompiledQueryLong compiledQueryIds)
{
int[] full = new int[compiledQueryIds.size()];
int[] prio = new int[compiledQueryIds.size()];
BitSet ngramsMask = new BitSet(compiledQuery.size());
BitSet regularMask = new BitSet(compiledQuery.size());
var currentIndex = statefulIndex.get();
for (int idx = 0; idx < compiledQueryIds.size(); idx++) {
long id = compiledQueryIds.at(idx);
full[idx] = currentIndex.numHits(id);
prio[idx] = currentIndex.numHitsPrio(id);
if (compiledQuery.at(idx).contains("_")) {
ngramsMask.set(idx);
}
else {
regularMask.set(idx);
}
}
return new ResultRankingContext(currentIndex.totalDocCount(),
rankingParams,
ngramsMask,
regularMask,
new CqDataInt(full),
new CqDataInt(prio));
}
/** This class is responsible for executing a search query. It uses a thread pool to
* execute the subqueries and their valuation in parallel. The results are then combined
* into a bounded priority queue, and finally the best results are returned.
*/
private class QueryExecution {
private static final Executor workerPool = Executors.newWorkStealingPool(indexValuationThreads*4);
/** The queue where the results from the index lookup threads are placed,
* pending ranking by the result ranker threads */
private final ArrayBlockingQueue<CombinedDocIdList> resultCandidateQueue
= new ArrayBlockingQueue<>(8);
private final ResultPriorityQueue resultHeap;
private final ResultRankingContext resultRankingContext;
private final AtomicInteger remainingIndexTasks = new AtomicInteger(0);
private final AtomicInteger remainingValuationTasks = new AtomicInteger(0);
private final AtomicInteger remainingValuationTasks = new AtomicInteger(0);
private final AtomicLong blockTime = new AtomicLong(0);
private final AtomicLong stallTime = new AtomicLong(0);
public long getStallTime() {
return stallTime.get();
}
public long getBlockTime() {
return blockTime.get();
}
@ -269,7 +269,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
}
/** Execute a search query */
public SearchResultSet run(SearchParameters parameters) throws SQLException, InterruptedException {
public List<RpcDecoratedResultItem> run(SearchParameters parameters) throws SQLException, InterruptedException {
var terms = new SearchTerms(parameters.query, parameters.compiledQueryIds);
@ -286,7 +286,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
awaitCompletion();
// Return the best results
return new SearchResultSet(resultValuator.selectBestResults(parameters, resultHeap));
return resultValuator.selectBestResults(parameters, resultHeap);
}
/** Wait for all tasks to complete */
@ -297,12 +297,12 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
}
}
}
/** This class is responsible for executing a subquery and adding the results to the
* resultCandidateQueue, which depending on the state of the valuator threads may
* or may not block */
class IndexLookup implements Runnable {
private final IndexQuery query;
private final IndexSearchBudget budget;
IndexLookup(IndexQuery query,
@ -344,7 +344,7 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
results.add(buffer.data.get(i));
}
if (results.size() < 512) {
if (results.size() >= 512) {
enqueueResults(new CombinedDocIdList(results));
results.clear();
}
@ -371,13 +371,11 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
logger.warn("Interrupted while waiting to offer resultIds to queue", e);
}
}
}
/** This class is responsible for ranking the results and adding the best results to the
* resultHeap, which depending on the state of the indexLookup threads may or may not block
*/
}
class ResultRanker implements Runnable {
private final SearchParameters parameters;
private final ResultRankingContext rankingContext;
ResultRanker(SearchParameters parameters, ResultRankingContext rankingContext) {
@ -401,7 +399,6 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
}
}
}
private boolean execute() throws InterruptedException {
long start = System.currentTimeMillis();
@ -426,43 +423,10 @@ public class IndexGrpcService extends IndexApiGrpc.IndexApiImplBase {
return true; // keep going
}
}
}
private ResultRankingContext createRankingContext(ResultRankingParameters rankingParams,
CompiledQuery<String> compiledQuery,
CompiledQueryLong compiledQueryIds)
{
int[] full = new int[compiledQueryIds.size()];
int[] prio = new int[compiledQueryIds.size()];
BitSet ngramsMask = new BitSet(compiledQuery.size());
BitSet regularMask = new BitSet(compiledQuery.size());
var currentIndex = statefulIndex.get();
for (int idx = 0; idx < compiledQueryIds.size(); idx++) {
long id = compiledQueryIds.at(idx);
full[idx] = currentIndex.numHits(id);
prio[idx] = currentIndex.numHitsPrio(id);
if (compiledQuery.at(idx).contains("_")) {
ngramsMask.set(idx);
}
else {
regularMask.set(idx);
}
}
return new ResultRankingContext(currentIndex.totalDocCount(),
rankingParams,
ngramsMask,
regularMask,
new CqDataInt(full),
new CqDataInt(prio));
}
}

View File

@ -6,9 +6,11 @@ import gnu.trove.list.TLongList;
import gnu.trove.list.array.TLongArrayList;
import gnu.trove.map.hash.TObjectLongHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import nu.marginalia.api.searchquery.RpcDecoratedResultItem;
import nu.marginalia.api.searchquery.RpcRawResultItem;
import nu.marginalia.api.searchquery.RpcResultKeywordScore;
import nu.marginalia.api.searchquery.model.compiled.CompiledQuery;
import nu.marginalia.api.searchquery.model.query.SearchQuery;
import nu.marginalia.api.searchquery.model.results.DecoratedSearchResultItem;
import nu.marginalia.api.searchquery.model.results.ResultRankingContext;
import nu.marginalia.api.searchquery.model.results.SearchResultItem;
import nu.marginalia.index.index.CombinedIndexReader;
@ -109,8 +111,8 @@ public class IndexResultRankingService {
}
public List<DecoratedSearchResultItem> selectBestResults(SearchParameters params,
Collection<SearchResultItem> results) throws SQLException {
public List<RpcDecoratedResultItem> selectBestResults(SearchParameters params,
Collection<SearchResultItem> results) throws SQLException {
var domainCountFilter = new IndexResultDomainDeduplicator(params.limitByDomain);
@ -141,7 +143,7 @@ public class IndexResultRankingService {
detailsById.put(item.urlId(), item);
}
List<DecoratedSearchResultItem> resultItems = new ArrayList<>(resultsList.size());
List<RpcDecoratedResultItem> resultItems = new ArrayList<>(resultsList.size());
// Decorate the results with the document details
for (var result : resultsList) {
@ -153,23 +155,45 @@ public class IndexResultRankingService {
continue;
}
// Create a decorated search result item from the result and the document data
resultItems.add(new DecoratedSearchResultItem(
result,
docData.url(),
docData.title(),
docData.description(),
docData.urlQuality(),
docData.format(),
docData.features(),
docData.pubYear(),
docData.dataHash(),
docData.wordsTotal(),
0L, //bestPositions(wordMetas),
result.getScore(),
domainCountFilter.getCount(result),
null
));
var rawItem = RpcRawResultItem.newBuilder();
rawItem.setCombinedId(result.combinedId);
rawItem.setHtmlFeatures(result.htmlFeatures);
rawItem.setEncodedDocMetadata(result.encodedDocMetadata);
rawItem.setHasPriorityTerms(result.hasPrioTerm);
for (var score : result.keywordScores) {
rawItem.addKeywordScores(
RpcResultKeywordScore.newBuilder()
.setFlags(score.flags)
.setPositions(score.positionCount)
.setKeyword(score.keyword)
);
}
var decoratedBuilder = RpcDecoratedResultItem.newBuilder()
.setDataHash(docData.dataHash())
.setDescription(docData.description())
.setFeatures(docData.features())
.setFormat(docData.format())
.setRankingScore(result.getScore())
.setTitle(docData.title())
.setUrl(docData.url().toString())
.setUrlQuality(docData.urlQuality())
.setWordsTotal(docData.wordsTotal())
.setBestPositions(0 /* FIXME */)
.setResultsFromDomain(domainCountFilter.getCount(result))
.setRawItem(rawItem);
if (docData.pubYear() != null) {
decoratedBuilder.setPubYear(docData.pubYear());
}
/* FIXME
var rankingDetails = IndexProtobufCodec.convertRankingDetails(result.rankingDetails);
if (rankingDetails != null) {
decoratedBuilder.setRankingDetails(rankingDetails);
}*/
}
return resultItems;

View File

@ -130,9 +130,9 @@ public class IndexQueryServiceIntegrationSmokeTest {
int[] idxes = new int[] { 30, 510, 90, 150, 210, 270, 330, 390, 450 };
long[] ids = IntStream.of(idxes).mapToLong(this::fullId).toArray();
long[] actual = rsp.results
long[] actual = rsp
.stream()
.mapToLong(i -> i.rawIndexResult.getDocumentId())
.mapToLong(i -> i.getRawItem().getCombinedId())
.toArray();
System.out.println(Arrays.toString(actual));
@ -177,9 +177,9 @@ public class IndexQueryServiceIntegrationSmokeTest {
int[] idxes = new int[] { 504, 360, 420, 480, 240, 180, 300, 120, 280, 440 };
long[] ids = IntStream.of(idxes).mapToLong(Long::valueOf).toArray();
long[] actual = rsp.results
long[] actual = rsp
.stream()
.mapToLong(i -> i.rawIndexResult.getDocumentId())
.mapToLong(i -> i.getRawItem().getCombinedId())
.map(UrlIdCodec::getDocumentOrdinal)
.toArray();
@ -224,7 +224,7 @@ public class IndexQueryServiceIntegrationSmokeTest {
Collections.emptyList())).build());
int[] idxes = new int[] { 210, 270 };
long[] ids = IntStream.of(idxes).mapToLong(id -> UrlIdCodec.encodeId(id/100, id)).toArray();
long[] actual = rsp.results.stream().mapToLong(i -> i.rawIndexResult.getDocumentId()).toArray();
long[] actual = rsp.stream().mapToLong(i -> i.getRawItem().getCombinedId()).toArray();
Assertions.assertArrayEquals(ids, actual);
}
@ -262,12 +262,12 @@ public class IndexQueryServiceIntegrationSmokeTest {
Set<Integer> years = new HashSet<>();
for (var res : rsp.results) {
years.add(DocumentMetadata.decodeYear(res.rawIndexResult.encodedDocMetadata));
for (var res : rsp) {
years.add(DocumentMetadata.decodeYear(res.getRawItem().getCombinedId()));
}
assertEquals(Set.of(1998), years);
assertEquals(rsp.results.size(), 10);
assertEquals(rsp.size(), 10);
}

View File

@ -347,8 +347,8 @@ public class IndexQueryServiceIntegrationTest {
System.out.println(rsp);
for (var result : rsp.results) {
long docId = result.rawIndexResult.getDocumentId();
for (var result : rsp) {
long docId = result.getRawItem().getCombinedId();
actual.add(new MockDataDocument(UrlIdCodec.getDomainId(docId), UrlIdCodec.getDocumentOrdinal(docId)));
}
@ -382,9 +382,9 @@ public class IndexQueryServiceIntegrationTest {
includeAndCohere("hello", "world")
)));
assertEquals(1, rsp.results.size());
assertEquals(1, rsp.size());
assertEquals(d(2,2).docId(),
rsp.results.get(0).rawIndexResult.getDocumentId());
rsp.get(0).getRawItem().getCombinedId());
}
SearchSpecification basicQuery(Function<SearchSpecification.SearchSpecificationBuilder, SearchSpecification.SearchSpecificationBuilder> mutator)