(*) Get multi-node routing working.

This commit is contained in:
Viktor Lofgren 2023-10-15 18:38:30 +02:00
parent eacbf87979
commit 16e0738731
38 changed files with 322 additions and 348 deletions

View File

@ -21,7 +21,7 @@ public class AssistantClient extends AbstractDynamicClient {
@Inject
public AssistantClient(ServiceDescriptors descriptors) {
super(descriptors.forId(ServiceId.Assistant), WmsaHome.getHostsFile(), GsonFactory::get);
super(descriptors.forId(ServiceId.Assistant), GsonFactory::get);
}
public Observable<DictionaryResponse> dictionaryLookup(Context ctx, String word) {

View File

@ -4,6 +4,9 @@ import com.google.inject.Inject;
import nu.marginalia.WmsaHome;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.client.route.RouteProvider;
import nu.marginalia.client.route.ServiceRoute;
import nu.marginalia.service.descriptor.ServiceDescriptor;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.executor.model.ActorRunStates;
import nu.marginalia.executor.model.crawl.RecrawlParameters;
@ -20,7 +23,7 @@ import java.util.List;
public class ExecutorClient extends AbstractDynamicClient {
@Inject
public ExecutorClient(ServiceDescriptors descriptors) {
super(descriptors.forId(ServiceId.Executor), WmsaHome.getHostsFile(), GsonFactory::get);
super(descriptors.forId(ServiceId.Executor), GsonFactory::get);
}
public void startFsm(Context ctx, int node, String actorName) {

View File

@ -5,7 +5,7 @@ import com.google.inject.Singleton;
import com.google.inject.name.Named;
import io.prometheus.client.Summary;
import io.reactivex.rxjava3.core.Observable;
import nu.marginalia.WmsaHome;
import io.reactivex.rxjava3.schedulers.Schedulers;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.index.client.model.query.SearchSpecification;
@ -15,6 +15,7 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import java.util.List;
import javax.annotation.CheckReturnValue;
import java.util.UUID;
@ -31,7 +32,7 @@ public class IndexClient extends AbstractDynamicClient {
MessageQueueFactory messageQueueFactory,
@Named("wmsa-system-node") Integer nodeId)
{
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
super(descriptors.forId(ServiceId.Index), GsonFactory::get);
String inboxName = ServiceId.Index.name;
String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString());
@ -50,6 +51,15 @@ public class IndexClient extends AbstractDynamicClient {
);
}
/** Run a search against several index nodes and merge the results.
 * <p>
 * The specification is posted to the {@code /search/} endpoint of each node in
 * {@code nodes}, and the per-node result sets are folded together with
 * {@link SearchResultSet#combine}.  The call blocks until every node has answered.
 *
 * @param ctx   request context forwarded to each node
 * @param nodes ids of the index nodes to query
 * @param specs the search specification sent to every node
 * @return the combined result set; an empty result set when {@code nodes} is empty
 */
@CheckReturnValue
public SearchResultSet query(Context ctx, List<Integer> nodes, SearchSpecification specs) {
    return Observable.fromIterable(nodes)
            .subscribeOn(Schedulers.io())
            .concatMap(node -> this.postGet(ctx, node,"/search/", specs, SearchResultSet.class))
            .reduce(SearchResultSet::combine)
            // reduce() without a seed yields an empty Maybe when nothing was emitted
            // (e.g. no nodes are configured yet); a plain blockingGet() would then hand
            // null to the caller.  Fall back to an empty, mutable result set instead.
            .blockingGet(new SearchResultSet(new java.util.ArrayList<>()));
}
@CheckReturnValue
public Observable<Boolean> isBlocked(Context ctx, int node) {

View File

@ -4,6 +4,8 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@AllArgsConstructor @Getter @ToString
@ -12,4 +14,15 @@ public class SearchResultSet {
public int size() {
return results.size();
}
/** Merge two result sets into a new one.
 * <p>
 * The items of {@code l} and {@code r} are copied into a fresh list, which is
 * then sorted by each item's {@code rankingScore} in ascending order.  Neither
 * input is modified.
 *
 * @param l the first result set
 * @param r the second result set
 * @return a new result set holding the items of both inputs
 */
public static SearchResultSet combine(SearchResultSet l, SearchResultSet r) {
    final int totalSize = l.size() + r.size();

    final List<DecoratedSearchResultItem> merged = new ArrayList<>(totalSize);
    merged.addAll(l.results);
    merged.addAll(r.results);

    // TODO: Do we combine these correctly?
    merged.sort(Comparator.comparing(item -> item.rankingScore));

    return new SearchResultSet(merged);
}
}

View File

@ -30,10 +30,9 @@ public class QueryClient extends AbstractDynamicClient {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public QueryClient(ServiceDescriptors descriptors,
MessageQueueFactory messageQueueFactory) {
public QueryClient(ServiceDescriptors descriptors) {
super(descriptors.forId(ServiceId.Query), WmsaHome.getHostsFile(), GsonFactory::get);
super(descriptors.forId(ServiceId.Query), GsonFactory::get);
}
/** Delegate an Index API style query directly to the index service */

View File

@ -2,7 +2,6 @@ package nu.marginalia;
import nu.marginalia.service.ServiceHomeNotConfiguredException;
import nu.marginalia.service.descriptor.HostsFile;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -56,19 +55,6 @@ public class WmsaHome {
.toString();
}
/** Load the hosts file located at {@code conf/hosts} under the home path.
 *
 * @return the parsed hosts file, or an empty {@link HostsFile} when no regular
 *         file exists at that location
 * @throws RuntimeException if the file exists but could not be read
 */
public static HostsFile getHostsFile() {
    final Path hostsFile = getHomePath().resolve("conf/hosts");

    // No hosts file present: fall back to an empty mapping
    if (!Files.isRegularFile(hostsFile)) {
        return new HostsFile();
    }

    try {
        return new HostsFile(hostsFile);
    }
    catch (IOException e) {
        throw new RuntimeException("Failed to load hosts file " + hostsFile, e);
    }
}
public static Path getAdsDefinition() {
return getHomePath().resolve("data").resolve("adblock.txt");

View File

@ -33,8 +33,15 @@ public class FileStorageService {
public Optional<FileStorage> findFileStorageToDelete() {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT ID FROM FILE_STORAGE WHERE STATE='DELETE' LIMIT 1
SELECT FILE_STORAGE.ID FROM FILE_STORAGE
INNER JOIN FILE_STORAGE_BASE ON BASE_ID=FILE_STORAGE_BASE.ID
WHERE STATE='DELETE'
AND NODE = ?
LIMIT 1
""")) {
stmt.setInt(1, node);
var rs = stmt.executeQuery();
if (rs.next()) {
return Optional.of(getStorage(new FileStorageId(rs.getLong(1))));
@ -106,9 +113,16 @@ public class FileStorageService {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT PATH FROM FILE_STORAGE WHERE BASE_ID = ?
SELECT FILE_STORAGE.PATH
FROM FILE_STORAGE INNER JOIN FILE_STORAGE_BASE
ON BASE_ID = FILE_STORAGE_BASE.ID
WHERE BASE_ID = ?
AND NODE = ?
""")) {
stmt.setLong(1, base.id().id());
stmt.setInt(2, node);
var rs = stmt.executeQuery();
while (rs.next()) {
ignoredPaths.add(rs.getString(1));
@ -494,8 +508,11 @@ public class FileStorageService {
var stmt = conn.prepareStatement("""
SELECT PATH, STATE, TYPE, DESCRIPTION, CREATE_DATE, ID, BASE_ID
FROM FILE_STORAGE_VIEW
WHERE NODE=?
""")) {
stmt.setInt(1, node);
long storageId;
long baseId;
String path;
@ -510,7 +527,15 @@ public class FileStorageService {
storageId = rs.getLong("ID");
path = rs.getString("PATH");
state = rs.getString("STATE");
type = FileStorageType.valueOf(rs.getString("TYPE"));
try {
type = FileStorageType.valueOf(rs.getString("TYPE"));
}
catch (IllegalArgumentException ex) {
logger.warn("Illegal file storage type {} in db", rs.getString("TYPE"));
continue;
}
description = rs.getString("DESCRIPTION");
createDateTime = rs.getTimestamp("CREATE_DATE").toLocalDateTime();
var base = getStorageBase(new FileStorageBaseId(baseId));

View File

@ -0,0 +1 @@
-- Remove every FILE_STORAGE row whose TYPE is one of the listed index, link database
-- or search-set storage types.
-- NOTE(review): presumably these types are no longer tracked in FILE_STORAGE after this
-- change -- confirm before re-running against a database that still relies on them.
DELETE FROM FILE_STORAGE WHERE TYPE IN ('INDEX_STAGING', 'INDEX_LIVE', 'SEARCH_SETS', 'LINKDB_LIVE', 'LINKDB_STAGING');

View File

@ -10,16 +10,16 @@ import nu.marginalia.client.exception.LocalException;
import nu.marginalia.client.exception.NetworkException;
import nu.marginalia.client.exception.RemoteException;
import nu.marginalia.client.exception.RouteNotConfiguredException;
import nu.marginalia.client.model.ClientRoute;
import nu.marginalia.client.route.ServiceRoute;
import nu.marginalia.client.route.RouteProvider;
import nu.marginalia.client.route.ServiceRoutes;
import nu.marginalia.service.descriptor.ServiceDescriptor;
import okhttp3.*;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -32,7 +32,7 @@ public abstract class AbstractClient implements AutoCloseable {
private final OkHttpClient client;
private boolean quiet;
private final Map<Integer, String> serviceRoutes;
private final ServiceRoutes serviceRoutes;
private int timeout;
private final LivenessMonitor livenessMonitor = new LivenessMonitor();
@ -42,18 +42,14 @@ public abstract class AbstractClient implements AutoCloseable {
this.timeout = timeout;
}
public AbstractClient(ClientRoute route, int timeout, Supplier<Gson> gsonProvider) {
this(Map.of(0, route), timeout, gsonProvider);
public AbstractClient(ServiceDescriptor service, int timeout, Supplier<Gson> gsonProvider) {
this(RouteProvider.fromService(service), timeout, gsonProvider);
}
public AbstractClient(Map<Integer, ClientRoute> routes,
public AbstractClient(RouteProvider routeProvider,
int timeout,
Supplier<Gson> gsonProvider)
{
routes.forEach((node, route) -> {
logger.info("Creating client route for {}:{} -> {}:{}", getClass().getSimpleName(), node, route.host(), route.port());
});
this.gson = gsonProvider.get();
this.timeout = timeout;
@ -64,11 +60,7 @@ public abstract class AbstractClient implements AutoCloseable {
.followRedirects(true)
.build();
serviceRoutes = new HashMap<>(routes.size());
routes.forEach((node, client) ->
serviceRoutes.put(node, new HttpHost(client.host(), client.port()).toURI())
);
serviceRoutes = new ServiceRoutes(routeProvider);
RxJavaPlugins.setErrorHandler(e -> {
if (e.getMessage() == null) {
@ -96,10 +88,10 @@ public abstract class AbstractClient implements AutoCloseable {
for (; ; ) {
boolean allAlive = true;
try {
for (int node : serviceRoutes.keySet()) {
for (int node : serviceRoutes.getNodes()) {
boolean isResponsive = isResponsive(node);
alivenessMap.put(node, isResponsive);
allAlive &= !isResponsive;
allAlive &= isResponsive;
}
}
//
@ -121,13 +113,14 @@ public abstract class AbstractClient implements AutoCloseable {
}
public boolean isAlive(int node) {
return alivenessMap.getOrDefault(node, false);
// computeIfAbsent ensures we do a synchronous status check on a cold start,
// so we don't have to wait for the polling loop to find out whether the service is up
return alivenessMap.computeIfAbsent(node, this::isResponsive);
}
public synchronized boolean isResponsive(int node) {
Context ctx = Context.internal("ping");
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build();
return Observable.just(client.newCall(req))
.subscribeOn(scheduler().get())
.map(Call::execute)
@ -201,7 +194,7 @@ public abstract class AbstractClient implements AutoCloseable {
@SneakyThrows
protected synchronized Observable<HttpStatusCode> post(Context ctx, int node, String endpoint, GeneratedMessageV3 data) {
ensureAlive(0);
ensureAlive(node);
RequestBody body = RequestBody.create(data.toByteArray(), MediaType.parse("application/protobuf"));
@ -225,7 +218,7 @@ public abstract class AbstractClient implements AutoCloseable {
@SneakyThrows
protected synchronized <T> Observable<T> postGet(Context ctx, int node, String endpoint, Object data, Class<T> returnType) {
ensureAlive(0);
ensureAlive(node);
RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json"));
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build();
@ -243,7 +236,7 @@ public abstract class AbstractClient implements AutoCloseable {
}
protected synchronized Observable<HttpStatusCode> post(Context ctx, int node, String endpoint, String data, MediaType mediaType) {
ensureAlive(0);
ensureAlive(node);
var body = RequestBody.create(data, mediaType);
@ -269,7 +262,7 @@ public abstract class AbstractClient implements AutoCloseable {
}
protected synchronized <T> Observable<T> get(Context ctx, int node, String endpoint, Class<T> type) {
ensureAlive(0);
ensureAlive(node);
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build();
@ -287,7 +280,7 @@ public abstract class AbstractClient implements AutoCloseable {
@SuppressWarnings("unchecked")
protected synchronized Observable<String> get(Context ctx, int node, String endpoint) {
ensureAlive(0);
ensureAlive(node);
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build();
@ -304,7 +297,7 @@ public abstract class AbstractClient implements AutoCloseable {
}
protected synchronized Observable<HttpStatusCode> delete(Context ctx, int node, String endpoint) {
ensureAlive(0);
ensureAlive(node);
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).delete().build();
@ -335,10 +328,11 @@ public abstract class AbstractClient implements AutoCloseable {
@SneakyThrows
private void ensureAlive(int node) {
if (!isAlive(node)) {
wait(2000);
if (!isAlive(node)) {
throw new RouteNotConfiguredException("Route not configured for " + name() + " -- tried " + serviceRoutes.get(node));
}
var route = serviceRoutes.get(node);
logger.error("Route not configured for {}:{}; {}; {}", name(), node, livenessMonitor.alivenessMap, serviceRoutes.getNodes()
.stream().map(serviceRoutes::get).toList());
throw new RouteNotConfiguredException("Route not configured for " + name() + ":" + node + " -- tried " + route);
}
}

View File

@ -1,25 +1,20 @@
package nu.marginalia.client;
import com.google.gson.Gson;
import io.reactivex.rxjava3.core.Observable;
import lombok.SneakyThrows;
import nu.marginalia.client.model.ClientRoute;
import nu.marginalia.client.route.RouteProvider;
import nu.marginalia.client.route.ServiceRoute;
import nu.marginalia.service.descriptor.ServiceDescriptor;
import nu.marginalia.service.descriptor.HostsFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.function.Supplier;
public class AbstractDynamicClient extends AbstractClient {
private final ServiceDescriptor service;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AbortingScheduler scheduler;
public AbstractDynamicClient(@Nonnull ServiceDescriptor service, HostsFile hosts, Supplier<Gson> gsonProvider) {
public AbstractDynamicClient(@Nonnull ServiceDescriptor service, Supplier<Gson> gsonProvider) {
super(
new ClientRoute(hosts.getHost(service), service.port),
service,
10,
gsonProvider
);

View File

@ -1,4 +0,0 @@
package nu.marginalia.client.model;
public record ClientRoute(String host, int port) {
}

View File

@ -0,0 +1,11 @@
package nu.marginalia.client.route;
import nu.marginalia.service.descriptor.ServiceDescriptor;
/** Resolves the network route to use for a given node of a service. */
public interface RouteProvider {

    /** Find the route for the given node.
     *
     * @param node the node id
     * @return the route to that node
     */
    ServiceRoute findRoute(int node);

    /** Create a provider that derives each node's host name from the service
     * descriptor and always uses port 80.
     *
     * @param serviceDescriptor descriptor supplying the per-node host name
     * @return a provider backed by {@code serviceDescriptor}
     */
    static RouteProvider fromService(ServiceDescriptor serviceDescriptor) {
        return node -> {
            final String hostName = serviceDescriptor.getHostName(node);
            return new ServiceRoute(hostName, 80);
        };
    }
}

View File

@ -0,0 +1,12 @@
package nu.marginalia.client.route;
import org.apache.http.HttpHost;
/** A host name and port pair identifying where a service node can be reached.
 *
 * @param hostname host name of the node
 * @param port     port the service listens on
 */
public record ServiceRoute(String hostname, int port) {

    /** Render the route as a base URL.
     * <p>
     * Port 80 is left implicit ({@code http://hostname}); any other port is
     * rendered through {@link HttpHost#toURI()}.
     */
    @Override
    public String toString() {
        return port == 80
                ? "http://" + hostname
                : new HttpHost(hostname(), port()).toURI();
    }
}

View File

@ -0,0 +1,22 @@
package nu.marginalia.client.route;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/** Lazily populated cache of per-node routes.
 * <p>
 * A route is looked up through the {@link RouteProvider} the first time a node
 * is requested and remembered afterwards.
 */
public class ServiceRoutes {
    private final ConcurrentHashMap<Integer, ServiceRoute> knownRoutes = new ConcurrentHashMap<>();
    private final RouteProvider provider;

    public ServiceRoutes(RouteProvider provider) {
        this.provider = provider;
    }

    /** Return the route for {@code node}, asking the provider if it is not yet cached. */
    public ServiceRoute get(int node) {
        return knownRoutes.computeIfAbsent(node, id -> provider.findRoute(id));
    }

    /** Return a copy of the node ids that have a cached route.
     * <p>
     * Only nodes that have previously been passed to {@link #get(int)} appear here.
     */
    public List<Integer> getNodes() {
        final List<Integer> nodes = new ArrayList<>();
        nodes.addAll(knownRoutes.keySet());
        return nodes;
    }
}

View File

@ -5,7 +5,7 @@ import io.reactivex.rxjava3.core.Observable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import nu.marginalia.client.model.ClientRoute;
import nu.marginalia.client.route.ServiceRoute;
import org.junit.jupiter.api.*;
import spark.Request;
import spark.Response;
@ -34,7 +34,7 @@ public class AbstractClientTest {
int port = new Random().nextInt(6000, 10000);
testServer = new TestServer(port);
client = new AbstractClient(new ClientRoute("localhost", port), 1, Gson::new) {
client = new AbstractClient(n -> new ServiceRoute("localhost", port), 1, Gson::new) {
@Override
public AbortingScheduler scheduler() {
return new AbortingScheduler(name());

View File

@ -8,14 +8,14 @@ import java.util.List;
public class SearchServiceDescriptors {
public static ServiceDescriptors descriptors = new ServiceDescriptors(
List.of(new ServiceDescriptor(ServiceId.Api, 5004),
new ServiceDescriptor(ServiceId.Index, 5021),
new ServiceDescriptor(ServiceId.Query, 5022),
new ServiceDescriptor(ServiceId.Search, 5023),
new ServiceDescriptor(ServiceId.Executor, 5024),
new ServiceDescriptor(ServiceId.Assistant, 5025),
new ServiceDescriptor(ServiceId.Dating, 5070),
new ServiceDescriptor(ServiceId.Explorer, 5071),
new ServiceDescriptor(ServiceId.Control, 5090)
List.of(new ServiceDescriptor(ServiceId.Api),
new ServiceDescriptor(ServiceId.Index),
new ServiceDescriptor(ServiceId.Query),
new ServiceDescriptor(ServiceId.Search),
new ServiceDescriptor(ServiceId.Executor),
new ServiceDescriptor(ServiceId.Assistant),
new ServiceDescriptor(ServiceId.Dating),
new ServiceDescriptor(ServiceId.Explorer),
new ServiceDescriptor(ServiceId.Control)
));
}

View File

@ -1,45 +0,0 @@
package nu.marginalia.service.descriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
/** Mappings file between ServiceDescriptor.name and host
*
* */
public class HostsFile {
private final Map<String, String> hostsMap = new HashMap<>();
private static final Logger logger = LoggerFactory.getLogger(HostsFile.class);
public HostsFile(Path fileName) throws IOException {
var lines = Files.readAllLines(fileName);
for (var line : lines) {
if (line.startsWith("#") || line.isBlank()) {
continue;
}
String[] parts = line.strip().split(" ");
if (parts.length != 2) throw new IllegalArgumentException("Invalid hosts file entry " + line);
String descriptorName = parts[0];
String hostName = parts[1];
try {
hostsMap.put(descriptorName, hostName);
}
catch (IllegalArgumentException ex) {
logger.warn("Hosts file contains entry for unknown service {}", descriptorName);
}
}
}
public HostsFile() {
}
public String getHost(ServiceDescriptor sd) {
return hostsMap.getOrDefault(sd.name, sd.name);
}
}

View File

@ -5,18 +5,24 @@ import nu.marginalia.service.id.ServiceId;
public class ServiceDescriptor {
public final ServiceId id;
public final String name;
public final int port;
public ServiceDescriptor(ServiceId id, int port) {
public ServiceDescriptor(ServiceId id) {
this.id = id;
this.name = id.name;
this.port = port;
}
public ServiceDescriptor(ServiceId id, String host, int port) {
public ServiceDescriptor(ServiceId id, String host) {
this.id = id;
this.name = host;
this.port = port;
}
/** Return the host name for the given node of this service.
 * <p>
 * Positive node ids yield {@code name-node}; any other value yields the bare name.
 *
 * @param node the node id
 * @return the host name for that node
 */
public String getHostName(int node) {
    return (node > 0) ? name + "-" + node : name;
}
/** Returns the value of the {@code name} field. */
public String toString() {
return name;
}

View File

@ -41,7 +41,7 @@ public class ServiceConfigurationModule extends AbstractModule {
return Integer.parseInt(port);
}
return descriptors.forId(id).port;
return 80;
}
private int getPrometheusPort() {
@ -51,7 +51,7 @@ public class ServiceConfigurationModule extends AbstractModule {
return Integer.parseInt(prometheusPortEnv);
}
return descriptors.forId(id).port + 1000;
return 7000;
}
private int getNode() {

View File

@ -74,7 +74,7 @@ public class FileStorageMonitorActor extends AbstractActorPrototype {
transition(REMOVE_STALE, missing.get().id());
}
fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.WORK));
fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.STORAGE));
TimeUnit.SECONDS.sleep(10);
}

View File

@ -117,8 +117,8 @@ public class IndexConstructorMain {
private void createPrioReverseIndex() throws SQLException, IOException {
Path outputFileDocs = ReverseIndexFullFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexFullFileNames.FileIdentifier.DOCS, ReverseIndexFullFileNames.FileVersion.NEXT);
Path outputFileWords = ReverseIndexFullFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexFullFileNames.FileIdentifier.WORDS, ReverseIndexFullFileNames.FileVersion.NEXT);
Path outputFileDocs = ReverseIndexPrioFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexPrioFileNames.FileIdentifier.DOCS, ReverseIndexPrioFileNames.FileVersion.NEXT);
Path outputFileWords = ReverseIndexPrioFileNames.resolve(IndexLocations.getCurrentIndex(fileStorageService), ReverseIndexPrioFileNames.FileIdentifier.WORDS, ReverseIndexPrioFileNames.FileVersion.NEXT);
Path workDir = IndexLocations.getIndexConstructionArea(fileStorageService);
Path tmpDir = workDir.resolve("tmp");

View File

@ -1,16 +1,21 @@
# Control Service
The control service provides an operator's user interface, and is responsible for orchestrating the various
processes of the system using Actors.
The control service provides an operator's user interface. By default this interface is
exposed on port 8081. It does not offer any sort of access control or authentication.
Actors within the control service will spawn processes when necessary, by
monitoring their message queue inboxes.
The control service will itself execute tasks that affect the entire system, but delegate
node-specific tasks to the corresponding [executor-service](../executor-service) via the
[executor-api](../../api/executor-api).
Conceptually the application is broken into three parts:
* Application specific tasks relate to the high level abstractions such as blacklisting and API keys
* System tasks relate to low level abstractions such as the message queue and event log.
* Node tasks relate to index node specific tasks, such as crawling and indexing.
## Central Classes
* [ControlService](src/main/java/nu/marginalia/control/ControlService.java)
* [ControlActors](src/main/java/nu/marginalia/control/actor/ControlActors.java) - Class responsible for Actors' lifecycle
* [ProcessService](src/main/java/nu/marginalia/control/process/ProcessService.java) - Class responsible for spawning Processes
## See Also

View File

@ -157,26 +157,6 @@ public class ControlService extends Service {
"events", eventLogService.getLastEntries(Long.MAX_VALUE, 20));
}
/** Build the template model for the processes view.
 * <p>
 * The model contains the process heartbeats, the task heartbeats, the actor
 * states for the request and the 20 most recent message queue entries.
 */
private Object processesModel(Request request, Response response) {
    return Map.of(
            "processes", heartbeatService.getProcessHeartbeats(),
            "jobs", heartbeatService.getTaskHeartbeats(),
            "actors", controlActorService.getActorStates(request),
            "messages", messageQueueService.getLastEntries(20)
    );
}
// private Object actorDetailsModel(Request request, Response response) {
// final Actor actor = Actor.valueOf(request.params("fsm").toUpperCase());
// final String inbox = actor.id();
//
// return Map.of(
// "actor", actor,
// "state-graph", controlActorService.getActorStateGraph(actor),
// "messages", messageQueueService.getLastEntriesForInbox(inbox, 20));
// }
private Object serveStatic(Request request, Response response) {
String resource = request.params("resource");

View File

@ -3,7 +3,6 @@ package nu.marginalia.control;
import spark.ResponseTransformer;
public class Redirects {
public static final HtmlRedirect redirectToServices = new HtmlRedirect("/services");
public static final HtmlRedirect redirectToActors = new HtmlRedirect("/actors");
public static final HtmlRedirect redirectToApiKeys = new HtmlRedirect("/api-keys");
public static final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage");
@ -15,7 +14,7 @@ public class Redirects {
private final String html;
/** Because Spark doesn't have a redirect method that works with relative URLs
* (without explicitly providing the external address),we use HTML and let the
* (without explicitly providing the external address), we use HTML and let the
* browser resolve the relative redirect instead */
public HtmlRedirect(String destination) {
this.html = """

View File

@ -1,17 +0,0 @@
package nu.marginalia.control.node.model;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
/** View model of a single actor state.
 *
 * @param name        name of the state
 * @param current     whether this is the state the actor is currently in
 * @param transitions names of the states reachable from this one
 * @param description description of the state
 */
public record ActorState(String name,
                         boolean current,
                         List<String> transitions,
                         String description) {

    /** Build the view model from a declared actor state. */
    public ActorState(nu.marginalia.actor.state.ActorState gs, boolean current) {
        this(
                gs.name(),
                current,
                toTransitions(gs.next(), gs.transitions()),
                gs.description()
        );
    }

    /** Put {@code next} first, followed by the explicit transitions, with duplicates removed. */
    private static List<String> toTransitions(String next, String[] transitions) {
        Stream<String> implicit = Stream.of(next);
        Stream<String> explicit = Arrays.stream(transitions);

        return Stream.concat(implicit, explicit)
                .distinct()
                .toList();
    }
}

View File

@ -1,50 +0,0 @@
package nu.marginalia.control.node.model;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStateInstance;
import java.util.*;
public record ActorStateGraph(String description, List<nu.marginalia.control.node.model.ActorState> states) {
public ActorStateGraph(AbstractActorPrototype graph, ActorStateInstance currentState) {
this(graph.describe(), getStateList(graph, currentState));
}
private static List<nu.marginalia.control.node.model.ActorState> getStateList(
AbstractActorPrototype graph,
ActorStateInstance currentState)
{
Map<String, ActorState> declaredStates = graph.declaredStates();
Set<ActorState> seenStates = new HashSet<>(declaredStates.size());
LinkedList<ActorState> edge = new LinkedList<>();
List<nu.marginalia.control.node.model.ActorState> statesList = new ArrayList<>(declaredStates.size());
edge.add(declaredStates.get("INITIAL"));
while (!edge.isEmpty()) {
var first = edge.removeFirst();
if (first == null || !seenStates.add(first)) {
continue;
}
statesList.add(new nu.marginalia.control.node.model.ActorState(first, currentState.name().equals(first.name())));
edge.add(declaredStates.get(first.next()));
for (var transition : first.transitions()) {
edge.add(declaredStates.get(transition));
}
}
if (!declaredStates.containsKey("ERROR")) {
statesList.add(new nu.marginalia.control.node.model.ActorState("ERROR", currentState.name().equals("ERROR"), List.of(), "Terminal error state"));
}
if (!declaredStates.containsKey("END")) {
statesList.add(new nu.marginalia.control.node.model.ActorState("END", currentState.name().equals("END"), List.of(), "The machine terminated successfully"));
}
return statesList;
}
}

View File

@ -1,4 +1,9 @@
package nu.marginalia.control.node.model;
public record IndexNodeStatus(IndexNode node, boolean indexServiceOnline, boolean executorServiceOnline) {
import nu.marginalia.nodecfg.model.NodeConfiguration;
/** A node's configuration paired with the online status of its index and executor services. */
public record IndexNodeStatus(NodeConfiguration configuration, boolean indexServiceOnline, boolean executorServiceOnline) {
/** Returns the node id taken from the configuration. */
public int id() {
return configuration.node();
}
}

View File

@ -19,7 +19,7 @@ import java.sql.SQLException;
@Singleton
public class ControlFileStorageService {
private final FileStorageService fileStorageService;
private Logger logger = LoggerFactory.getLogger(getClass());
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public ControlFileStorageService( FileStorageService fileStorageService)

View File

@ -127,7 +127,7 @@ public class ControlNodeService {
"nextNodeId", nextId);
}
private Object triggerCrawl(Request request, Response response) throws Exception {
private Object triggerCrawl(Request request, Response response) {
int nodeId = Integer.parseInt(request.params("id"));
executorClient.triggerCrawl(Context.fromRequest(request), nodeId, request.params("fid"));
@ -135,7 +135,7 @@ public class ControlNodeService {
return redirectToOverview(request);
}
private Object triggerRestoreBackup(Request request, Response response) throws Exception {
private Object triggerRestoreBackup(Request request, Response response) {
int nodeId = Integer.parseInt(request.params("id"));
executorClient.restoreBackup(Context.fromRequest(request), nodeId, request.params("fid"));
@ -362,9 +362,10 @@ public class ControlNodeService {
private Object nodeOverviewModel(Request request, Response response) throws SQLException {
int nodeId = Integer.parseInt(request.params("id"));
var config = nodeConfigurationService.get(nodeId);
return Map.of(
"node", new IndexNode(nodeId),
"status", getStatus(new IndexNode(nodeId)),
"status", getStatus(config),
"events", getEvents(nodeId),
"processes", heartbeatService.getProcessHeartbeatsForNode(nodeId),
"jobs", heartbeatService.getTaskHeartbeatsForNode(nodeId)
@ -394,29 +395,21 @@ public class ControlNodeService {
return events;
}
/** List the nodes reported by the file storage service, in sorted order,
 * each wrapped as an {@link IndexNode}. */
public List<IndexNode> getConfiguredNodes() {
    var nodeIds = fileStorageService.getConfiguredNodes();

    return nodeIds.stream()
            .sorted()
            .map(id -> new IndexNode(id))
            .toList();
}
@SneakyThrows
public List<IndexNodeStatus> getNodeStatusList() {
return fileStorageService
.getConfiguredNodes()
return nodeConfigurationService
.getAll()
.stream()
.sorted()
.map(IndexNode::new)
.sorted(Comparator.comparing(NodeConfiguration::node))
.map(this::getStatus)
.toList();
}
IndexNodeStatus getStatus(IndexNode node) {
return new IndexNodeStatus(node,
monitors.isServiceUp(ServiceId.Index, node.id()),
monitors.isServiceUp(ServiceId.Executor, node.id())
@SneakyThrows
public IndexNodeStatus getStatus(NodeConfiguration config) {
return new IndexNodeStatus(config,
monitors.isServiceUp(ServiceId.Index, config.node()),
monitors.isServiceUp(ServiceId.Executor, config.node())
);
}

View File

@ -2,14 +2,23 @@
<h2>Nodes</h2>
<table class="table">
<tr>
<th>Node</th><th>Index</th><th>Executor</th>
<th>Node</th><th>Queries</th><th>Enabled</th><th>Index</th><th>Executor</th>
</tr>
{{#each .}}
<tr>
<td>
<a href="/nodes/{{node.id}}">node-{{node.id}}</a>
<a href="/nodes/{{id}}">node-{{id}}</a>
</td>
<td>
{{#if configuration.acceptQueries}}
&check;
{{/if}}
</td>
<td>
{{#unless configuration.disabled}}
&check;
{{/unless}}
</td>
{{#if indexServiceOnline}}<td>Online</td>{{/if}}
{{#unless indexServiceOnline}}<td class="table-danger">Offline</td>{{/unless}}

View File

@ -186,7 +186,7 @@ class TestModule extends AbstractModule {
@Provides
public ServiceDescriptors getServiceDescriptors() {
return new ServiceDescriptors(
List.of(new ServiceDescriptor(ServiceId.Executor, "127.0.0.1", ExecutorSvcApiIntegrationTest.port))
List.of(new ServiceDescriptor(ServiceId.Executor, "127.0.0.1"))
);
}

View File

@ -8,8 +8,10 @@ import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.model.query.SearchSpecification;
import nu.marginalia.index.client.model.results.DecoratedSearchResultItem;
import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.query.model.QueryParams;
import nu.marginalia.query.model.QueryResponse;
import nu.marginalia.query.svc.NodeConfigurationWatcher;
import nu.marginalia.query.svc.QueryFactory;
import nu.marginalia.service.server.BaseServiceParams;
import nu.marginalia.service.server.Service;
@ -17,24 +19,31 @@ import spark.Request;
import spark.Response;
import spark.Spark;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class QueryService extends Service {
private final IndexClient indexClient;
private final NodeConfigurationWatcher nodeWatcher;
private final Gson gson;
private final DomainBlacklist blacklist;
private final QueryFactory queryFactory;
private volatile List<Integer> nodes = new ArrayList<>();
@Inject
public QueryService(BaseServiceParams params,
IndexClient indexClient,
NodeConfigurationWatcher nodeWatcher,
Gson gson,
DomainBlacklist blacklist,
QueryFactory queryFactory)
{
super(params);
this.indexClient = indexClient;
this.nodeWatcher = nodeWatcher;
this.gson = gson;
this.blacklist = blacklist;
this.queryFactory = queryFactory;
@ -73,7 +82,9 @@ public class QueryService extends Service {
}
private SearchResultSet executeQuery(Context ctx, SearchSpecification query) {
return indexClient.query(ctx, 0, query);
var nodes = nodeWatcher.getQueryNodes();
return indexClient.query(ctx, nodes, query);
}
private boolean isBlacklisted(DecoratedSearchResultItem item) {

View File

@ -0,0 +1,53 @@
package nu.marginalia.query.svc;
import com.google.inject.Inject;
import lombok.SneakyThrows;
import nu.marginalia.nodecfg.NodeConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Keeps an in-memory list of the node ids that queries may be routed to.
 *
 * <p>A daemon thread started from the constructor polls the
 * {@link NodeConfigurationService} every {@value #POLL_INTERVAL_SECONDS} seconds
 * and publishes the ids of all nodes that are not disabled and that accept queries.
 * Until the first successful poll the list is empty; if a poll fails, the
 * previously published list stays in effect.</p>
 *
 * <p>The list is published through a volatile field and is never modified by the
 * watcher after publication. Callers should treat the returned list as read-only.</p>
 */
public class NodeConfigurationWatcher {
    private static final Logger logger = LoggerFactory.getLogger(NodeConfigurationWatcher.class);

    /** Delay between two consecutive configuration polls. */
    private static final long POLL_INTERVAL_SECONDS = 10;

    /** Latest snapshot of eligible node ids; replaced wholesale on every successful poll. */
    private volatile List<Integer> queryNodes = new ArrayList<>();

    private final NodeConfigurationService configurationService;

    /**
     * Creates the watcher and immediately starts its background polling thread.
     *
     * @param configurationService source of the node configurations to poll
     */
    @Inject
    public NodeConfigurationWatcher(NodeConfigurationService configurationService) {
        this.configurationService = configurationService;

        var watcherThread = new Thread(this::pollConfiguration, "Node Configuration Watcher");
        watcherThread.setDaemon(true);
        watcherThread.start();
    }

    /**
     * Polling loop executed by the watcher thread. Runs until the thread is interrupted.
     */
    private void pollConfiguration() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<Integer> goodNodes = new ArrayList<>();

                for (var cfg : configurationService.getAll()) {
                    if (!cfg.disabled() && cfg.acceptQueries()) {
                        goodNodes.add(cfg.node());
                    }
                }

                // Publish only after the full list has been built, so readers
                // never observe a half-populated snapshot.
                queryNodes = goodNodes;
            }
            catch (SQLException | RuntimeException ex) {
                // Unchecked failures are caught as well: letting one escape would
                // terminate the watcher thread and leave the node list stale forever.
                logger.warn("Failed to update node configurations", ex);
            }

            try {
                TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
            }
            catch (InterruptedException ex) {
                // Restore the interrupt flag and stop polling instead of letting the
                // exception escape as an uncaught error on the watcher thread.
                Thread.currentThread().interrupt();
                logger.info("Node configuration watcher interrupted, stopping");
                return;
            }
        }
    }

    /**
     * Returns the most recently published list of node ids eligible for queries.
     *
     * @return the current snapshot; empty until the first successful poll
     */
    public List<Integer> getQueryNodes() {
        return queryNodes;
    }
}

View File

@ -2,113 +2,74 @@ x-svc: &service
env_file:
- "run/env/service.env"
volumes:
- vol:/vol
- backup:/backup
- conf:/wmsa/conf:ro
- model:/wmsa/model
- data:/wmsa/data
- dist:/dist
- samples:/samples
- logs:/var/log/wmsa
- dist:/dist
networks:
- wmsa
depends_on:
- mariadb
x-p1: &partition-1
env_file:
- "run/env/service.env"
volumes:
- conf:/wmsa/conf:ro
- model:/wmsa/model
- data:/wmsa/data
- logs:/var/log/wmsa
- dist:/dist
- index:/idx
- work:/work
- backup:/backup
- samples:/storage
networks:
- wmsa
depends_on:
- mariadb
environment:
- "WMSA_SERVICE_NODE=1"
services:
index-service:
<<: *service
index-service-1:
<<: *partition-1
image: "marginalia.nu/index-service"
container_name: "index-service"
ports:
- "127.0.0.1:5021:5021/tcp"
- "127.0.0.1:4021:5000"
- "127.0.0.1:7021:4000"
environment:
- "WMSA_SERVICE_NODE=0"
container_name: "index-service-1"
executor-service-1:
<<: *partition-1
image: "marginalia.nu/executor-service"
container_name: "executor-service-1"
search-service:
<<: *service
image: "marginalia.nu/search-service"
container_name: "search-service"
ports:
- "127.0.0.1:5023:5023"
- "127.0.0.1:4023:5000"
- "127.0.0.1:7023:4000"
depends_on:
- index-service
assistant-service:
<<: *service
image: "marginalia.nu/assistant-service"
container_name: "assistant-service"
ports:
- "127.0.0.1:5025:5025"
- "127.0.0.1:4025:5000"
- "127.0.0.1:7025:4000"
depends_on:
- mariadb
api-service:
<<: *service
image: "marginalia.nu/api-service"
container_name: "api-service"
ports:
- "127.0.0.1:5004:5004"
- "127.0.0.1:4004:5000"
- "127.0.0.1:7004:4000"
depends_on:
- mariadb
query-service:
<<: *service
image: "marginalia.nu/query-service"
container_name: "query-service"
ports:
- "127.0.0.1:5022:5022"
- "127.0.0.1:4022:5000"
- "127.0.0.1:7022:4000"
depends_on:
- mariadb
executor-service:
<<: *service
image: "marginalia.nu/executor-service"
container_name: "executor-service"
ports:
- "127.0.0.1:5024:5024"
- "127.0.0.1:4024:5000"
- "127.0.0.1:7024:4000"
depends_on:
- mariadb
environment:
- "WMSA_SERVICE_NODE=0"
dating-service:
<<: *service
image: "marginalia.nu/dating-service"
container_name: "dating-service"
ports:
- "127.0.0.1:5070:5070"
- "127.0.0.1:4070:5000"
- "127.0.0.1:7070:4000"
depends_on:
- mariadb
explorer-service:
<<: *service
image: "marginalia.nu/explorer-service"
container_name: "explorer-service"
ports:
- "127.0.0.1:5071:5071"
- "127.0.0.1:4071:5000"
- "127.0.0.1:7071:4000"
depends_on:
- mariadb
control-service:
<<: *service
image: "marginalia.nu/control-service"
container_name: "control-service"
ports:
- "127.0.0.1:5090:5090"
- "127.0.0.1:4090:5000"
- "127.0.0.1:7090:4000"
- "127.0.0.1:7099:4001"
depends_on:
- mariadb
mariadb:
image: "mariadb:lts"
container_name: "mariadb"
@ -143,12 +104,18 @@ volumes:
type: none
o: bind
device: run/db
vol:
index:
driver: local
driver_opts:
type: none
o: bind
device: run/vol
device: run/index
work:
driver: local
driver_opts:
type: none
o: bind
device: run/work
backup:
driver: local
driver_opts:

8
run/env/service.env vendored
View File

@ -1,6 +1,6 @@
WMSA_HOME=run/
EXECUTOR_SERVICE_OPTS="-DdistPath=/dist"
CONVERTER_PROCESS_OPTS="-ea -Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/converter.jfr"
CRAWLER_PROCESS_OPTS="-Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/crawler.jfr"
LOADER_PROCESS_OPTS="-Dservice-host=0.0.0.0 -ea -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=4001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -XX:StartFlightRecording:dumponexit=true,filename=/samples/loader.jfr"
INDEX_CONSTRUCTION_PROCESS_OPTS="-ea -Djava.util.concurrent.ForkJoinPool.common.parallelism=4"
CONVERTER_PROCESS_OPTS="-Dservice-host=0.0.0.0"
CRAWLER_PROCESS_OPTS="-Dservice-host=0.0.0.0"
LOADER_PROCESS_OPTS="-Dservice-host=0.0.0.0"
INDEX_CONSTRUCTION_PROCESS_OPTS="-Djava.util.concurrent.ForkJoinPool.common.parallelism=4"

View File

@ -15,35 +15,26 @@ server {
rewrite ^/links/(.*)$ /search?query=links:$1&profile=corpo;
location /screenshot {
proxy_pass http://assistant-service:5025/public/screenshot;
proxy_pass http://assistant-service/public/screenshot;
}
location /site-search {
proxy_pass http://search-service:5023/public/site-search;
proxy_pass http://search-service/public/site-search;
}
location /site/suggest {
proxy_pass http://search-service:5023/public/site/suggest;
proxy_pass http://search-service/public/site/suggest;
}
location /site/flag-site {
proxy_pass http://search-service:5023/public/site/flag-site;
proxy_pass http://search-service/public/site/flag-site;
}
location /site/ {
rewrite ^/site/(.*)$ /search?query=site:$1&profile=yolo;
}
location /debug/wordmeta {
proxy_pass http://index-service:5021/public/debug/wordmeta;
}
location /debug/docmeta {
proxy_pass http://index-service:5021/public/debug/docmeta;
}
location /debug/word {
proxy_pass http://index-service:5021/public/debug/word;
}
location /suggest/ {
proxy_pass http://assistant-service:5025/public$request_uri;
proxy_pass http://assistant-service/public$request_uri;
access_log off;
}
location / {
proxy_pass http://search-service:5023/public/;
proxy_pass http://search-service/public/;
}
}
@ -62,7 +53,7 @@ server {
proxy_set_header X-Public "1";
location / {
proxy_pass http://control-service:5090/public/;
proxy_pass http://control-service/public/;
access_log off;
}
@ -81,7 +72,7 @@ server {
proxy_set_header X-Public "1";
location / {
proxy_pass http://api-service:5004/public/;
proxy_pass http://api-service/public/;
access_log off;
}

View File

@ -18,7 +18,7 @@ function download_model {
pushd $(dirname $0)
mkdir -p model logs db samples backup install vol/{ir,iw} vol/{lr,lw} vol/ss vol/{ldbw,ldbr} data samples/export
mkdir -p model logs db samples backup install work index data samples/export
download_model model/English.DICT https://raw.githubusercontent.com/datquocnguyen/RDRPOSTagger/master/Models/POS/English.DICT
download_model model/English.RDR https://raw.githubusercontent.com/datquocnguyen/RDRPOSTagger/master/Models/POS/English.RDR