From a0648844fb3c2d1bbbb5a14cd8963c2429e67fad Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Wed, 28 Feb 2024 14:35:29 +0100 Subject: [PATCH] (grpc) Reduce error spam --- .../client/GrpcSingleNodeChannelPool.java | 30 +++++++++++++++++-- .../nu/marginalia/service/server/Service.java | 9 +++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java b/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java index 9c777c2b..873cc2a6 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java +++ b/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java @@ -8,6 +8,7 @@ import nu.marginalia.service.discovery.monitor.ServiceChangeMonitor; import nu.marginalia.service.discovery.property.PartitionTraits; import nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress; import nu.marginalia.service.discovery.property.ServiceKey; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,9 +74,11 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { return true; } - private class ConnectionHolder { + private class ConnectionHolder implements Comparable { private final AtomicReference channel = new AtomicReference<>(); private final InstanceAddress address; + private volatile long lastError = Long.MIN_VALUE; + private volatile long lastUsed = Long.MAX_VALUE; ConnectionHolder(InstanceAddress address) { this.address = address; @@ -83,6 +86,9 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { public ManagedChannel get() { var value = channel.get(); + + lastUsed = System.currentTimeMillis(); + if (value != null) { return value; } @@ -125,6 +131,23 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { public int hashCode() { return Objects.hash(address); } + + private boolean hasRecentError() { + return System.currentTimeMillis() < lastError + 5000; + } + + void flagError() { + lastError = System.currentTimeMillis(); + } + + @Override + public int compareTo(@NotNull GrpcSingleNodeChannelPool.ConnectionHolder o) { + // If one has recently errored and the other has not, the one that has not errored is preferred + int diff = Boolean.compare(hasRecentError(), o.hasRecentError()); + if (diff != 0) return diff; + + return Long.compare(lastUsed, o.lastUsed); + } } @@ -150,14 +173,15 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { final List exceptions = new ArrayList<>(); final List connectionHolders = new ArrayList<>(channels.values()); - // Randomize the order of the connection holders to spread out the load - Collections.shuffle(connectionHolders); + Collections.sort(connectionHolders); for (var channel : connectionHolders) { try { return call.apply(stubConstructor.apply(channel.get()), arg); } catch (Exception e) { + channel.flagError(); + exceptions.add(e); } } diff --git a/code/common/service/java/nu/marginalia/service/server/Service.java b/code/common/service/java/nu/marginalia/service/server/Service.java index c5dfa1ea..f4f51375 100644 --- a/code/common/service/java/nu/marginalia/service/server/Service.java +++ b/code/common/service/java/nu/marginalia/service/server/Service.java @@ -8,6 +8,7 @@ import io.prometheus.client.Counter; import lombok.SneakyThrows; import nu.marginalia.mq.inbox.*; import nu.marginalia.service.NamedExecutorFactory; +import nu.marginalia.service.client.ServiceNotAvailableException; import nu.marginalia.service.discovery.property.*; import nu.marginalia.service.id.ServiceId; import nu.marginalia.service.server.mq.ServiceMqSubscription; @@ -99,7 +100,13 @@ public class Service { initialization.addCallback(() -> serviceRegistry.announceInstance(config.instanceUuid())); Thread.setDefaultUncaughtExceptionHandler((t, e) -> { - logger.error("Uncaught exception", e); + if (e instanceof ServiceNotAvailableException) { + // reduce log spam for this common case + logger.error("Service not available: {}", e.getMessage()); + } + else { + logger.error("Uncaught exception", e); + } request_counter_err.labels(serviceName, Integer.toString(node)).inc(); });