From 746a86510666beb4809ee3f1b6feeaaf23c12a3d Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Tue, 20 Feb 2024 14:14:09 +0100 Subject: [PATCH] (client) Fix handling of channel refreshes The previous code made an incorrect assumption that all routes refer to the same node, and would overwrite the route list on each update. This lead to storms of closing and opening channels whenever an update was received. The new code is correctly aware that we may talk to multiple nodes. --- .../client/GrpcSingleNodeChannelPool.java | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java index 69941e51..c59e2498 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java @@ -21,7 +21,7 @@ import java.util.function.Function; * Manages unicast-style requests */ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { private final Map, ManagedChannel> channels = new ConcurrentHashMap<>(); - private volatile Set> routes = Set.of(); + private final Map>> routes = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(GrpcSingleNodeChannelPool.class); @@ -68,31 +68,25 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { private void refreshNode(int node) { Set> newRoutes = serviceRegistryIf.getEndpoints(ApiSchema.GRPC, serviceId, node); - Set> oldRoutes = routes; + Set> oldRoutes = routes.getOrDefault(node, Set.of()); - if (!oldRoutes.equals(newRoutes)) { - // Find the routes that have been added or removed - for (var route : Sets.symmetricDifference(oldRoutes, newRoutes)) { + // Find the routes that have been added or removed + for (var route : Sets.symmetricDifference(oldRoutes, newRoutes)) { - ManagedChannel oldChannel; + ManagedChannel oldChannel; - if (newRoutes.contains(route)) { - logger.info(STR."Adding channel for \{serviceId.serviceName}-\{node} \{route.host()}:\{route.port()}"); - - var newChannel = channelConstructor.apply(route); - oldChannel = channels.put(route, newChannel); - } else { - logger.info(STR."Removing channel for \{serviceId.serviceName}-\{node} \{route.host()}:\{route.port()}"); - - oldChannel = channels.remove(route); - } - - if (oldChannel != null) - oldChannel.shutdown(); + if (newRoutes.contains(route)) { + var newChannel = channelConstructor.apply(route); + oldChannel = channels.put(route, newChannel); + } else { + oldChannel = channels.remove(route); } - routes = newRoutes; + if (oldChannel != null) + oldChannel.shutdown(); } + + routes.put(node, newRoutes); } public boolean hasChannel() {