(service) Keep disabled multi-noded services dormant when they are configured to be disabled.

This commit is contained in:
Viktor Lofgren 2023-10-14 20:58:55 +02:00
parent a9dff407a1
commit 108b4cb648
4 changed files with 103 additions and 0 deletions

View File

@ -14,6 +14,7 @@ dependencies {
implementation project(':code:common:service-discovery')
implementation project(':code:libraries:message-queue')
implementation project(':code:common:db')
implementation project(':code:common:config')
implementation libs.spark
implementation libs.guice

View File

@ -0,0 +1,97 @@
package nu.marginalia.service.server;
import com.google.inject.name.Named;
import jakarta.inject.Inject;
import nu.marginalia.nodecfg.NodeConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/** The node status watcher ensures that services that can be run on multiple nodes
* find the configuration they expect, and kills the services when a node is disabled.
* <br><br>
* Install the watcher by adding to the Main class an
* <br>
* <code>injector.getInstance(NodeStatusWatcher.class);</code>
* <br>
* before anything else is initialized.
*/
public class NodeStatusWatcher {
private static final Logger logger = LoggerFactory.getLogger(NodeStatusWatcher.class);
private final NodeConfigurationService configurationService;
private final int nodeId;
private final Duration pollDuration = Duration.ofSeconds(15);
@Inject
public NodeStatusWatcher(NodeConfigurationService configurationService,
@Named("wmsa-system-node") Integer nodeId) throws InterruptedException {
this.configurationService = configurationService;
this.nodeId = nodeId;
awaitConfiguration();
var watcherThread = new Thread(this::watcher, "node watcher");
watcherThread.setDaemon(true);
watcherThread.start();
}
/** Wait for the presence of an enabled NodeConfiguration before permitting the service to start */
private void awaitConfiguration() throws InterruptedException {
boolean complained = false;
for (;;) {
try {
var config = configurationService.get(nodeId);
if (null != config && !config.disabled()) {
return;
}
else if (!complained) {
logger.info("Waiting for node configuration, id = {}", nodeId);
complained = true;
}
}
catch (SQLException ex) {
logger.error("Error updating node status", ex);
}
TimeUnit.SECONDS.sleep(pollDuration.toSeconds());
}
}
/** Look for changes in the configuration and kill the service if the corresponding
* NodeConfiguration is set to be disabled.
*/
private void watcher() {
for (;;) {
try {
TimeUnit.SECONDS.sleep(pollDuration.toSeconds());
}
catch (InterruptedException ex) {
logger.error("Watcher thread interrupted", ex);
return;
}
try {
var config = configurationService.get(nodeId);
if (null == config || config.disabled()) {
logger.info("Current node disabled!! Shutting down!");
System.exit(0);
}
}
catch (SQLException ex) {
logger.error("Error updating node status", ex);
}
}
}
}

View File

@ -9,6 +9,7 @@ import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.NodeStatusWatcher;
public class ExecutorMain extends MainClass {
private final ExecutorSvc service;
@ -26,6 +27,7 @@ public class ExecutorMain extends MainClass {
new DatabaseModule(),
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Executor)
);
injector.getInstance(NodeStatusWatcher.class);
injector.getInstance(ExecutorMain.class);
injector.getInstance(Initialization.class).setReady();

View File

@ -9,6 +9,7 @@ import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
import nu.marginalia.service.server.NodeStatusWatcher;
public class IndexMain extends MainClass {
private final IndexService service;
@ -27,6 +28,8 @@ public class IndexMain extends MainClass {
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Index)
);
injector.getInstance(NodeStatusWatcher.class);
injector.getInstance(IndexMain.class);
injector.getInstance(Initialization.class).setReady();