mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-24 05:18:58 +00:00
(client) Fix handling of channel refreshes
The previous code made an incorrect assumption that all routes refer to the same node, and would overwrite the route list on each update. This lead to storms of closing and opening channels whenever an update was received. The new code is correctly aware that we may talk to multiple nodes.
This commit is contained in:
parent
f85ec28a16
commit
746a865106
@ -21,7 +21,7 @@ import java.util.function.Function;
|
|||||||
* Manages unicast-style requests */
|
* Manages unicast-style requests */
|
||||||
public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
||||||
private final Map<InstanceAddress<?>, ManagedChannel> channels = new ConcurrentHashMap<>();
|
private final Map<InstanceAddress<?>, ManagedChannel> channels = new ConcurrentHashMap<>();
|
||||||
private volatile Set<InstanceAddress<?>> routes = Set.of();
|
private final Map<Integer, Set<InstanceAddress<?>>> routes = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(GrpcSingleNodeChannelPool.class);
|
private static final Logger logger = LoggerFactory.getLogger(GrpcSingleNodeChannelPool.class);
|
||||||
|
|
||||||
@ -68,22 +68,17 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
|||||||
private void refreshNode(int node) {
|
private void refreshNode(int node) {
|
||||||
|
|
||||||
Set<InstanceAddress<?>> newRoutes = serviceRegistryIf.getEndpoints(ApiSchema.GRPC, serviceId, node);
|
Set<InstanceAddress<?>> newRoutes = serviceRegistryIf.getEndpoints(ApiSchema.GRPC, serviceId, node);
|
||||||
Set<InstanceAddress<?>> oldRoutes = routes;
|
Set<InstanceAddress<?>> oldRoutes = routes.getOrDefault(node, Set.of());
|
||||||
|
|
||||||
if (!oldRoutes.equals(newRoutes)) {
|
|
||||||
// Find the routes that have been added or removed
|
// Find the routes that have been added or removed
|
||||||
for (var route : Sets.symmetricDifference(oldRoutes, newRoutes)) {
|
for (var route : Sets.symmetricDifference(oldRoutes, newRoutes)) {
|
||||||
|
|
||||||
ManagedChannel oldChannel;
|
ManagedChannel oldChannel;
|
||||||
|
|
||||||
if (newRoutes.contains(route)) {
|
if (newRoutes.contains(route)) {
|
||||||
logger.info(STR."Adding channel for \{serviceId.serviceName}-\{node} \{route.host()}:\{route.port()}");
|
|
||||||
|
|
||||||
var newChannel = channelConstructor.apply(route);
|
var newChannel = channelConstructor.apply(route);
|
||||||
oldChannel = channels.put(route, newChannel);
|
oldChannel = channels.put(route, newChannel);
|
||||||
} else {
|
} else {
|
||||||
logger.info(STR."Removing channel for \{serviceId.serviceName}-\{node} \{route.host()}:\{route.port()}");
|
|
||||||
|
|
||||||
oldChannel = channels.remove(route);
|
oldChannel = channels.remove(route);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,8 +86,7 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
|||||||
oldChannel.shutdown();
|
oldChannel.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
routes = newRoutes;
|
routes.put(node, newRoutes);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasChannel() {
|
public boolean hasChannel() {
|
||||||
|
Loading…
Reference in New Issue
Block a user