diff --git a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java b/code/common/service-discovery/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java index c725fa0b..d4f75e66 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java +++ b/code/common/service-discovery/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java @@ -48,20 +48,32 @@ public class GrpcMultiNodeChannelPool { } private GrpcSingleNodeChannelPool getPoolForNode(int node) { - return pools.computeIfAbsent(node, _ -> - new GrpcSingleNodeChannelPool<>( - serviceRegistryIf, - serviceKey.forPartition(ServicePartition.partition(node)), - channelConstructor, - stubConstructor)); + return pools.computeIfAbsent(node, this::newSingleChannelPool); } + private GrpcSingleNodeChannelPool newSingleChannelPool(int node) { + return new GrpcSingleNodeChannelPool<>( + serviceRegistryIf, + serviceKey.forPartition(ServicePartition.partition(node)), + channelConstructor, + stubConstructor); + } /** Get the list of nodes that are eligible for broadcast-style requests */ public List getEligibleNodes() { return nodeConfigurationWatcher.getQueryNodes(); } + /** Create a new call builder for the given method. This is a fluent-style + * method, where you can chain calls to specify how to run the method. + *

+ * Example: + *
+     *     var results = channelPool.call(AStub:someMethod)
+     *                   .async(someExecutor)
+     *                   .runAll(argumentToSomeMethod);
+     * 
+ * */ public CallBuilderBase call(BiFunction method) { return new CallBuilderBase<>(method); } @@ -73,16 +85,20 @@ public class GrpcMultiNodeChannelPool { this.method = method; } + /** Create a call for the given method on the given node */ public GrpcSingleNodeChannelPool.CallBuilderBase forNode(int node) { return getPoolForNode(node).call(method); } + /** Run the given method on each node, returning a list of results. + * This is a blocking method, where each call will be made in sequence */ public List run(I arg) { return getEligibleNodes().stream() .map(node -> getPoolForNode(node).call(method).run(arg)) .toList(); } + /** Generate an async call builder for the given method */ public CallBuilderAsync async(ExecutorService service) { return new CallBuilderAsync<>(service, method); } diff --git a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java b/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java index ec3bb38a..9c777c2b 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java +++ b/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java @@ -169,6 +169,9 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { throw new ServiceNotAvailableException(serviceKey); } + /** Create a call for the given method on the given node. + * This is a fluent method, so you can chain it with other + * methods to specify the node and arguments */ public CallBuilderBase call(BiFunction method) { return new CallBuilderBase<>(method); } @@ -179,21 +182,12 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { this.method = method; } + /** Execute the call in a blocking manner */ public T run(I arg) { return call(method, arg); } - public List runFor(I... args) { - return runFor(List.of(args)); - } - - public List runFor(List args) { - List results = new ArrayList<>(); - for (var arg : args) { - results.add(call(method, arg)); - } - return results; - } + /** Create an asynchronous call using the provided executor */ public CallBuilderAsync async(Executor executor) { return new CallBuilderAsync<>(executor, method); } @@ -207,9 +201,12 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { this.method = method; } + /** Execute the call in an asynchronous manner */ public CompletableFuture run(I arg) { return CompletableFuture.supplyAsync(() -> call(method, arg), executor); } + + /** Execute the call in an asynchronous manner for each of the given arguments */ public CompletableFuture> runFor(List args) { List> results = new ArrayList<>(); for (var arg : args) { @@ -218,6 +215,8 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { return CompletableFuture.allOf(results.toArray(new CompletableFuture[0])) .thenApply(v -> results.stream().map(CompletableFuture::join).toList()); } + + /** Execute the call in an asynchronous manner for each of the given arguments */ public CompletableFuture> runFor(I... args) { return runFor(List.of(args)); } diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/ZkServiceRegistry.java b/code/common/service-discovery/java/nu/marginalia/service/discovery/ZkServiceRegistry.java index e4eca465..0e233ced 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/discovery/ZkServiceRegistry.java +++ b/code/common/service-discovery/java/nu/marginalia/service/discovery/ZkServiceRegistry.java @@ -79,7 +79,7 @@ public class ZkServiceRegistry implements ServiceRegistryIf { curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) - .forPath(STR."/first-boot"); + .forPath("/first-boot"); } }