From 3d1c15ef99a79384eebddd1467bec78775cfd50f Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 16 Oct 2023 12:34:01 +0200 Subject: [PATCH] (client) Refactor liveness monitor --- .../nu/marginalia/client/AbstractClient.java | 85 +++++-------------- .../client/EndpointLivenessMonitor.java | 83 ++++++++++++++++++ 2 files changed, 103 insertions(+), 65 deletions(-) create mode 100644 code/common/service-client/src/main/java/nu/marginalia/client/EndpointLivenessMonitor.java diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java index 19905ecf..19e58bc4 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java @@ -32,11 +32,10 @@ public abstract class AbstractClient implements AutoCloseable { private final OkHttpClient client; private boolean quiet; - private final ServiceRoutes serviceRoutes; + final ServiceRoutes serviceRoutes; private int timeout; - private final LivenessMonitor livenessMonitor = new LivenessMonitor(); - private final Thread livenessMonitorThread; + private final EndpointLivenessMonitor livenessMonitor; public void setTimeout(int timeout) { this.timeout = timeout; @@ -71,71 +70,13 @@ public abstract class AbstractClient implements AutoCloseable { } }); - livenessMonitorThread = new Thread(livenessMonitor, getClass().getSimpleName() + "-monitor"); - livenessMonitorThread.setDaemon(true); - livenessMonitorThread.start(); - logger.info("Finished creating client for {}", getClass().getSimpleName()); - } - - private class LivenessMonitor implements Runnable { - private final ConcurrentHashMap alivenessMap = new ConcurrentHashMap<>(); - - @SneakyThrows - public void run() { - Thread.sleep(100); // Wait for initialization - try { - for (; ; ) { - boolean allAlive = true; - try { - for (int node : 
serviceRoutes.getNodes()) { - boolean isResponsive = isResponsive(node); - alivenessMap.put(node, isResponsive); - allAlive &= isResponsive; - } - } - // - catch (Exception ex) { - logger.warn("Oops", ex); - } - if (allAlive) { - synchronized (this) { - wait(1000); - } - } - else { - Thread.sleep(100); - } - } - } catch (InterruptedException ex) { - // nothing to see here - } - } - - public boolean isAlive(int node) { - // compute-if-absence ensures we do a synchronous status check if this is a cold start, - // that way we don't have to wait for the polling loop to find out if the service is up - return alivenessMap.computeIfAbsent(node, this::isResponsive); - } - - public synchronized boolean isResponsive(int node) { - Context ctx = Context.internal("ping"); - var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build(); - return Observable.just(client.newCall(req)) - .subscribeOn(scheduler().get()) - .map(Call::execute) - .map(AbstractClient.this::getResponseStatus) - .flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500)) - .onErrorReturn(error -> 500) - .map(HttpStatusCode::new) - .map(HttpStatusCode::isGood) - .blockingFirst(); - } + livenessMonitor = new EndpointLivenessMonitor(this); } @Override public void close() { - livenessMonitorThread.interrupt(); + livenessMonitor.close(); scheduler().close(); } @@ -165,6 +106,20 @@ public abstract class AbstractClient implements AutoCloseable { .blockingFirst(); } + public synchronized boolean isResponsive(int node) { + Context ctx = Context.internal("ping"); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build(); + return Observable.just(client.newCall(req)) + .subscribeOn(scheduler().get()) + .map(Call::execute) + .map(AbstractClient.this::getResponseStatus) + .flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500)) + 
.onErrorReturn(error -> 500) + .map(HttpStatusCode::new) + .map(HttpStatusCode::isGood) + .blockingFirst(); + } + @SneakyThrows protected synchronized Observable post(Context ctx, int node, @@ -330,8 +285,8 @@ public abstract class AbstractClient implements AutoCloseable { if (!isAlive(node)) { var route = serviceRoutes.get(node); - logger.error("Route not configured for {}:{}; {}; {}", name(), node, livenessMonitor.alivenessMap, serviceRoutes.getNodes() - .stream().map(serviceRoutes::get).toList()); + logger.error("Route not configured for {}:{}", name(), node); + throw new RouteNotConfiguredException("Route not configured for " + name() + ":" + node + " -- tried " + route); } } diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/EndpointLivenessMonitor.java b/code/common/service-client/src/main/java/nu/marginalia/client/EndpointLivenessMonitor.java new file mode 100644 index 00000000..0b589a6c --- /dev/null +++ b/code/common/service-client/src/main/java/nu/marginalia/client/EndpointLivenessMonitor.java @@ -0,0 +1,83 @@ +package nu.marginalia.client; + +import lombok.SneakyThrows; +import nu.marginalia.client.route.ServiceRoutes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; + +/** Keep tabs on which endpoints are accessible via polling. This permits us to reduce the chances of + * synchronous requests blocking on timeout. 
+ */
+public class EndpointLivenessMonitor {
+
+    private final ConcurrentHashMap<Integer, Boolean> alivenessMap = new ConcurrentHashMap<>();
+    private final AbstractClient client;
+    private final ServiceRoutes serviceRoutes;
+
+    private static final Logger logger = LoggerFactory.getLogger(EndpointLivenessMonitor.class);
+    private final Thread daemonThread; // per monitor: a static field would be overwritten by every new client
+    /** Starts a daemon thread, named after the client's class, that polls the nodes in the client's routes. */
+    public EndpointLivenessMonitor(AbstractClient client) {
+        this.client = client;
+        this.serviceRoutes = client.serviceRoutes;
+
+        daemonThread = new Thread(this::run, client.getClass().getSimpleName()+":Liveness");
+        daemonThread.setDaemon(true);
+        daemonThread.start();
+    }
+    /** Polling loop: re-checks every node, then pauses 1000 ms if all are up or 100 ms if not; ends on interrupt. */
+    @SneakyThrows
+    public void run() {
+        try {
+            Thread.sleep(100); // Wait for initialization; inside the try so an early close() exits quietly
+            while (!Thread.interrupted()) {
+                if (updateLivenessMap()) {
+                    synchronized (this) {
+                        wait(1000);
+                    }
+                }
+                else Thread.sleep(100);
+            }
+        } catch (InterruptedException ex) {
+            // interrupted by close(): leave the loop and let the thread end
+        }
+    }
+    /** Refreshes the map entry of every routed node; returns true only if all of them responded. */
+    private boolean updateLivenessMap() {
+        boolean allAlive = true;
+
+        for (int node : serviceRoutes.getNodes()) {
+            allAlive &= alivenessMap.compute(node, this::isResponsive);
+        }
+
+        return allAlive;
+    }
+    /** Remapping function for compute(): pings the node, logs UP/DOWN changes, counts any exception as down. */
+    private boolean isResponsive(int node, Boolean oldValue) {
+        try {
+            boolean wasAlive = Boolean.TRUE.equals(oldValue);
+            boolean isAlive = client.isResponsive(node);
+            if (wasAlive != isAlive) {
+                logger.info("Liveness change {}:{} -- {}", client.name(), node, isAlive ? "UP":"DOWN");
+            }
+            return isAlive;
+        }
+        catch (Exception ex) {
+            logger.warn("Oops", ex);
+            return false;
+        }
+    }
+    /** Returns the last recorded liveness of the node, pinging it synchronously if there is no entry yet. */
+    public boolean isAlive(int node) {
+        // computeIfAbsent ensures we do a synchronous status check if this is a cold start,
+        // that way we don't have to wait for the polling loop to find out if the service is up
+        return alivenessMap.computeIfAbsent(node, client::isResponsive);
+    }
+
+    /** Interrupts this monitor's own polling thread, which makes run() return. */
+    public void close() {
+        daemonThread.interrupt();
+    }
+}