(query-client) Fix query client

The query service delegates and aggregates IndexDomainLinksApiGrpc
messages to the index services.  The query client was accidentally
also doing this, instead of talking to the query client.

Fixed so it correctly talks to the query client and nothing else.
This commit is contained in:
Viktor Lofgren 2024-02-20 15:44:07 +01:00
parent c600d7aa47
commit 14172312dc

View File

@ -3,11 +3,7 @@ package nu.marginalia.query.client;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import io.prometheus.client.Summary; import io.prometheus.client.Summary;
import nu.marginalia.service.client.GrpcMultiNodeChannelPool; import nu.marginalia.index.api.*;
import nu.marginalia.index.api.Empty;
import nu.marginalia.index.api.IndexDomainLinksApiGrpc;
import nu.marginalia.index.api.QueryApiGrpc;
import nu.marginalia.index.api.RpcDomainId;
import nu.marginalia.query.QueryProtobufCodec; import nu.marginalia.query.QueryProtobufCodec;
import nu.marginalia.query.model.QueryParams; import nu.marginalia.query.model.QueryParams;
import nu.marginalia.query.model.QueryResponse; import nu.marginalia.query.model.QueryResponse;
@ -31,18 +27,14 @@ public class QueryClient {
.register(); .register();
private final GrpcSingleNodeChannelPool<QueryApiGrpc.QueryApiBlockingStub> queryApiPool; private final GrpcSingleNodeChannelPool<QueryApiGrpc.QueryApiBlockingStub> queryApiPool;
private final GrpcMultiNodeChannelPool<IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> domainLinkApiPool; private final GrpcSingleNodeChannelPool<IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> domainLinkApiPool;
public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub domainApi(int node) {
return domainLinkApiPool.apiForNode(node);
}
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject @Inject
public QueryClient(GrpcChannelPoolFactory channelPoolFactory) { public QueryClient(GrpcChannelPoolFactory channelPoolFactory) {
this.queryApiPool = channelPoolFactory.createSingle(ServiceId.Query, QueryApiGrpc::newBlockingStub); this.queryApiPool = channelPoolFactory.createSingle(ServiceId.Query, QueryApiGrpc::newBlockingStub);
this.domainLinkApiPool = channelPoolFactory.createMulti(ServiceId.Index, IndexDomainLinksApiGrpc::newBlockingStub); this.domainLinkApiPool = channelPoolFactory.createSingle(ServiceId.Query, IndexDomainLinksApiGrpc::newBlockingStub);
} }
@CheckReturnValue @CheckReturnValue
@ -59,27 +51,28 @@ public class QueryClient {
public AllLinks getAllDomainLinks() { public AllLinks getAllDomainLinks() {
AllLinks links = new AllLinks(); AllLinks links = new AllLinks();
domainApi(0).getAllLinks(Empty.newBuilder().build()).forEachRemaining(pairs -> { domainLinkApiPool.api()
for (int i = 0; i < pairs.getDestIdsCount(); i++) { .getAllLinks(Empty.getDefaultInstance())
links.add(pairs.getSourceIds(i), pairs.getDestIds(i)); .forEachRemaining(pairs -> {
} for (int i = 0; i < pairs.getDestIdsCount(); i++) {
}); links.add(pairs.getSourceIds(i), pairs.getDestIds(i));
}
});
return links; return links;
} }
public List<Integer> getLinksToDomain(int domainId) { public List<Integer> getLinksToDomain(int domainId) {
try { try {
return domainLinkApiPool.callEachSequential( return domainLinkApiPool.api()
api -> api.getLinksToDomain(RpcDomainId .getLinksToDomain(RpcDomainId
.newBuilder() .newBuilder()
.setDomainId(domainId) .setDomainId(domainId)
.build()) .build())
.getDomainIdList()) .getDomainIdList()
.flatMap(List::stream) .stream()
.sorted() .sorted()
.toList(); .toList();
} }
catch (Exception e) { catch (Exception e) {
logger.error("API Exception", e); logger.error("API Exception", e);
@ -89,13 +82,13 @@ public class QueryClient {
public List<Integer> getLinksFromDomain(int domainId) { public List<Integer> getLinksFromDomain(int domainId) {
try { try {
return domainLinkApiPool.callEachSequential( return domainLinkApiPool.api()
api -> api.getLinksFromDomain(RpcDomainId .getLinksFromDomain(RpcDomainId
.newBuilder() .newBuilder()
.setDomainId(domainId) .setDomainId(domainId)
.build()) .build())
.getDomainIdList()) .getDomainIdList()
.flatMap(List::stream) .stream()
.sorted() .sorted()
.toList(); .toList();
} }
@ -107,13 +100,12 @@ public class QueryClient {
public int countLinksToDomain(int domainId) { public int countLinksToDomain(int domainId) {
try { try {
return domainLinkApiPool.callEachSequential( return domainLinkApiPool.api()
api -> api.countLinksToDomain(RpcDomainId .countLinksToDomain(RpcDomainId
.newBuilder() .newBuilder()
.setDomainId(domainId) .setDomainId(domainId)
.build()).getIdCount()) .build())
.mapToInt(Integer::valueOf) .getIdCount();
.sum();
} }
catch (Exception e) { catch (Exception e) {
logger.error("API Exception", e); logger.error("API Exception", e);
@ -123,13 +115,12 @@ public class QueryClient {
public int countLinksFromDomain(int domainId) { public int countLinksFromDomain(int domainId) {
try { try {
return domainLinkApiPool.callEachSequential( return domainLinkApiPool.api()
api -> api.countLinksFromDomain(RpcDomainId .countLinksFromDomain(RpcDomainId
.newBuilder() .newBuilder()
.setDomainId(domainId) .setDomainId(domainId)
.build()).getIdCount()) .build())
.mapToInt(Integer::valueOf) .getIdCount();
.sum();
} }
catch (Exception e) { catch (Exception e) {
logger.error("API Exception", e); logger.error("API Exception", e);