From 16e0738731590173e172f14032ab4c31ca480cba Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Sun, 15 Oct 2023 18:38:30 +0200 Subject: [PATCH] (*) Get multi-node routing working. --- .../assistant/client/AssistantClient.java | 2 +- .../executor/client/ExecutorClient.java | 5 +- .../marginalia/index/client/IndexClient.java | 14 ++- .../client/model/results/SearchResultSet.java | 13 +++ .../marginalia/query/client/QueryClient.java | 5 +- .../src/main/java/nu/marginalia/WmsaHome.java | 14 --- .../storage/FileStorageService.java | 31 ++++- .../migration/V23_11_0_006__clean_stores.sql | 1 + .../nu/marginalia/client/AbstractClient.java | 56 ++++----- .../client/AbstractDynamicClient.java | 13 +-- .../marginalia/client/model/ClientRoute.java | 4 - .../client/route/RouteProvider.java | 11 ++ .../marginalia/client/route/ServiceRoute.java | 12 ++ .../client/route/ServiceRoutes.java | 22 ++++ .../marginalia/client/AbstractClientTest.java | 4 +- .../service/SearchServiceDescriptors.java | 18 +-- .../service/descriptor/HostsFile.java | 45 -------- .../service/descriptor/ServiceDescriptor.java | 16 ++- .../module/ServiceConfigurationModule.java | 4 +- .../monitor/FileStorageMonitorActor.java | 2 +- .../index/IndexConstructorMain.java | 4 +- code/services-core/control-service/readme.md | 17 ++- .../nu/marginalia/control/ControlService.java | 20 ---- .../java/nu/marginalia/control/Redirects.java | 3 +- .../control/node/model/ActorState.java | 17 --- .../control/node/model/ActorStateGraph.java | 50 -------- .../control/node/model/IndexNodeStatus.java | 7 +- .../node/svc/ControlFileStorageService.java | 2 +- .../control/node/svc/ControlNodeService.java | 33 +++--- .../control/partials/nodes-table.hdb | 15 ++- .../control/partials/storage-table.hdb | 0 .../ExecutorSvcApiIntegrationTest.java | 2 +- .../nu/marginalia/query/QueryService.java | 13 ++- .../query/svc/NodeConfigurationWatcher.java | 53 +++++++++ docker-compose.yml | 107 ++++++------------ run/env/service.env | 8 +- run/nginx-site.conf | 25 ++-- run/setup.sh | 2 +- 38 files changed, 322 insertions(+), 348 deletions(-) create mode 100644 code/common/db/src/main/resources/db/migration/V23_11_0_006__clean_stores.sql delete mode 100644 code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java create mode 100644 code/common/service-client/src/main/java/nu/marginalia/client/route/RouteProvider.java create mode 100644 code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoute.java create mode 100644 code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoutes.java delete mode 100644 code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/HostsFile.java delete mode 100644 code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorState.java delete mode 100644 code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorStateGraph.java delete mode 100644 code/services-core/control-service/src/main/resources/templates/control/partials/storage-table.hdb create mode 100644 code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java diff --git a/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java b/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java index 8916120c..eaa9f0f5 100644 --- a/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java +++ b/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java @@ -21,7 +21,7 @@ public class AssistantClient extends AbstractDynamicClient { @Inject public AssistantClient(ServiceDescriptors descriptors) { - super(descriptors.forId(ServiceId.Assistant), WmsaHome.getHostsFile(), GsonFactory::get); + super(descriptors.forId(ServiceId.Assistant), GsonFactory::get); } public Observable dictionaryLookup(Context ctx, String word) { diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java index a1715f0e..0fb26715 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java @@ -4,6 +4,9 @@ import com.google.inject.Inject; import nu.marginalia.WmsaHome; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; +import nu.marginalia.client.route.RouteProvider; +import nu.marginalia.client.route.ServiceRoute; +import nu.marginalia.service.descriptor.ServiceDescriptor; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.executor.model.ActorRunStates; import nu.marginalia.executor.model.crawl.RecrawlParameters; @@ -20,7 +23,7 @@ import java.util.List; public class ExecutorClient extends AbstractDynamicClient { @Inject public ExecutorClient(ServiceDescriptors descriptors) { - super(descriptors.forId(ServiceId.Executor), WmsaHome.getHostsFile(), GsonFactory::get); + super(descriptors.forId(ServiceId.Executor), GsonFactory::get); } public void startFsm(Context ctx, int node, String actorName) { diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index c85d8899..2b4c57f8 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -5,7 +5,7 @@ import com.google.inject.Singleton; import com.google.inject.name.Named; import io.prometheus.client.Summary; import io.reactivex.rxjava3.core.Observable; -import nu.marginalia.WmsaHome; +import io.reactivex.rxjava3.schedulers.Schedulers; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; import nu.marginalia.index.client.model.query.SearchSpecification; @@ -15,6 +15,7 @@ import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; +import java.util.List; import javax.annotation.CheckReturnValue; import java.util.UUID; @@ -31,7 +32,7 @@ public class IndexClient extends AbstractDynamicClient { MessageQueueFactory messageQueueFactory, @Named("wmsa-system-node") Integer nodeId) { - super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get); + super(descriptors.forId(ServiceId.Index), GsonFactory::get); String inboxName = ServiceId.Index.name; String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString()); @@ -50,6 +51,15 @@ public class IndexClient extends AbstractDynamicClient { ); } + @CheckReturnValue + public SearchResultSet query(Context ctx, List nodes, SearchSpecification specs) { + return Observable.fromIterable(nodes) + .subscribeOn(Schedulers.io()) + .concatMap(node -> this.postGet(ctx, node,"/search/", specs, SearchResultSet.class)) + .reduce(SearchResultSet::combine) + .blockingGet(); + } + @CheckReturnValue public Observable isBlocked(Context ctx, int node) { diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/model/results/SearchResultSet.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/model/results/SearchResultSet.java index b69f52bd..296a2df7 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/model/results/SearchResultSet.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/model/results/SearchResultSet.java @@ -4,6 +4,8 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; @AllArgsConstructor @Getter @ToString @@ -12,4 +14,15 @@ public class SearchResultSet { public int size() { return results.size(); } + + public static SearchResultSet combine(SearchResultSet l, SearchResultSet r) { + List combinedItems = new ArrayList<>(l.size() + r.size()); + combinedItems.addAll(l.results); + combinedItems.addAll(r.results); + + // TODO: Do we combine these correctly? + combinedItems.sort(Comparator.comparing(item -> item.rankingScore)); + + return new SearchResultSet(combinedItems); + } } diff --git a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java index ecb50102..4241ef76 100644 --- a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java +++ b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java @@ -30,10 +30,9 @@ public class QueryClient extends AbstractDynamicClient { private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject - public QueryClient(ServiceDescriptors descriptors, - MessageQueueFactory messageQueueFactory) { + public QueryClient(ServiceDescriptors descriptors) { - super(descriptors.forId(ServiceId.Query), WmsaHome.getHostsFile(), GsonFactory::get); + super(descriptors.forId(ServiceId.Query), GsonFactory::get); } /** Delegate an Index API style query directly to the index service */ diff --git a/code/common/config/src/main/java/nu/marginalia/WmsaHome.java b/code/common/config/src/main/java/nu/marginalia/WmsaHome.java index 7a1158d4..0f5f6598 100644 --- a/code/common/config/src/main/java/nu/marginalia/WmsaHome.java +++ b/code/common/config/src/main/java/nu/marginalia/WmsaHome.java @@ -2,7 +2,6 @@ package nu.marginalia; import nu.marginalia.service.ServiceHomeNotConfiguredException; -import nu.marginalia.service.descriptor.HostsFile; import java.io.FileNotFoundException; import java.io.IOException; @@ -56,19 +55,6 @@ public class WmsaHome { .toString(); } - public static HostsFile getHostsFile() { - Path hostsFile = getHomePath().resolve("conf/hosts"); - if (Files.isRegularFile(hostsFile)) { - try { - return new HostsFile(hostsFile); - } catch (IOException e) { - throw new RuntimeException("Failed to load hosts file " + hostsFile, e); - } - } - else { - return new HostsFile(); - } - } public static Path getAdsDefinition() { return getHomePath().resolve("data").resolve("adblock.txt"); diff --git a/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java b/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java index 51776da3..946bbe8f 100644 --- a/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java +++ b/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java @@ -33,8 +33,15 @@ public class FileStorageService { public Optional findFileStorageToDelete() { try (var conn = dataSource.getConnection(); var stmt = conn.prepareStatement(""" - SELECT ID FROM FILE_STORAGE WHERE STATE='DELETE' LIMIT 1 + SELECT FILE_STORAGE.ID FROM FILE_STORAGE + INNER JOIN FILE_STORAGE_BASE ON BASE_ID=FILE_STORAGE_BASE.ID + WHERE STATE='DELETE' + AND NODE = ? + LIMIT 1 """)) { + + stmt.setInt(1, node); + var rs = stmt.executeQuery(); if (rs.next()) { return Optional.of(getStorage(new FileStorageId(rs.getLong(1)))); @@ -106,9 +113,16 @@ public class FileStorageService { try (var conn = dataSource.getConnection(); var stmt = conn.prepareStatement(""" - SELECT PATH FROM FILE_STORAGE WHERE BASE_ID = ? + SELECT FILE_STORAGE.PATH + FROM FILE_STORAGE INNER JOIN FILE_STORAGE_BASE + ON BASE_ID = FILE_STORAGE_BASE.ID + WHERE BASE_ID = ? + AND NODE = ? """)) { + stmt.setLong(1, base.id().id()); + stmt.setInt(2, node); + var rs = stmt.executeQuery(); while (rs.next()) { ignoredPaths.add(rs.getString(1)); @@ -494,8 +508,11 @@ public class FileStorageService { var stmt = conn.prepareStatement(""" SELECT PATH, STATE, TYPE, DESCRIPTION, CREATE_DATE, ID, BASE_ID FROM FILE_STORAGE_VIEW + WHERE NODE=? """)) { + stmt.setInt(1, node); + long storageId; long baseId; String path; @@ -510,7 +527,15 @@ public class FileStorageService { storageId = rs.getLong("ID"); path = rs.getString("PATH"); state = rs.getString("STATE"); - type = FileStorageType.valueOf(rs.getString("TYPE")); + + try { + type = FileStorageType.valueOf(rs.getString("TYPE")); + } + catch (IllegalArgumentException ex) { + logger.warn("Illegal file storage type {} in db", rs.getString("TYPE")); + continue; + } + description = rs.getString("DESCRIPTION"); createDateTime = rs.getTimestamp("CREATE_DATE").toLocalDateTime(); var base = getStorageBase(new FileStorageBaseId(baseId)); diff --git a/code/common/db/src/main/resources/db/migration/V23_11_0_006__clean_stores.sql b/code/common/db/src/main/resources/db/migration/V23_11_0_006__clean_stores.sql new file mode 100644 index 00000000..07277366 --- /dev/null +++ b/code/common/db/src/main/resources/db/migration/V23_11_0_006__clean_stores.sql @@ -0,0 +1 @@ +DELETE FROM FILE_STORAGE WHERE TYPE IN ('INDEX_STAGING', 'INDEX_LIVE', 'SEARCH_SETS', 'LINKDB_LIVE', 'LINKDB_STAGING'); \ No newline at end of file diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java index 20094ea9..bd4bc790 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java @@ -10,16 +10,16 @@ import nu.marginalia.client.exception.LocalException; import nu.marginalia.client.exception.NetworkException; import nu.marginalia.client.exception.RemoteException; import nu.marginalia.client.exception.RouteNotConfiguredException; -import nu.marginalia.client.model.ClientRoute; +import nu.marginalia.client.route.ServiceRoute; +import nu.marginalia.client.route.RouteProvider; +import nu.marginalia.client.route.ServiceRoutes; +import nu.marginalia.service.descriptor.ServiceDescriptor; import okhttp3.*; -import org.apache.http.HttpHost; import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.ConnectException; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -32,7 +32,7 @@ public abstract class AbstractClient implements AutoCloseable { private final OkHttpClient client; private boolean quiet; - private final Map serviceRoutes; + private final ServiceRoutes serviceRoutes; private int timeout; private final LivenessMonitor livenessMonitor = new LivenessMonitor(); @@ -42,18 +42,14 @@ public abstract class AbstractClient implements AutoCloseable { this.timeout = timeout; } - public AbstractClient(ClientRoute route, int timeout, Supplier gsonProvider) { - this(Map.of(0, route), timeout, gsonProvider); + public AbstractClient(ServiceDescriptor service, int timeout, Supplier gsonProvider) { + this(RouteProvider.fromService(service), timeout, gsonProvider); } - public AbstractClient(Map routes, + public AbstractClient(RouteProvider routeProvider, int timeout, Supplier gsonProvider) { - routes.forEach((node, route) -> { - logger.info("Creating client route for {}:{} -> {}:{}", getClass().getSimpleName(), node, route.host(), route.port()); - }); - this.gson = gsonProvider.get(); this.timeout = timeout; @@ -64,11 +60,7 @@ public abstract class AbstractClient implements AutoCloseable { .followRedirects(true) .build(); - serviceRoutes = new HashMap<>(routes.size()); - - routes.forEach((node, client) -> - serviceRoutes.put(node, new HttpHost(client.host(), client.port()).toURI()) - ); + serviceRoutes = new ServiceRoutes(routeProvider); RxJavaPlugins.setErrorHandler(e -> { if (e.getMessage() == null) { @@ -96,10 +88,10 @@ public abstract class AbstractClient implements AutoCloseable { for (; ; ) { boolean allAlive = true; try { - for (int node : serviceRoutes.keySet()) { + for (int node : serviceRoutes.getNodes()) { boolean isResponsive = isResponsive(node); alivenessMap.put(node, isResponsive); - allAlive &= !isResponsive; + allAlive &= isResponsive; } } // @@ -121,13 +113,14 @@ public abstract class AbstractClient implements AutoCloseable { } public boolean isAlive(int node) { - return alivenessMap.getOrDefault(node, false); + // compute-if-absence ensures we do a synchronous status check if this is a cold start, + // that way we don't have to wait for the polling loop to find out if the service is up + return alivenessMap.computeIfAbsent(node, this::isResponsive); } public synchronized boolean isResponsive(int node) { Context ctx = Context.internal("ping"); var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build(); - return Observable.just(client.newCall(req)) .subscribeOn(scheduler().get()) .map(Call::execute) @@ -201,7 +194,7 @@ public abstract class AbstractClient implements AutoCloseable { @SneakyThrows protected synchronized Observable post(Context ctx, int node, String endpoint, GeneratedMessageV3 data) { - ensureAlive(0); + ensureAlive(node); RequestBody body = RequestBody.create(data.toByteArray(), MediaType.parse("application/protobuf")); @@ -225,7 +218,7 @@ public abstract class AbstractClient implements AutoCloseable { @SneakyThrows protected synchronized Observable postGet(Context ctx, int node, String endpoint, Object data, Class returnType) { - ensureAlive(0); + ensureAlive(node); RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json")); var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build(); @@ -243,7 +236,7 @@ public abstract class AbstractClient implements AutoCloseable { } protected synchronized Observable post(Context ctx, int node, String endpoint, String data, MediaType mediaType) { - ensureAlive(0); + ensureAlive(node); var body = RequestBody.create(data, mediaType); @@ -269,7 +262,7 @@ public abstract class AbstractClient implements AutoCloseable { } protected synchronized Observable get(Context ctx, int node, String endpoint, Class type) { - ensureAlive(0); + ensureAlive(node); var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build(); @@ -287,7 +280,7 @@ public abstract class AbstractClient implements AutoCloseable { @SuppressWarnings("unchecked") protected synchronized Observable get(Context ctx, int node, String endpoint) { - ensureAlive(0); + ensureAlive(node); var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build(); @@ -304,7 +297,7 @@ public abstract class AbstractClient implements AutoCloseable { } protected synchronized Observable delete(Context ctx, int node, String endpoint) { - ensureAlive(0); + ensureAlive(node); var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).delete().build(); @@ -335,10 +328,11 @@ public abstract class AbstractClient implements AutoCloseable { @SneakyThrows private void ensureAlive(int node) { if (!isAlive(node)) { - wait(2000); - if (!isAlive(node)) { - throw new RouteNotConfiguredException("Route not configured for " + name() + " -- tried " + serviceRoutes.get(node)); - } + var route = serviceRoutes.get(node); + + logger.error("Route not configured for {}:{}; {}; {}", name(), node, livenessMonitor.alivenessMap, serviceRoutes.getNodes() + .stream().map(serviceRoutes::get).toList()); + throw new RouteNotConfiguredException("Route not configured for " + name() + ":" + node + " -- tried " + route); } } diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java index c3d390b6..5bdf453a 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java @@ -1,25 +1,20 @@ package nu.marginalia.client; import com.google.gson.Gson; -import io.reactivex.rxjava3.core.Observable; -import lombok.SneakyThrows; -import nu.marginalia.client.model.ClientRoute; +import nu.marginalia.client.route.RouteProvider; +import nu.marginalia.client.route.ServiceRoute; import nu.marginalia.service.descriptor.ServiceDescriptor; -import nu.marginalia.service.descriptor.HostsFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.util.function.Supplier; public class AbstractDynamicClient extends AbstractClient { private final ServiceDescriptor service; - private final Logger logger = LoggerFactory.getLogger(getClass()); private final AbortingScheduler scheduler; - public AbstractDynamicClient(@Nonnull ServiceDescriptor service, HostsFile hosts, Supplier gsonProvider) { + public AbstractDynamicClient(@Nonnull ServiceDescriptor service, Supplier gsonProvider) { super( - new ClientRoute(hosts.getHost(service), service.port), + service, 10, gsonProvider ); diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java b/code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java deleted file mode 100644 index 84d6abff..00000000 --- a/code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java +++ /dev/null @@ -1,4 +0,0 @@ -package nu.marginalia.client.model; - -public record ClientRoute(String host, int port) { -} diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/route/RouteProvider.java b/code/common/service-client/src/main/java/nu/marginalia/client/route/RouteProvider.java new file mode 100644 index 00000000..0f28fb9d --- /dev/null +++ b/code/common/service-client/src/main/java/nu/marginalia/client/route/RouteProvider.java @@ -0,0 +1,11 @@ +package nu.marginalia.client.route; + +import nu.marginalia.service.descriptor.ServiceDescriptor; + +public interface RouteProvider { + ServiceRoute findRoute(int node); + + static RouteProvider fromService(ServiceDescriptor serviceDescriptor) { + return (n) -> new ServiceRoute(serviceDescriptor.getHostName(n), 80); + } +} diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoute.java b/code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoute.java new file mode 100644 index 00000000..c9c6ffab --- /dev/null +++ b/code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoute.java @@ -0,0 +1,12 @@ +package nu.marginalia.client.route; + +import org.apache.http.HttpHost; + +public record ServiceRoute(String hostname, int port) { + public String toString() { + if (port == 80) { + return "http://" + hostname; + } + return new HttpHost(hostname(), port()).toURI(); + } +} diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoutes.java b/code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoutes.java new file mode 100644 index 00000000..6026bc16 --- /dev/null +++ b/code/common/service-client/src/main/java/nu/marginalia/client/route/ServiceRoutes.java @@ -0,0 +1,22 @@ +package nu.marginalia.client.route; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class ServiceRoutes { + private final ConcurrentHashMap knownRoutes = new ConcurrentHashMap<>(); + private final RouteProvider provider; + + public ServiceRoutes(RouteProvider provider) { + this.provider = provider; + } + + public ServiceRoute get(int node) { + return knownRoutes.computeIfAbsent(node, provider::findRoute); + } + + public List getNodes() { + return new ArrayList<>(knownRoutes.keySet()); + } +} diff --git a/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java b/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java index 27c3bdec..c4776ec9 100644 --- a/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java +++ b/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java @@ -5,7 +5,7 @@ import io.reactivex.rxjava3.core.Observable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.SneakyThrows; -import nu.marginalia.client.model.ClientRoute; +import nu.marginalia.client.route.ServiceRoute; import org.junit.jupiter.api.*; import spark.Request; import spark.Response; @@ -34,7 +34,7 @@ public class AbstractClientTest { int port = new Random().nextInt(6000, 10000); testServer = new TestServer(port); - client = new AbstractClient(new ClientRoute("localhost", port), 1, Gson::new) { + client = new AbstractClient(n -> new ServiceRoute("localhost", port), 1, Gson::new) { @Override public AbortingScheduler scheduler() { return new AbortingScheduler(name()); diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/SearchServiceDescriptors.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/SearchServiceDescriptors.java index 943b392a..d5f7fb8d 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/SearchServiceDescriptors.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/SearchServiceDescriptors.java @@ -8,14 +8,14 @@ import java.util.List; public class SearchServiceDescriptors { public static ServiceDescriptors descriptors = new ServiceDescriptors( - List.of(new ServiceDescriptor(ServiceId.Api, 5004), - new ServiceDescriptor(ServiceId.Index, 5021), - new ServiceDescriptor(ServiceId.Query, 5022), - new ServiceDescriptor(ServiceId.Search, 5023), - new ServiceDescriptor(ServiceId.Executor, 5024), - new ServiceDescriptor(ServiceId.Assistant, 5025), - new ServiceDescriptor(ServiceId.Dating, 5070), - new ServiceDescriptor(ServiceId.Explorer, 5071), - new ServiceDescriptor(ServiceId.Control, 5090) + List.of(new ServiceDescriptor(ServiceId.Api), + new ServiceDescriptor(ServiceId.Index), + new ServiceDescriptor(ServiceId.Query), + new ServiceDescriptor(ServiceId.Search), + new ServiceDescriptor(ServiceId.Executor), + new ServiceDescriptor(ServiceId.Assistant), + new ServiceDescriptor(ServiceId.Dating), + new ServiceDescriptor(ServiceId.Explorer), + new ServiceDescriptor(ServiceId.Control) )); } diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/HostsFile.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/HostsFile.java deleted file mode 100644 index ef46749b..00000000 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/HostsFile.java +++ /dev/null @@ -1,45 +0,0 @@ -package nu.marginalia.service.descriptor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; - -/** Mappings file between ServiceDescriptor.name and host - * - * */ -public class HostsFile { - private final Map hostsMap = new HashMap<>(); - private static final Logger logger = LoggerFactory.getLogger(HostsFile.class); - public HostsFile(Path fileName) throws IOException { - var lines = Files.readAllLines(fileName); - for (var line : lines) { - if (line.startsWith("#") || line.isBlank()) { - continue; - } - String[] parts = line.strip().split(" "); - if (parts.length != 2) throw new IllegalArgumentException("Invalid hosts file entry " + line); - String descriptorName = parts[0]; - String hostName = parts[1]; - - try { - hostsMap.put(descriptorName, hostName); - } - catch (IllegalArgumentException ex) { - logger.warn("Hosts file contains entry for unknown service {}", descriptorName); - } - } - } - - public HostsFile() { - } - - public String getHost(ServiceDescriptor sd) { - return hostsMap.getOrDefault(sd.name, sd.name); - } - -} diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java index 287c4e83..29eac14d 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java @@ -5,18 +5,24 @@ import nu.marginalia.service.id.ServiceId; public class ServiceDescriptor { public final ServiceId id; public final String name; - public final int port; - public ServiceDescriptor(ServiceId id, int port) { + public ServiceDescriptor(ServiceId id) { this.id = id; this.name = id.name; - this.port = port; } - public ServiceDescriptor(ServiceId id, String host, int port) { + + public ServiceDescriptor(ServiceId id, String host) { this.id = id; this.name = host; - this.port = port; } + + public String getHostName(int node) { + if (node > 0) + return name + "-" + node; + + return name; + } + public String toString() { return name; } diff --git a/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfigurationModule.java b/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfigurationModule.java index 6e2eb77c..5ea41d47 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfigurationModule.java +++ b/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfigurationModule.java @@ -41,7 +41,7 @@ public class ServiceConfigurationModule extends AbstractModule { return Integer.parseInt(port); } - return descriptors.forId(id).port; + return 80; } private int getPrometheusPort() { @@ -51,7 +51,7 @@ public class ServiceConfigurationModule extends AbstractModule { return Integer.parseInt(prometheusPortEnv); } - return descriptors.forId(id).port + 1000; + return 7000; } private int getNode() { diff --git a/code/features-control/actors/src/main/java/nu/marginalia/actor/monitor/FileStorageMonitorActor.java b/code/features-control/actors/src/main/java/nu/marginalia/actor/monitor/FileStorageMonitorActor.java index f2f80d6b..67038918 100644 --- a/code/features-control/actors/src/main/java/nu/marginalia/actor/monitor/FileStorageMonitorActor.java +++ b/code/features-control/actors/src/main/java/nu/marginalia/actor/monitor/FileStorageMonitorActor.java @@ -74,7 +74,7 @@ public class FileStorageMonitorActor extends AbstractActorPrototype { transition(REMOVE_STALE, missing.get().id()); } - fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.WORK)); + fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.STORAGE)); TimeUnit.SECONDS.sleep(10); } diff --git a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java index 263b54ba..0045a53f 100644 --- a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java +++ b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java @@ -117,8 +117,8 @@ public class IndexConstructorMain { private void createPrioReverseIndex() throws SQLException, IOException { - Path outputFileDocs = ReverseIndexFullFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexFullFileNames.FileIdentifier.DOCS, ReverseIndexFullFileNames.FileVersion.NEXT); - Path outputFileWords = ReverseIndexFullFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexFullFileNames.FileIdentifier.WORDS, ReverseIndexFullFileNames.FileVersion.NEXT); + Path outputFileDocs = ReverseIndexPrioFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexPrioFileNames.FileIdentifier.DOCS, ReverseIndexPrioFileNames.FileVersion.NEXT); + Path outputFileWords = ReverseIndexPrioFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexPrioFileNames.FileIdentifier.WORDS, ReverseIndexPrioFileNames.FileVersion.NEXT); Path workDir = IndexLocations.getIndexConstructionArea(fileStorageService); Path tmpDir = workDir.resolve("tmp"); diff --git a/code/services-core/control-service/readme.md b/code/services-core/control-service/readme.md index dbe3561b..df5e2c5a 100644 --- a/code/services-core/control-service/readme.md +++ b/code/services-core/control-service/readme.md @@ -1,16 +1,21 @@ # Control Service -The control service provides an operator's user interface, and is responsible for orchestrating the various -processes of the system using Actors. +The control service provides an operator's user interface. By default this interface is +exposed on port 8081. It does not offer any sort of access control or authentication. -Actors within the control service will spawn processes when necessary, by -monitoring their message queue inboxes. +The control service will itself execute tasks that affect the entire system, but delegate +node-specific tasks to the corresponding [executor-service](../executor-service) via the +[executor-api](../../api/executor-api). + +Conceptually the application is broken into three parts: + +* Application specific tasks relate to the high level abstractions such as blacklisting and API keys +* System tasks relate to low level abstractions such as the message queue and event log. +* Node tasks relate to index node specific tasks, such as crawling and indexing. ## Central Classes * [ControlService](src/main/java/nu/marginalia/control/ControlService.java) -* [ControlActors](src/main/java/nu/marginalia/control/actor/ControlActors.java) - Class responsible for Actors' lifecycle -* [ProcessService](src/main/java/nu/marginalia/control/process/ProcessService.java) - Class responsible for spawning Processes ## See Also diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java index 3623a873..7a62ad3e 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -157,26 +157,6 @@ public class ControlService extends Service { "events", eventLogService.getLastEntries(Long.MAX_VALUE, 20)); } - private Object processesModel(Request request, Response response) { - var processes = heartbeatService.getProcessHeartbeats(); - var jobs = heartbeatService.getTaskHeartbeats(); - - return Map.of("processes", processes, - "jobs", jobs, - "actors", controlActorService.getActorStates(request), - "messages", messageQueueService.getLastEntries(20)); - } - -// private Object actorDetailsModel(Request request, Response response) { -// final Actor actor = Actor.valueOf(request.params("fsm").toUpperCase()); -// final String inbox = actor.id(); -// -// return Map.of( -// "actor", actor, -// "state-graph", controlActorService.getActorStateGraph(actor), -// "messages", messageQueueService.getLastEntriesForInbox(inbox, 20)); -// } - private Object serveStatic(Request request, Response response) { String resource = request.params("resource"); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/Redirects.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/Redirects.java index 864edc1a..ce30b108 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/Redirects.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/Redirects.java @@ -3,7 +3,6 @@ package nu.marginalia.control; import spark.ResponseTransformer; public class Redirects { - public static final HtmlRedirect redirectToServices = new HtmlRedirect("/services"); public static final HtmlRedirect redirectToActors = new HtmlRedirect("/actors"); public static final HtmlRedirect redirectToApiKeys = new HtmlRedirect("/api-keys"); public static final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage"); @@ -15,7 +14,7 @@ public class Redirects { private final String html; /** Because Spark doesn't have a redirect method that works with relative URLs - * (without explicitly providing the external address),we use HTML and let the + * (without explicitly providing the external address), we use HTML and let the * browser resolve the relative redirect instead */ public HtmlRedirect(String destination) { this.html = """ diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorState.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorState.java deleted file mode 100644 index b04908f4..00000000 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorState.java +++ /dev/null @@ -1,17 +0,0 @@ -package nu.marginalia.control.node.model; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Stream; - -public record ActorState(String name, - boolean current, - List transitions, - String description) { - public ActorState(nu.marginalia.actor.state.ActorState gs, boolean current) { - this(gs.name(), current, toTransitions(gs.next(), gs.transitions()), gs.description()); - } - private static List toTransitions(String next, String[] transitions) { - return Stream.concat(Stream.of(next), Arrays.stream(transitions)).distinct().toList(); - } -} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorStateGraph.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorStateGraph.java deleted file mode 100644 index 99a67512..00000000 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/ActorStateGraph.java +++ /dev/null @@ -1,50 +0,0 @@ -package nu.marginalia.control.node.model; - -import nu.marginalia.actor.prototype.AbstractActorPrototype; -import nu.marginalia.actor.state.ActorState; -import nu.marginalia.actor.state.ActorStateInstance; - -import java.util.*; - -public record ActorStateGraph(String description, List states) { - - public ActorStateGraph(AbstractActorPrototype graph, ActorStateInstance currentState) { - this(graph.describe(), getStateList(graph, currentState)); - } - - private static List getStateList( - AbstractActorPrototype graph, - ActorStateInstance currentState) - { - Map declaredStates = graph.declaredStates(); - Set seenStates = new HashSet<>(declaredStates.size()); - LinkedList edge = new LinkedList<>(); - - List statesList = new ArrayList<>(declaredStates.size()); - - edge.add(declaredStates.get("INITIAL")); - - while (!edge.isEmpty()) { - var first = edge.removeFirst(); - if (first == null || !seenStates.add(first)) { - continue; - } - statesList.add(new nu.marginalia.control.node.model.ActorState(first, currentState.name().equals(first.name()))); - - edge.add(declaredStates.get(first.next())); - - for (var transition : first.transitions()) { - edge.add(declaredStates.get(transition)); - } - } - - if (!declaredStates.containsKey("ERROR")) { - statesList.add(new nu.marginalia.control.node.model.ActorState("ERROR", currentState.name().equals("ERROR"), List.of(), "Terminal error state")); - } - if (!declaredStates.containsKey("END")) { - statesList.add(new nu.marginalia.control.node.model.ActorState("END", currentState.name().equals("END"), List.of(), "The machine terminated successfully")); - } - - return statesList; - } -} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/IndexNodeStatus.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/IndexNodeStatus.java index b413808f..1573173f 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/IndexNodeStatus.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/model/IndexNodeStatus.java @@ -1,4 +1,9 @@ package nu.marginalia.control.node.model; -public record IndexNodeStatus(IndexNode node, boolean indexServiceOnline, boolean executorServiceOnline) { +import nu.marginalia.nodecfg.model.NodeConfiguration; + +public record IndexNodeStatus(NodeConfiguration configuration, boolean indexServiceOnline, boolean executorServiceOnline) { + public int id() { + return configuration.node(); + } } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java index 4bb38978..37a44250 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java @@ -19,7 +19,7 @@ import java.sql.SQLException; @Singleton public class ControlFileStorageService { private final FileStorageService fileStorageService; - private Logger logger = LoggerFactory.getLogger(getClass()); + private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject public ControlFileStorageService( FileStorageService fileStorageService) diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java index ecc68392..74889b65 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java @@ -127,7 +127,7 @@ public class ControlNodeService { "nextNodeId", nextId); } - private Object triggerCrawl(Request request, Response response) throws Exception { + private Object triggerCrawl(Request request, Response response) { int nodeId = Integer.parseInt(request.params("id")); executorClient.triggerCrawl(Context.fromRequest(request), nodeId, request.params("fid")); @@ -135,7 +135,7 @@ public class ControlNodeService { return redirectToOverview(request); } - private Object triggerRestoreBackup(Request request, Response response) throws Exception { + private Object triggerRestoreBackup(Request request, Response response) { int nodeId = Integer.parseInt(request.params("id")); executorClient.restoreBackup(Context.fromRequest(request), nodeId, request.params("fid")); @@ -362,9 +362,10 @@ public class ControlNodeService { private Object nodeOverviewModel(Request request, Response response) throws SQLException { int nodeId = Integer.parseInt(request.params("id")); + var config = nodeConfigurationService.get(nodeId); return Map.of( "node", new IndexNode(nodeId), - "status", getStatus(new IndexNode(nodeId)), + "status", getStatus(config), "events", getEvents(nodeId), "processes", heartbeatService.getProcessHeartbeatsForNode(nodeId), "jobs", heartbeatService.getTaskHeartbeatsForNode(nodeId) @@ -394,29 +395,21 @@ public class ControlNodeService { return events; } - public List getConfiguredNodes() { - return fileStorageService - .getConfiguredNodes() - .stream() - .sorted() - .map(IndexNode::new) - .toList(); - } - + @SneakyThrows public List getNodeStatusList() { - return fileStorageService - .getConfiguredNodes() + return nodeConfigurationService + .getAll() .stream() - .sorted() - .map(IndexNode::new) + .sorted(Comparator.comparing(NodeConfiguration::node)) .map(this::getStatus) .toList(); } - IndexNodeStatus getStatus(IndexNode node) { - return new IndexNodeStatus(node, - monitors.isServiceUp(ServiceId.Index, node.id()), - monitors.isServiceUp(ServiceId.Executor, node.id()) + @SneakyThrows + public IndexNodeStatus getStatus(NodeConfiguration config) { + return new IndexNodeStatus(config, + monitors.isServiceUp(ServiceId.Index, config.node()), + monitors.isServiceUp(ServiceId.Executor, config.node()) ); } diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/nodes-table.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/nodes-table.hdb index 95d424ca..8a0d4e68 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/partials/nodes-table.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/partials/nodes-table.hdb @@ -2,14 +2,23 @@

Nodes

- + {{#each .}} + + - {{#if indexServiceOnline}}{{/if}} {{#unless indexServiceOnline}}{{/unless}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/storage-table.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/storage-table.hdb deleted file mode 100644 index e69de29b..00000000 diff --git a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java b/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java index 1482c114..45729cc3 100644 --- a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java +++ b/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java @@ -186,7 +186,7 @@ class TestModule extends AbstractModule { @Provides public ServiceDescriptors getServiceDescriptors() { return new ServiceDescriptors( - List.of(new ServiceDescriptor(ServiceId.Executor, "127.0.0.1", ExecutorSvcApiIntegrationTest.port)) + List.of(new ServiceDescriptor(ServiceId.Executor, "127.0.0.1")) ); } diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java index 3b56b8f3..00a220fe 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java @@ -8,8 +8,10 @@ import nu.marginalia.index.client.IndexClient; import nu.marginalia.index.client.model.query.SearchSpecification; import nu.marginalia.index.client.model.results.DecoratedSearchResultItem; import nu.marginalia.index.client.model.results.SearchResultSet; +import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.query.model.QueryParams; import nu.marginalia.query.model.QueryResponse; +import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.query.svc.QueryFactory; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; @@ -17,24 +19,31 @@ import spark.Request; import spark.Response; import spark.Spark; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; public class QueryService extends Service { private final IndexClient indexClient; + private final NodeConfigurationWatcher nodeWatcher; private final Gson gson; private final DomainBlacklist blacklist; private final QueryFactory queryFactory; + private volatile List nodes = new ArrayList<>(); + @Inject public QueryService(BaseServiceParams params, IndexClient indexClient, + NodeConfigurationWatcher nodeWatcher, Gson gson, DomainBlacklist blacklist, QueryFactory queryFactory) { super(params); this.indexClient = indexClient; + this.nodeWatcher = nodeWatcher; this.gson = gson; this.blacklist = blacklist; this.queryFactory = queryFactory; @@ -73,7 +82,9 @@ public class QueryService extends Service { } private SearchResultSet executeQuery(Context ctx, SearchSpecification query) { - return indexClient.query(ctx, 0, query); + var nodes = nodeWatcher.getQueryNodes(); + + return indexClient.query(ctx, nodes, query); } private boolean isBlacklisted(DecoratedSearchResultItem item) { diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java new file mode 100644 index 00000000..9422f3b7 --- /dev/null +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java @@ -0,0 +1,53 @@ +package nu.marginalia.query.svc; + +import com.google.inject.Inject; +import lombok.SneakyThrows; +import nu.marginalia.nodecfg.NodeConfigurationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class NodeConfigurationWatcher { + private static final Logger logger = LoggerFactory.getLogger(NodeConfigurationWatcher.class); + + private volatile List queryNodes = new ArrayList<>(); + private final NodeConfigurationService configurationService; + + @Inject + public NodeConfigurationWatcher(NodeConfigurationService configurationService) { + this.configurationService = configurationService; + + var watcherThread = new Thread(this::pollConfiguration, "Node Configuration Watcher"); + watcherThread.setDaemon(true); + watcherThread.start(); + } + + @SneakyThrows + private void pollConfiguration() { + for (;;) { + List goodNodes = new ArrayList<>(); + try { + for (var cfg : configurationService.getAll()) { + + if (!cfg.disabled() && cfg.acceptQueries()) { + goodNodes.add(cfg.node()); + } + } + queryNodes = goodNodes; + } + catch (SQLException ex) { + logger.warn("Failed to update node configurations", ex); + } + + TimeUnit.SECONDS.sleep(10); + } + } + + public List getQueryNodes() { + return queryNodes; + } +} diff --git a/docker-compose.yml b/docker-compose.yml index 08bd8eb8..428f60f3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,113 +2,74 @@ x-svc: &service env_file: - "run/env/service.env" volumes: - - vol:/vol - - backup:/backup - conf:/wmsa/conf:ro - model:/wmsa/model - data:/wmsa/data - - dist:/dist - - samples:/samples - logs:/var/log/wmsa + - dist:/dist networks: - wmsa depends_on: - mariadb +x-p1: &partition-1 + env_file: + - "run/env/service.env" + volumes: + - conf:/wmsa/conf:ro + - model:/wmsa/model + - data:/wmsa/data + - logs:/var/log/wmsa + - dist:/dist + - index:/idx + - work:/work + - backup:/backup + - samples:/storage + networks: + - wmsa + depends_on: + - mariadb + environment: + - "WMSA_SERVICE_NODE=1" + services: - index-service: - <<: *service + index-service-1: + <<: *partition-1 image: "marginalia.nu/index-service" - container_name: "index-service" - ports: - - "127.0.0.1:5021:5021/tcp" - - "127.0.0.1:4021:5000" - - "127.0.0.1:7021:4000" - environment: - - "WMSA_SERVICE_NODE=0" + container_name: "index-service-1" + executor-service-1: + <<: *partition-1 + image: "marginalia.nu/executor-service" + container_name: "executor-service-1" search-service: <<: *service image: "marginalia.nu/search-service" container_name: "search-service" - ports: - - "127.0.0.1:5023:5023" - - "127.0.0.1:4023:5000" - - "127.0.0.1:7023:4000" - depends_on: - - index-service assistant-service: <<: *service image: "marginalia.nu/assistant-service" container_name: "assistant-service" - ports: - - "127.0.0.1:5025:5025" - - "127.0.0.1:4025:5000" - - "127.0.0.1:7025:4000" - depends_on: - - mariadb api-service: <<: *service image: "marginalia.nu/api-service" container_name: "api-service" - ports: - - "127.0.0.1:5004:5004" - - "127.0.0.1:4004:5000" - - "127.0.0.1:7004:4000" - depends_on: - - mariadb query-service: <<: *service image: "marginalia.nu/query-service" container_name: "query-service" - ports: - - "127.0.0.1:5022:5022" - - "127.0.0.1:4022:5000" - - "127.0.0.1:7022:4000" - depends_on: - - mariadb - executor-service: - <<: *service - image: "marginalia.nu/executor-service" - container_name: "executor-service" - ports: - - "127.0.0.1:5024:5024" - - "127.0.0.1:4024:5000" - - "127.0.0.1:7024:4000" - depends_on: - - mariadb - environment: - - "WMSA_SERVICE_NODE=0" dating-service: <<: *service image: "marginalia.nu/dating-service" container_name: "dating-service" - ports: - - "127.0.0.1:5070:5070" - - "127.0.0.1:4070:5000" - - "127.0.0.1:7070:4000" - depends_on: - - mariadb explorer-service: <<: *service image: "marginalia.nu/explorer-service" container_name: "explorer-service" - ports: - - "127.0.0.1:5071:5071" - - "127.0.0.1:4071:5000" - - "127.0.0.1:7071:4000" - depends_on: - - mariadb control-service: <<: *service image: "marginalia.nu/control-service" container_name: "control-service" - ports: - - "127.0.0.1:5090:5090" - - "127.0.0.1:4090:5000" - - "127.0.0.1:7090:4000" - - "127.0.0.1:7099:4001" - depends_on: - - mariadb + mariadb: image: "mariadb:lts" container_name: "mariadb" @@ -143,12 +104,18 @@ volumes: type: none o: bind device: run/db - vol: + index: driver: local driver_opts: type: none o: bind - device: run/vol + device: run/index + work: + driver: local + driver_opts: + type: none + o: bind + device: run/work backup: driver: local driver_opts: diff --git a/run/env/service.env b/run/env/service.env index d8057488..54184bdd 100644 --- a/run/env/service.env +++ b/run/env/service.env @@ -1,6 +1,6 @@ WMSA_HOME=run/ EXECUTOR_SERVICE_OPTS="-DdistPath=/dist" -CONVERTER_PROCESS_OPTS="-ea -Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/converter.jfr" -CRAWLER_PROCESS_OPTS="-Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/crawler.jfr" -LOADER_PROCESS_OPTS="-Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/loader.jfr" -INDEX_CONSTRUCTION_PROCESS_OPTS="-ea -Djava.util.concurrent.ForkJoinPool.common.parallelism=4" +CONVERTER_PROCESS_OPTS="-Dservice-host=0.0.0.0" +CRAWLER_PROCESS_OPTS="-Dservice-host=0.0.0.0" +LOADER_PROCESS_OPTS="-Dservice-host=0.0.0.0" +INDEX_CONSTRUCTION_PROCESS_OPTS="-Djava.util.concurrent.ForkJoinPool.common.parallelism=4" diff --git a/run/nginx-site.conf b/run/nginx-site.conf index a1762990..8b9054c5 100644 --- a/run/nginx-site.conf +++ b/run/nginx-site.conf @@ -15,35 +15,26 @@ server { rewrite ^/links/(.*)$ /search?query=links:$1&profile=corpo; location /screenshot { - proxy_pass http://assistant-service:5025/public/screenshot; + proxy_pass http://assistant-service/public/screenshot; } location /site-search { - proxy_pass http://search-service:5023/public/site-search; + proxy_pass http://search-service/public/site-search; } location /site/suggest { - proxy_pass http://search-service:5023/public/site/suggest; + proxy_pass http://search-service/public/site/suggest; } location /site/flag-site { - proxy_pass http://search-service:5023/public/site/flag-site; + proxy_pass http://search-service/public/site/flag-site; } location /site/ { rewrite ^/site/(.*)$ /search?query=site:$1&profile=yolo; } - location /debug/wordmeta { - proxy_pass http://index-service:5021/public/debug/wordmeta; - } - location /debug/docmeta { - proxy_pass http://index-service:5021/public/debug/docmeta; - } - location /debug/word { - proxy_pass http://index-service:5021/public/debug/word; - } location /suggest/ { - proxy_pass http://assistant-service:5025/public$request_uri; + proxy_pass http://assistant-service/public$request_uri; access_log off; } location / { - proxy_pass http://search-service:5023/public/; + proxy_pass http://search-service/public/; } } @@ -62,7 +53,7 @@ server { proxy_set_header X-Public "1"; location / { - proxy_pass http://control-service:5090/public/; + proxy_pass http://control-service/public/; access_log off; } @@ -81,7 +72,7 @@ server { proxy_set_header X-Public "1"; location / { - proxy_pass http://api-service:5004/public/; + proxy_pass http://api-service/public/; access_log off; } diff --git a/run/setup.sh b/run/setup.sh index 7094f1b4..cacb311b 100755 --- a/run/setup.sh +++ b/run/setup.sh @@ -18,7 +18,7 @@ function download_model { pushd $(dirname $0) -mkdir -p model logs db samples backup install vol/{ir,iw} vol/{lr,lw} vol/ss vol/{ldbw,ldbr} data samples/export +mkdir -p model logs db samples backup install work index data samples/export download_model model/English.DICT https://raw.githubusercontent.com/datquocnguyen/RDRPOSTagger/master/Models/POS/English.DICT download_model model/English.RDR https://raw.githubusercontent.com/datquocnguyen/RDRPOSTagger/master/Models/POS/English.RDR
NodeIndexExecutorNodeQueriesEnabledIndexExecutor
- node-{{node.id}} + node-{{id}} + + {{#if configuration.acceptQueries}} + ✓ + {{/if}} + + {{#unless configuration.disabled}} + ✓ + {{/unless}} OnlineOffline