Embryo of new control process

* New events and heartbeat tables in mariadb
* Refactored to a cleaner Service interface
This commit is contained in:
Viktor Lofgren 2023-07-03 10:40:32 +02:00
parent 42375f0e53
commit 62cc9df206
34 changed files with 835 additions and 109 deletions

View File

@ -0,0 +1,17 @@
CREATE TABLE PROC_SERVICE_HEARTBEAT(
SERVICE_NAME VARCHAR(255) PRIMARY KEY COMMENT 'Full name of the service, including node id if applicable, e.g. search-service:0',
SERVICE_BASE VARCHAR(255) NOT NULL COMMENT 'Base name of the service, e.g. search-service',
INSTANCE VARCHAR(255) NOT NULL COMMENT 'UUID of the service instance',
ALIVE BOOLEAN NOT NULL DEFAULT TRUE COMMENT 'Set to false when the service is doing an orderly shutdown',
HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Service was last seen at this point'
);
CREATE TABLE PROC_SERVICE_EVENTLOG(
ID BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Unique id',
SERVICE_NAME VARCHAR(255) NOT NULL COMMENT 'Full name of the service, including node id if applicable, e.g. search-service:0',
SERVICE_BASE VARCHAR(255) NOT NULL COMMENT 'Base name of the service, e.g. search-service',
INSTANCE VARCHAR(255) NOT NULL COMMENT 'UUID of the service instance',
EVENT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Event time',
EVENT_TYPE VARCHAR(255) NOT NULL COMMENT 'Event type',
EVENT_MESSAGE VARCHAR(255) NOT NULL COMMENT 'Event message'
);

View File

@ -0,0 +1,17 @@
CREATE TABLE PROC_SERVICE_HEARTBEAT(
SERVICE_NAME VARCHAR(255) PRIMARY KEY COMMENT "Full name of the service, including node id if applicable, e.g. search-service:0",
SERVICE_BASE VARCHAR(255) NOT NULL COMMENT "Base name of the service, e.g. search-service",
INSTANCE VARCHAR(255) NOT NULL COMMENT "UUID of the service instance",
ALIVE BOOLEAN NOT NULL DEFAULT TRUE COMMENT "Set to false when the service is doing an orderly shutdown",
HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT "Service was last seen at this point"
);
CREATE TABLE PROC_SERVICE_EVENTLOG(
ID BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT "Unique id",
SERVICE_NAME VARCHAR(255) NOT NULL COMMENT "Full name of the service, including node id if applicable, e.g. search-service:0",
SERVICE_BASE VARCHAR(255) NOT NULL COMMENT "Base name of the service, e.g. search-service",
INSTANCE VARCHAR(255) NOT NULL COMMENT "UUID of the service instance",
EVENT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT "Event time",
EVENT_TYPE VARCHAR(255) NOT NULL COMMENT "Event type",
EVENT_MESSAGE VARCHAR(255) NOT NULL COMMENT "Event message"
);

View File

@ -13,7 +13,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class AbortingScheduler {
private final String name;
private final ThreadFactory threadFactory;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -22,7 +21,6 @@ public class AbortingScheduler {
private ExecutorService executorService;
public AbortingScheduler(String name) {
this.name = name;
threadFactory = new ThreadFactoryBuilder()
.setNameFormat(name+"client--%d")
.setUncaughtExceptionHandler(this::handleException)

View File

@ -0,0 +1,133 @@
package nu.marginalia.client;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.service.id.ServiceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Singleton
public class ServiceMonitors {
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Set<String> runningServices = new HashSet<>();
private final Set<Runnable> callbacks = new HashSet<>();
private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 5);
private volatile boolean running;
@Inject
public ServiceMonitors(HikariDataSource dataSource) {
this.dataSource = dataSource;
var runThread = new Thread(this::run, "service monitor");
runThread.setDaemon(true);
runThread.start();
}
public void subscribe(Runnable callback) {
synchronized (callbacks) {
callbacks.add(callback);
}
}
public void unsubscribe(Runnable callback) {
synchronized (callbacks) {
callbacks.remove(callback);
}
}
public void run() {
if (running) {
return;
}
else {
running = true;
}
while (running) {
if (updateRunningServices()) {
runCallbacks();
}
try {
TimeUnit.SECONDS.sleep(heartbeatInterval);
}
catch (InterruptedException ex) {
logger.warn("ServiceMonitors interrupted", ex);
running = false;
}
}
}
private void runCallbacks() {
synchronized (callbacks) {
for (var callback : callbacks) {
synchronized (runningServices) {
callback.run();
}
}
}
}
private boolean updateRunningServices() {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT SERVICE_BASE, TIMESTAMPDIFF(SECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6))
FROM PROC_SERVICE_HEARTBEAT
WHERE ALIVE=1
""")) {
try (var rs = stmt.executeQuery()) {
Set<String> newRunningServices = new HashSet<>(10);
while (rs.next()) {
String svc = rs.getString(1);
int dtime = rs.getInt(2);
if (dtime < 2.5 * heartbeatInterval) {
newRunningServices.add(svc);
}
}
boolean changed;
synchronized (runningServices) {
changed = !Objects.equals(runningServices, newRunningServices);
runningServices.clear();
runningServices.addAll(newRunningServices);
}
return changed;
}
}
catch (SQLException ex) {
logger.warn("Failed to update running services", ex);
}
return false;
}
public boolean isServiceUp(ServiceId serviceId) {
synchronized (runningServices) {
return runningServices.contains(serviceId.name);
}
}
public List<ServiceId> getRunningServices() {
List<ServiceId> ret = new ArrayList<>(ServiceId.values().length);
synchronized (runningServices) {
for (var runningService : runningServices) {
ret.add(ServiceId.byName(runningService));
}
}
return ret;
}
}

View File

@ -13,5 +13,7 @@ public class SearchServiceDescriptors {
new ServiceDescriptor(ServiceId.Search, 5023),
new ServiceDescriptor(ServiceId.Assistant, 5025),
new ServiceDescriptor(ServiceId.Dating, 5070),
new ServiceDescriptor(ServiceId.Explorer, 5071)));
new ServiceDescriptor(ServiceId.Explorer, 5071),
new ServiceDescriptor(ServiceId.Control, 5090)
));
}

View File

@ -7,19 +7,22 @@ public enum ServiceId {
Search("search-service"),
Index("index-service"),
Control("control-service"),
Dating("dating-service"),
Explorer("explorer-service"),
Other_Auth("auth"),
Other_Memex("memex"),
Other_ResourceStore("resource-store"),
Other_Renderer("renderer"),
Other_PodcastScraper("podcast-scraper");
Explorer("explorer-service");
public final String name;
ServiceId(String name) {
this.name = name;
}
public static ServiceId byName(String name) {
for (ServiceId id : values()) {
if (id.name.equals(name)) {
return id;
}
}
return null;
}
}

View File

@ -12,6 +12,7 @@ java {
dependencies {
implementation project(':code:common:service-client')
implementation project(':code:common:service-discovery')
implementation project(':code:common:db')
implementation libs.lombok
annotationProcessor libs.lombok

View File

@ -3,6 +3,52 @@
Contains the base classes for the services. This is where port configuration,
and common endpoints are set up.
## Creating a new Service
The minimal service needs a `MainClass` and a `Service` class.
For proper initiation, the main class should look like this:
```java
public class FoobarMain extends MainClass {
@Inject
public FoobarMain(FoobarService service) {}
public static void main(String... args) {
init(ServiceId.Foobar, args);
Injector injector = Guice.createInjector(
new FoobarModule(), /* optional custom bindings go here */
new DatabaseModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors,
ServiceId.Foobar));
injector.getInstance(FoobarMain.class);
// set the service as ready so that delayed tasks can be started
injector.getInstance(Initialization.class).setReady();
}
}
```
A service class has a boilerplate set-up that looks like this:
```java
@Singleton
public class FoobarService extends Service {
@Inject
public FoobarService(BaseServiceParams params) {
super(params);
// set up Spark endpoints here
}
}
```
Further the new service needs to be added to the `ServiceId` enum in [service-discovery](../service-discovery).
## Central Classes
* [MainClass](src/main/java/nu/marginalia/service/MainClass.java) bootstraps all executables

View File

@ -11,6 +11,9 @@ import org.slf4j.LoggerFactory;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
/** Each main class of a service should extend this class.
* They must also invoke init() in their main method.
*/
public abstract class MainClass {
private final Logger logger = LoggerFactory.getLogger(getClass());

View File

@ -0,0 +1,58 @@
package nu.marginalia.service.control;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Objects;
import java.util.UUID;
@Singleton
public class ServiceEventLog {
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(ServiceEventLog.class);
private final String serviceName;
private final UUID instanceUuid;
private final String serviceBase;
@Inject
public ServiceEventLog(HikariDataSource dataSource,
ServiceConfiguration configuration
) {
this.dataSource = dataSource;
this.serviceName = configuration.serviceName() + ":" + configuration.node();
this.instanceUuid = configuration.instanceUuid();
this.serviceBase = configuration.serviceName();
logger.info("Starting service {} instance {}", serviceName, instanceUuid);
logEvent("START", "Service starting");
}
public void logEvent(String type, String message) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO PROC_SERVICE_EVENTLOG(SERVICE_NAME, SERVICE_BASE, INSTANCE, EVENT_TYPE, EVENT_MESSAGE)
VALUES (?, ?, ?, ?, ?)
""")) {
stmt.setString(1, serviceName);
stmt.setString(2, serviceBase);
stmt.setString(3, instanceUuid.toString());
stmt.setString(4, type);
stmt.setString(5, Objects.requireNonNull(message, ""));
stmt.executeUpdate();
}
catch (SQLException ex) {
logger.error("Failed to log event {}:{}", type, message);
}
}
}

View File

@ -0,0 +1,145 @@
package nu.marginalia.service.control;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
/** This service sends a heartbeat to the database every 5 seconds.
*/
@Singleton
public class ServiceHeartbeat {
private final Logger logger = LoggerFactory.getLogger(ServiceHeartbeat.class);
private final String serviceName;
private final String serviceBase;
private final String instanceUUID;
private final HikariDataSource dataSource;
private final Thread runnerThread;
private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 5);
private volatile boolean running = false;
@Inject
public ServiceHeartbeat(ServiceConfiguration configuration,
HikariDataSource dataSource)
{
this.serviceName = configuration.serviceName() + ":" + configuration.node();
this.serviceBase = configuration.serviceName();
this.dataSource = dataSource;
this.instanceUUID = configuration.instanceUuid().toString();
runnerThread = new Thread(this::run);
Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown));
}
public void start() {
if (!running) {
runnerThread.start();
}
}
public void shutDown() {
if (!running)
return;
running = false;
try {
runnerThread.join();
heartbeatStop();
}
catch (InterruptedException|SQLException ex) {
logger.warn("ServiceHeartbeat shutdown failed", ex);
}
}
private void run() {
if (!running)
running = true;
else
return;
try {
heartbeatInit();
while (running) {
try {
heartbeatUpdate();
}
catch (SQLException ex) {
logger.warn("ServiceHeartbeat failed to update", ex);
}
TimeUnit.SECONDS.sleep(heartbeatInterval);
}
}
catch (InterruptedException|SQLException ex) {
logger.error("ServiceHeartbeat caught irrecoverable exception, killing service", ex);
System.exit(255);
}
}
private void heartbeatInit() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
INSERT INTO PROC_SERVICE_HEARTBEAT (SERVICE_NAME, SERVICE_BASE, INSTANCE, HEARTBEAT_TIME, ALIVE)
VALUES (?, ?, ?, CURRENT_TIMESTAMP(6), 1)
ON DUPLICATE KEY UPDATE
INSTANCE = ?,
HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
ALIVE = 1
"""
))
{
stmt.setString(1, serviceName);
stmt.setString(2, serviceBase);
stmt.setString(3, instanceUUID);
stmt.setString(4, instanceUUID);
stmt.executeUpdate();
}
}
}
private void heartbeatUpdate() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE PROC_SERVICE_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6)
WHERE INSTANCE = ? AND ALIVE = 1
""")
)
{
stmt.setString(1, instanceUUID);
stmt.executeUpdate();
}
}
}
private void heartbeatStop() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE PROC_SERVICE_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6), ALIVE = 0
WHERE INSTANCE = ?
""")
)
{
stmt.setString(1, instanceUUID);
stmt.executeUpdate();
}
}
}
}

View File

@ -8,9 +8,9 @@ import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import java.util.Objects;
import java.util.UUID;
public class ConfigurationModule extends AbstractModule {
private static final String SERVICE_NAME = System.getProperty("service-name");
private final ServiceDescriptors descriptors;
private final ServiceId id;
@ -21,15 +21,13 @@ public class ConfigurationModule extends AbstractModule {
public void configure() {
bind(ServiceDescriptors.class).toInstance(descriptors);
bind(String.class).annotatedWith(Names.named("service-name")).toInstance(Objects.requireNonNull(SERVICE_NAME));
bind(String.class).annotatedWith(Names.named("service-host")).toInstance(System.getProperty("service-host", "127.0.0.1"));
bind(Integer.class).annotatedWith(Names.named("service-port")).toInstance(descriptors.forId(id).port);
}
@Provides
@Named("metrics-server-port")
public Integer provideMetricsServerPort(@Named("service-port") Integer servicePort) {
return servicePort + 1000;
int basePort = descriptors.forId(id).port;
int prometheusPort = basePort + 1000;
String host = Objects.requireNonNull(System.getProperty("service-host", "127.0.0.1"));
var configObject = new ServiceConfiguration(id, 0, host, basePort, prometheusPort, UUID.randomUUID());
bind(ServiceConfiguration.class).toInstance(configObject);
}
}

View File

@ -0,0 +1,27 @@
package nu.marginalia.service.module;
import nu.marginalia.service.id.ServiceId;
import java.util.UUID;
/**
* Configuration object for a service. This is a guice-injectable object
* intended to keep down the amount of named bindings.
*
* @param serviceId - service descriptor
* @param node - always 0 for now, for future service partitioning
* @param host - the bind address of the service
* @param port - main port of the service
* @param metricsPort - prometheus metrics server port
* @param instanceUuid - unique identifier for this instance of the service
*/
public record ServiceConfiguration(ServiceId serviceId,
int node,
String host,
int port,
int metricsPort,
UUID instanceUuid) {
public String serviceName() {
return serviceId.name;
}
}

View File

@ -0,0 +1,30 @@
package nu.marginalia.service.server;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.module.ServiceConfiguration;
/** This class exists to reduce Service boilerplate */
@Singleton
public class BaseServiceParams {
public final ServiceConfiguration configuration;
public final Initialization initialization;
public final MetricsServer metricsServer;
public final ServiceHeartbeat heartbeat;
public final ServiceEventLog eventLog;
@Inject
public BaseServiceParams(ServiceConfiguration configuration,
Initialization initialization,
MetricsServer metricsServer,
ServiceHeartbeat heartbeat,
ServiceEventLog eventLog) {
this.configuration = configuration;
this.initialization = initialization;
this.metricsServer = metricsServer;
this.heartbeat = heartbeat;
this.eventLog = eventLog;
}
}

View File

@ -5,10 +5,14 @@ import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@Singleton
public class Initialization {
boolean initialized;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final List<Runnable> callbacks = new ArrayList<>();
public static Initialization already() {
Initialization init = new Initialization();
@ -21,6 +25,27 @@ public class Initialization {
logger.info("Initialized");
initialized = true;
notifyAll();
}
callbacks.forEach(Runnable::run);
callbacks.clear();
}
public void addCallback(Runnable callback) {
boolean runNow;
synchronized (this) {
if (!initialized) {
callbacks.add(callback);
runNow = false;
} else {
runNow = true;
}
}
if (runNow) {
callback.run();
}
}

View File

@ -1,9 +1,9 @@
package nu.marginalia.service.server;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.prometheus.client.exporter.MetricsServlet;
import lombok.SneakyThrows;
import nu.marginalia.service.module.ServiceConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@ -12,8 +12,8 @@ public class MetricsServer {
@SneakyThrows
@Inject
public MetricsServer(@Named("metrics-server-port") int port) {
Server server = new Server(port);
public MetricsServer(ServiceConfiguration configuration) {
Server server = new Server(configuration.metricsPort());
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
server.setHandler(context);

View File

@ -1,6 +1,5 @@
package nu.marginalia.service.server;
import com.google.common.base.Strings;
import io.prometheus.client.Counter;
import nu.marginalia.client.Context;
import nu.marginalia.client.exception.MessagingException;
@ -35,22 +34,28 @@ public class Service {
.labelNames("service")
.register();
private final String serviceName;
private static volatile boolean initialized = false;
public Service(String ip, int port, Initialization initialization, MetricsServer metricsServer, Runnable configureStaticFiles) {
this.initialization = initialization;
public Service(BaseServiceParams params,
Runnable configureStaticFiles
) {
this.initialization = params.initialization;
serviceName = System.getProperty("service-name");
initialization.addCallback(params.heartbeat::start);
initialization.addCallback(() -> params.eventLog.logEvent("SVC-INIT", ""));
if (!initialization.isReady() && ! initialized ) {
initialized = true;
Spark.threadPool(32, 4, 60_000);
Spark.ipAddress(ip);
Spark.port(port);
Spark.ipAddress(params.configuration.host());
Spark.port(params.configuration.port());
logger.info("{} Listening to {}:{}", getClass().getSimpleName(), ip == null ? "" : ip, port);
logger.info("{} Listening to {}:{}", getClass().getSimpleName(),
params.configuration.host(),
params.configuration.port());
configureStaticFiles.run();
@ -66,8 +71,8 @@ public class Service {
}
}
public Service(String ip, int port, Initialization initialization, MetricsServer metricsServer) {
this(ip, port, initialization, metricsServer, () -> {
public Service(BaseServiceParams params) {
this(params, () -> {
// configureStaticFiles can't be an overridable method in Service because it may
// need to depend on parameters to the constructor, and super-constructors
// must run first

View File

@ -2,7 +2,6 @@ package nu.marginalia.assistant;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import lombok.SneakyThrows;
import nu.marginalia.assistant.eval.Units;
import nu.marginalia.assistant.suggest.Suggestions;
@ -10,9 +9,7 @@ import nu.marginalia.assistant.eval.MathParser;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.screenshot.ScreenshotService;
import nu.marginalia.assistant.dict.DictionaryService;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.MetricsServer;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.server.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
@ -28,18 +25,15 @@ public class AssistantService extends Service {
@SneakyThrows
@Inject
public AssistantService(@Named("service-host") String ip,
@Named("service-port") Integer port,
Initialization initialization,
MetricsServer metricsServer,
public AssistantService(BaseServiceParams params,
DictionaryService dictionaryService,
MathParser mathParser,
Units units,
ScreenshotService screenshotService,
Suggestions suggestions
)
Suggestions suggestions)
{
super(ip, port, initialization, metricsServer);
super(params);
this.mathParser = mathParser;
this.units = units;
this.suggestions = suggestions;

View File

@ -8,6 +8,7 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.KeywordLexiconReadOnlyView;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.service.control.ServiceEventLog;
import java.nio.file.Path;
@ -20,13 +21,20 @@ public class IndexModule extends AbstractModule {
@Provides
@SneakyThrows
private KeywordLexiconReadOnlyView createLexicon() {
return new KeywordLexiconReadOnlyView(
new KeywordLexicon(
new KeywordLexiconJournal(WmsaHome.getDisk("index-write").resolve("dictionary.dat").toFile()
private KeywordLexiconReadOnlyView createLexicon(ServiceEventLog eventLog) {
try {
eventLog.logEvent("INDEX-LEXICON-LOAD-BEGIN", "");
return new KeywordLexiconReadOnlyView(
new KeywordLexicon(
new KeywordLexiconJournal(WmsaHome.getDisk("index-write").resolve("dictionary.dat").toFile()
)
)
)
);
);
}
finally {
eventLog.logEvent("INDEX-LEXICON-LOAD-OK", "");
}
}
@Provides

View File

@ -2,16 +2,14 @@ package nu.marginalia.index;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.reactivex.rxjava3.schedulers.Schedulers;
import nu.marginalia.index.index.SearchIndex;
import nu.marginalia.index.svc.IndexOpsService;
import nu.marginalia.index.svc.IndexQueryService;
import nu.marginalia.index.svc.IndexSearchSetsService;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.MetricsServer;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,28 +32,29 @@ public class IndexService extends Service {
private final IndexServicesFactory servicesFactory;
private final IndexSearchSetsService searchSetsService;
private final ServiceEventLog eventLog;
@Inject
public IndexService(@Named("service-host") String ip,
@Named("service-port") Integer port,
Initialization init,
MetricsServer metricsServer,
public IndexService(BaseServiceParams params,
IndexOpsService opsService,
IndexQueryService indexQueryService,
SearchIndex searchIndex,
IndexServicesFactory servicesFactory,
IndexSearchSetsService searchSetsService)
IndexSearchSetsService searchSetsService,
ServiceEventLog eventLog)
{
super(ip, port, init, metricsServer);
super(params);
this.opsService = opsService;
this.searchIndex = searchIndex;
this.servicesFactory = servicesFactory;
this.searchSetsService = searchSetsService;
this.eventLog = eventLog;
final Gson gson = GsonFactory.get();
this.init = init;
this.init = params.initialization;
Spark.post("/search/", indexQueryService::search, gson::toJson);
@ -94,9 +93,11 @@ public class IndexService extends Service {
}
try {
eventLog.logEvent("INDEX-AUTO-CONVERT-BEGIN", "");
logger.info("Auto-converting");
searchSetsService.recalculateAll();
searchIndex.switchIndex();
eventLog.logEvent("INDEX-AUTO-CONVERT-END", "");
logger.info("Auto-conversion finished!");
}
catch (IOException ex) {

View File

@ -6,6 +6,7 @@ import nu.marginalia.index.IndexServicesFactory;
import nu.marginalia.index.query.*;
import nu.marginalia.index.query.filter.QueryFilterStepFromPredicate;
import nu.marginalia.index.svc.IndexSearchSetsService;
import nu.marginalia.service.control.ServiceEventLog;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,10 +37,15 @@ public class SearchIndex {
private final IndexServicesFactory servicesFactory;
private final IndexSearchSetsService searchSetsService;
private final ServiceEventLog eventLog;
@Inject
public SearchIndex(@NotNull IndexServicesFactory servicesFactory, IndexSearchSetsService searchSetsService) {
public SearchIndex(@NotNull IndexServicesFactory servicesFactory,
IndexSearchSetsService searchSetsService,
ServiceEventLog eventLog) {
this.servicesFactory = servicesFactory;
this.searchSetsService = searchSetsService;
this.eventLog = eventLog;
}
public void init() {
@ -51,7 +57,13 @@ public class SearchIndex {
if (indexReader == null) {
indexReader = servicesFactory.getSearchIndexReader();
eventLog.logEvent("INDEX-INIT", "Index loaded");
}
else {
eventLog.logEvent("INDEX-INIT", "No index loaded");
}
}
catch (Exception ex) {
logger.error("Uncaught exception", ex);
@ -63,9 +75,12 @@ public class SearchIndex {
public boolean switchIndex() throws IOException {
eventLog.logEvent("CONVERT-INDEX-BEGIN", "");
servicesFactory.convertIndex(searchSetsService.getDomainRankings());
eventLog.logEvent("CONVERT-INDEX-END", "");
System.gc();
eventLog.logEvent("INDEX-SWITCH-BEGIN", "");
Lock lock = indexReplacementLock.writeLock();
try {
lock.lock();
@ -73,11 +88,15 @@ public class SearchIndex {
servicesFactory.switchFilesJob().call();
indexReader = servicesFactory.getSearchIndexReader();
eventLog.logEvent("INDEX-SWITCH-OK", "");
}
catch (Exception ex) {
eventLog.logEvent("INDEX-SWITCH-ERR", "");
logger.error("Uncaught exception", ex);
}
finally {
lock.unlock();
}

View File

@ -1,11 +1,8 @@
package nu.marginalia.index.svc;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import nu.marginalia.WmsaHome;
import nu.marginalia.index.IndexServicesFactory;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.KeywordLexiconReadOnlyView;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
@ -13,12 +10,17 @@ import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.index.svc.searchset.SearchSetAny;
import nu.marginalia.index.util.TestUtil;
import nu.marginalia.index.client.model.query.SearchSetIdentifier;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ServiceConfiguration;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Random;
import java.util.UUID;
import static org.mockito.Mockito.when;
@ -62,8 +64,18 @@ public class IndexQueryServiceIntegrationTestModule extends AbstractModule {
bind(IndexJournalWriter.class).toInstance(servicesFactory.createIndexJournalWriter(keywordLexicon));
bind(String.class).annotatedWith(Names.named("service-host")).toInstance("127.0.0.1");
bind(Integer.class).annotatedWith(Names.named("service-port")).toProvider(this::randomPort);
bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class));
bind(ServiceHeartbeat.class).toInstance(Mockito.mock(ServiceHeartbeat.class));
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(
ServiceId.Index,
0,
"127.0.0.1",
randomPort(),
randomPort(),
UUID.randomUUID()
));
} catch (IOException e) {
throw new RuntimeException(e);
}

View File

@ -2,17 +2,13 @@ package nu.marginalia.search;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import lombok.SneakyThrows;
import nu.marginalia.WebsiteUrl;
import nu.marginalia.client.Context;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.search.svc.SearchFrontPageService;
import nu.marginalia.search.svc.*;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.MetricsServer;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.server.StaticResources;
import nu.marginalia.service.server.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
@ -31,10 +27,7 @@ public class SearchService extends Service {
@SneakyThrows
@Inject
public SearchService(@Named("service-host") String ip,
@Named("service-port") Integer port,
Initialization initialization,
MetricsServer metricsServer,
public SearchService(BaseServiceParams params,
WebsiteUrl websiteUrl,
StaticResources staticResources,
SearchFrontPageService frontPageService,
@ -44,7 +37,7 @@ public class SearchService extends Service {
SearchQueryService searchQueryService,
SearchApiQueryService apiQueryService
) {
super(ip, port, initialization, metricsServer);
super(params);
this.websiteUrl = websiteUrl;
this.staticResources = staticResources;

View File

@ -2,7 +2,6 @@ package nu.marginalia.api;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import nu.marginalia.api.model.ApiLicense;
import nu.marginalia.api.svc.LicenseService;
import nu.marginalia.api.svc.RateLimiterService;
@ -11,9 +10,7 @@ import nu.marginalia.client.Context;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.search.client.SearchClient;
import nu.marginalia.search.client.model.ApiSearchResults;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.MetricsServer;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.server.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@ -36,16 +33,14 @@ public class ApiService extends Service {
private final Marker queryMarker = MarkerFactory.getMarker("QUERY");
@Inject
public ApiService(@Named("service-host") String ip,
@Named("service-port") Integer port,
Initialization initialization,
MetricsServer metricsServer,
public ApiService(BaseServiceParams params,
SearchClient searchClient,
ResponseCache responseCache,
LicenseService licenseService,
RateLimiterService rateLimiterService) {
RateLimiterService rateLimiterService
) {
super(ip, port, initialization, metricsServer);
super(params);
this.searchClient = searchClient;
this.responseCache = responseCache;

View File

@ -0,0 +1,64 @@
plugins {
id 'java'
id "io.freefair.lombok" version "5.3.3.3"
id 'application'
id 'com.palantir.docker' version '0.34.0'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
application {
mainClass = 'nu.marginalia.control.ControlMain'
applicationName = 'control-service'
}
tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/docker-service.gradle"
dependencies {
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation project(':code:common:config')
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')
implementation project(':code:api:search-api')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.prometheus
implementation libs.notnull
implementation libs.guice
implementation libs.trove
implementation libs.spark
implementation libs.fastutil
implementation libs.bundles.gson
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
}
test {
useJUnitPlatform()
}
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -0,0 +1,29 @@
package nu.marginalia.control;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
public class ControlMain extends MainClass {
@Inject
public ControlMain(ControlService service) {
}
public static void main(String... args) {
init(ServiceId.Control, args);
Injector injector = Guice.createInjector(
new DatabaseModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Control));
injector.getInstance(ControlMain.class);
injector.getInstance(Initialization.class).setReady();
}
}

View File

@ -0,0 +1,42 @@
package nu.marginalia.control;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.server.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Spark;
public class ControlService extends Service {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Gson gson = GsonFactory.get();
private final ServiceMonitors monitors;
@Inject
public ControlService(BaseServiceParams params,
ServiceMonitors monitors,
HeartbeatService heartbeatService
) {
super(params);
this.monitors = monitors;
Spark.get("/public/heartbeats", (req, res) -> {
res.type("application/json");
return heartbeatService.getHeartbeats();
}, gson::toJson);
monitors.subscribe(this::logMonitorStateChange);
}
private void logMonitorStateChange() {
logger.info("Service state change: {}", monitors.getRunningServices());
}
}

View File

@ -0,0 +1,48 @@
package nu.marginalia.control;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.control.model.ServiceHeartbeat;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@Singleton
public class HeartbeatService {
private final HikariDataSource dataSource;
@Inject
public HeartbeatService(HikariDataSource dataSource) {
this.dataSource = dataSource;
}
public List<ServiceHeartbeat> getHeartbeats() {
List<ServiceHeartbeat> heartbeats = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT SERVICE_NAME, SERVICE_BASE, INSTANCE, ALIVE,
TIMESTAMPDIFF(MICROSECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) AS TSDIFF
FROM PROC_SERVICE_HEARTBEAT
""")) {
var rs = stmt.executeQuery();
while (rs.next()) {
heartbeats.add(new ServiceHeartbeat(
rs.getString("SERVICE_NAME"),
rs.getString("SERVICE_BASE"),
rs.getString("INSTANCE"),
rs.getInt("TSDIFF") / 1000.,
rs.getBoolean("ALIVE")
));
}
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
return heartbeats;
}
}

View File

@ -0,0 +1,11 @@
package nu.marginalia.control.model;
public record ServiceHeartbeat(
String serviceId,
String serviceBase,
String uuid,
double lastSeenMillis,
boolean alive
) {
}

View File

@ -1,7 +1,6 @@
package nu.marginalia.dating;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import lombok.SneakyThrows;
import nu.marginalia.browse.DbBrowseDomainsRandom;
import nu.marginalia.browse.DbBrowseDomainsSimilarCosine;
@ -11,9 +10,7 @@ import nu.marginalia.renderer.MustacheRenderer;
import nu.marginalia.renderer.RendererFactory;
import nu.marginalia.screenshot.ScreenshotService;
import nu.marginalia.model.id.EdgeId;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.MetricsServer;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.server.*;
import org.jetbrains.annotations.NotNull;
import spark.Request;
import spark.Response;
@ -33,17 +30,15 @@ public class DatingService extends Service {
private final String SESSION_OBJECT_NAME = "so";
@SneakyThrows
@Inject
public DatingService(@Named("service-host") String ip,
@Named("service-port") Integer port,
public DatingService(BaseServiceParams params,
RendererFactory rendererFactory,
Initialization initialization,
MetricsServer metricsServer,
DomainBlacklist blacklist,
DbBrowseDomainsSimilarCosine browseSimilarCosine,
DbBrowseDomainsRandom browseRandom,
ScreenshotService screenshotService) {
ScreenshotService screenshotService)
{
super(ip, port, initialization, metricsServer);
super(params);
this.blacklist = blacklist;

View File

@ -1,15 +1,11 @@
package nu.marginalia.explorer;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.renderer.MustacheRenderer;
import nu.marginalia.renderer.RendererFactory;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.MetricsServer;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.server.StaticResources;
import nu.marginalia.service.server.*;
import org.jetbrains.annotations.NotNull;
import spark.Request;
import spark.Response;
@ -42,16 +38,13 @@ public class ExplorerService extends Service {
@SneakyThrows
@Inject
public ExplorerService(@Named("service-host") String ip,
@Named("service-port") Integer port,
Initialization initialization,
MetricsServer metricsServer,
RendererFactory rendererFactory,
HikariDataSource dataSource,
public ExplorerService(BaseServiceParams params,
RendererFactory rendererFactory,
HikariDataSource dataSource,
StaticResources staticResources
) {
super(ip, port, initialization, metricsServer);
super(params);
renderer = rendererFactory.renderer("explorer/explorer");

View File

@ -71,6 +71,16 @@ services:
- "127.0.0.1:7071:4000"
depends_on:
- mariadb
control-service:
<<: *service
image: "marginalia.nu/control-service"
container_name: "control-service"
ports:
- "127.0.0.1:5090:5090"
- "127.0.0.1:4090:5000"
- "127.0.0.1:7090:4000"
depends_on:
- mariadb
mariadb:
image: "mariadb/server:10.3"
container_name: "mariadb"

View File

@ -33,6 +33,9 @@ server {
proxy_pass http://assistant-service:5025/public$request_uri;
access_log off;
}
location /control/ {
proxy_pass http://control-service:5090/public/;
}
location / {
proxy_pass http://search-service:5023/public/;
}

View File

@ -7,6 +7,7 @@ include 'code:services-core:search-service'
include 'code:services-satellite:api-service'
include 'code:services-satellite:dating-service'
include 'code:services-satellite:explorer-service'
include 'code:services-satellite:control-service'
include 'code:libraries:array'
include 'code:libraries:btree'