From 14172312dc428cedf550665b1d1990e80b9983d5 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Tue, 20 Feb 2024 15:44:07 +0100 Subject: [PATCH] (query-client) Fix query client The query service delegates and aggregates IndexDomainLinksApiGrpc messages to the index services. The query client was accidentally also doing this, instead of talking to the query client. Fixed so it correctly talks to the query client and nothing else. --- .../marginalia/query/client/QueryClient.java | 75 ++++++++----------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java index af32900c..74f78aa7 100644 --- a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java +++ b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java @@ -3,11 +3,7 @@ package nu.marginalia.query.client; import com.google.inject.Inject; import com.google.inject.Singleton; import io.prometheus.client.Summary; -import nu.marginalia.service.client.GrpcMultiNodeChannelPool; -import nu.marginalia.index.api.Empty; -import nu.marginalia.index.api.IndexDomainLinksApiGrpc; -import nu.marginalia.index.api.QueryApiGrpc; -import nu.marginalia.index.api.RpcDomainId; +import nu.marginalia.index.api.*; import nu.marginalia.query.QueryProtobufCodec; import nu.marginalia.query.model.QueryParams; import nu.marginalia.query.model.QueryResponse; @@ -31,18 +27,14 @@ public class QueryClient { .register(); private final GrpcSingleNodeChannelPool queryApiPool; - private final GrpcMultiNodeChannelPool domainLinkApiPool; - - public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub domainApi(int node) { - return domainLinkApiPool.apiForNode(node); - } + private final GrpcSingleNodeChannelPool domainLinkApiPool; private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject public QueryClient(GrpcChannelPoolFactory channelPoolFactory) { this.queryApiPool = channelPoolFactory.createSingle(ServiceId.Query, QueryApiGrpc::newBlockingStub); - this.domainLinkApiPool = channelPoolFactory.createMulti(ServiceId.Index, IndexDomainLinksApiGrpc::newBlockingStub); + this.domainLinkApiPool = channelPoolFactory.createSingle(ServiceId.Query, IndexDomainLinksApiGrpc::newBlockingStub); } @CheckReturnValue @@ -59,27 +51,28 @@ public class QueryClient { public AllLinks getAllDomainLinks() { AllLinks links = new AllLinks(); - domainApi(0).getAllLinks(Empty.newBuilder().build()).forEachRemaining(pairs -> { - for (int i = 0; i < pairs.getDestIdsCount(); i++) { - links.add(pairs.getSourceIds(i), pairs.getDestIds(i)); - } - }); + domainLinkApiPool.api() + .getAllLinks(Empty.getDefaultInstance()) + .forEachRemaining(pairs -> { + for (int i = 0; i < pairs.getDestIdsCount(); i++) { + links.add(pairs.getSourceIds(i), pairs.getDestIds(i)); + } + }); return links; } public List getLinksToDomain(int domainId) { try { - return domainLinkApiPool.callEachSequential( - api -> api.getLinksToDomain(RpcDomainId + return domainLinkApiPool.api() + .getLinksToDomain(RpcDomainId .newBuilder() .setDomainId(domainId) .build()) - .getDomainIdList()) - .flatMap(List::stream) + .getDomainIdList() + .stream() .sorted() .toList(); - } catch (Exception e) { logger.error("API Exception", e); @@ -89,13 +82,13 @@ public class QueryClient { public List getLinksFromDomain(int domainId) { try { - return domainLinkApiPool.callEachSequential( - api -> api.getLinksFromDomain(RpcDomainId - .newBuilder() - .setDomainId(domainId) - .build()) - .getDomainIdList()) - .flatMap(List::stream) + return domainLinkApiPool.api() + .getLinksFromDomain(RpcDomainId + .newBuilder() + .setDomainId(domainId) + .build()) + .getDomainIdList() + .stream() .sorted() .toList(); } @@ -107,13 +100,12 @@ public class QueryClient { public int countLinksToDomain(int domainId) { try { - return domainLinkApiPool.callEachSequential( - api -> api.countLinksToDomain(RpcDomainId - .newBuilder() - .setDomainId(domainId) - .build()).getIdCount()) - .mapToInt(Integer::valueOf) - .sum(); + return domainLinkApiPool.api() + .countLinksToDomain(RpcDomainId + .newBuilder() + .setDomainId(domainId) + .build()) + .getIdCount(); } catch (Exception e) { logger.error("API Exception", e); @@ -123,13 +115,12 @@ public class QueryClient { public int countLinksFromDomain(int domainId) { try { - return domainLinkApiPool.callEachSequential( - api -> api.countLinksFromDomain(RpcDomainId - .newBuilder() - .setDomainId(domainId) - .build()).getIdCount()) - .mapToInt(Integer::valueOf) - .sum(); + return domainLinkApiPool.api() + .countLinksFromDomain(RpcDomainId + .newBuilder() + .setDomainId(domainId) + .build()) + .getIdCount(); } catch (Exception e) { logger.error("API Exception", e);