(*) Add node-affinity to services, processes and file storage.

This commit is contained in:
Viktor Lofgren 2023-10-10 12:32:22 +02:00
parent 61288c5e68
commit 199c459697
23 changed files with 154 additions and 53 deletions

View File

@ -1,5 +1,6 @@
package nu.marginalia.db.storage;
import com.google.inject.name.Named;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.db.storage.model.*;
@ -25,25 +26,11 @@ import java.util.concurrent.TimeUnit;
@Singleton
public class FileStorageService {
private final HikariDataSource dataSource;
private final int node;
private final Logger logger = LoggerFactory.getLogger(FileStorageService.class);
private static final DateTimeFormatter dirNameDatePattern = DateTimeFormatter.ofPattern("__uu-MM-dd'T'HH_mm_ss.SSS"); // filesystem safe ISO8601
@Inject
public FileStorageService(HikariDataSource dataSource) {
this.dataSource = dataSource;
for (var type : FileStorageType.values()) {
String overrideProperty = System.getProperty(type.overrideName());
if (overrideProperty == null || overrideProperty.isBlank())
continue;
logger.info("FileStorage override present: {} -> {}", type,
FileStorage.createOverrideStorage(type, overrideProperty).asPath());
}
}
public Optional<FileStorage> findFileStorageToDelete() {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
@ -59,6 +46,22 @@ public class FileStorageService {
return Optional.empty();
}
@Inject
public FileStorageService(HikariDataSource dataSource, @Named("wmsa-system-node") Integer node) {
this.dataSource = dataSource;
this.node = node;
for (var type : FileStorageType.values()) {
String overrideProperty = System.getProperty(type.overrideName());
if (overrideProperty == null || overrideProperty.isBlank())
continue;
logger.info("FileStorage override present: {} -> {}", type,
FileStorage.createOverrideStorage(type, overrideProperty).asPath());
}
}
/** @return the storage base with the given id, or null if it does not exist */
public FileStorageBase getStorageBase(FileStorageBaseId type) throws SQLException {
try (var conn = dataSource.getConnection();
@ -173,9 +176,10 @@ public class FileStorageService {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT ID, NAME, PATH, TYPE, PERMIT_TEMP
FROM FILE_STORAGE_BASE WHERE TYPE = ?
FROM FILE_STORAGE_BASE WHERE TYPE = ? AND NODE = ?
""")) {
stmt.setString(1, type.name());
stmt.setInt(2, node);
try (var rs = stmt.executeQuery()) {
if (rs.next()) {
return new FileStorageBase(
@ -199,13 +203,14 @@ public class FileStorageService {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO FILE_STORAGE_BASE(NAME, PATH, TYPE, PERMIT_TEMP)
VALUES (?, ?, ?, ?)
INSERT INTO FILE_STORAGE_BASE(NAME, PATH, TYPE, PERMIT_TEMP, NODE)
VALUES (?, ?, ?, ?, ?)
""")) {
stmt.setString(1, name);
stmt.setString(2, path.toString());
stmt.setString(3, type.name());
stmt.setBoolean(4, permitTemp);
stmt.setInt(5, node);
int update = stmt.executeUpdate();
if (update < 0) {
@ -360,9 +365,10 @@ public class FileStorageService {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT PATH, DESCRIPTION, ID, BASE_ID, CREATE_DATE
FROM FILE_STORAGE_VIEW WHERE TYPE = ?
FROM FILE_STORAGE_VIEW WHERE TYPE = ? AND NODE = ?
""")) {
stmt.setString(1, type.name());
stmt.setInt(2, node);
long storageId;
long baseId;

View File

@ -0,0 +1,21 @@
ALTER TABLE FILE_STORAGE_BASE MODIFY COLUMN NAME VARCHAR(255) NOT NULL;
ALTER TABLE FILE_STORAGE_BASE MODIFY COLUMN PATH VARCHAR(255) NOT NULL;
DROP INDEX PATH ON FILE_STORAGE_BASE;
DROP INDEX NAME ON FILE_STORAGE_BASE;
ALTER TABLE FILE_STORAGE_BASE ADD COLUMN NODE INT NOT NULL DEFAULT -1;
CREATE UNIQUE INDEX FILE_STORAGE_BASE__NODE_NAME ON FILE_STORAGE_BASE(NODE, NAME);
CREATE UNIQUE INDEX FILE_STORAGE_BASE__NODE_PATH ON FILE_STORAGE_BASE(NODE, PATH);
DROP VIEW FILE_STORAGE_VIEW;
CREATE VIEW FILE_STORAGE_VIEW
AS SELECT
CONCAT(BASE.PATH, '/', STORAGE.PATH) AS PATH,
STORAGE.TYPE AS TYPE,
NODE AS NODE,
DESCRIPTION AS DESCRIPTION,
CREATE_DATE AS CREATE_DATE,
STORAGE.ID AS ID,
BASE.ID AS BASE_ID
FROM FILE_STORAGE STORAGE
INNER JOIN FILE_STORAGE_BASE BASE ON STORAGE.BASE_ID=BASE.ID;

View File

@ -11,13 +11,16 @@ import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import static org.junit.Assert.*;
@ -48,6 +51,29 @@ public class FileStorageServiceTest {
config.setPassword("wmsa");
dataSource = new HikariDataSource(config);
// apply migrations
List<String> migrations = List.of("db/migration/V23_11_0_000__file_storage_node.sql");
for (String migration : migrations) {
try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration),
"Could not load migration script " + migration);
var conn = dataSource.getConnection();
var stmt = conn.createStatement()
) {
String script = new String(resource.readAllBytes());
String[] cmds = script.split("\\s*;\\s*");
for (String cmd : cmds) {
if (cmd.isBlank())
continue;
System.out.println(cmd);
stmt.executeUpdate(cmd);
}
}
catch (IOException|SQLException ex) {
}
}
}
@ -58,7 +84,7 @@ public class FileStorageServiceTest {
System.setProperty(type.overrideName(), "");
}
fileStorageService = new FileStorageService(dataSource);
fileStorageService = new FileStorageService(dataSource, 0);
}
@AfterEach
@ -101,14 +127,14 @@ public class FileStorageServiceTest {
public void testOverride() throws SQLException {
System.setProperty(FileStorageType.BACKUP.overrideName(), "/tmp");
System.out.println(FileStorageType.BACKUP.overrideName());
fileStorageService = new FileStorageService(dataSource);
fileStorageService = new FileStorageService(dataSource, 0);
Assertions.assertEquals(Path.of("/tmp"), fileStorageService.getStorageByType(FileStorageType.BACKUP).asPath());
}
@Test
public void testCreateBase() throws SQLException, FileNotFoundException {
String name = "test-" + UUID.randomUUID();
var storage = new FileStorageService(dataSource);
var storage = new FileStorageService(dataSource, 0);
var base = storage.createStorageBase(name, createTempDir(), FileStorageBaseType.SLOW, false);
Assertions.assertEquals(name, base.name());
@ -119,7 +145,7 @@ public class FileStorageServiceTest {
public void testAllocateTempInNonPermitted() throws SQLException, FileNotFoundException {
String name = "test-" + UUID.randomUUID();
var storage = new FileStorageService(dataSource);
var storage = new FileStorageService(dataSource, 0);
var base = storage.createStorageBase(name, createTempDir(), FileStorageBaseType.SLOW, false);
@ -138,7 +164,7 @@ public class FileStorageServiceTest {
public void testAllocatePermanentInNonPermitted() throws SQLException, IOException {
String name = "test-" + UUID.randomUUID();
var storage = new FileStorageService(dataSource);
var storage = new FileStorageService(dataSource, 0);
var base = storage.createStorageBase(name, createTempDir(), FileStorageBaseType.SLOW, false);
@ -153,7 +179,7 @@ public class FileStorageServiceTest {
public void testAllocateTempInPermitted() throws IOException, SQLException {
String name = "test-" + UUID.randomUUID();
var storage = new FileStorageService(dataSource);
var storage = new FileStorageService(dataSource, 0);
var base = storage.createStorageBase(name, createTempDir(), FileStorageBaseType.SLOW, true);
var fileStorage = storage.allocateTemporaryStorage(base, FileStorageType.CRAWL_DATA, "xyz", "thisShouldSucceed");

View File

@ -0,0 +1,41 @@
package nu.marginalia;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import java.util.Objects;
import java.util.UUID;
public class ProcessConfigurationModule extends AbstractModule {
private final String processName;
public ProcessConfigurationModule(String processName) {
this.processName = processName;
}
public void configure() {
bind(Integer.class).annotatedWith(Names.named("wmsa-system-node")).toInstance(getNode());
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration(processName, getNode(), UUID.randomUUID()));
}
private int getNode() {
String nodeEnv = System.getenv("WMSA_PROCESS_NODE");
if (null == nodeEnv) {
// fallback logic, try to inherit from parent
nodeEnv = System.getenv("WMSA_SERVICE_NODE");
}
//
if (null == nodeEnv) {
throw new IllegalStateException("Either WMSA_PROCESS_NODE or WMSA_SERVICE_NODE must be set, indicating the node affinity of the process!");
}
return Integer.parseInt(nodeEnv);
}
}

View File

@ -1,17 +1,18 @@
package nu.marginalia.service.module;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import java.util.Objects;
import java.util.UUID;
public class ConfigurationModule extends AbstractModule {
public class ServiceConfigurationModule extends AbstractModule {
private final ServiceDescriptors descriptors;
private final ServiceId id;
public ConfigurationModule(ServiceDescriptors descriptors, ServiceId id) {
public ServiceConfigurationModule(ServiceDescriptors descriptors, ServiceId id) {
this.descriptors = descriptors;
this.id = id;
}
@ -19,14 +20,17 @@ public class ConfigurationModule extends AbstractModule {
public void configure() {
bind(ServiceDescriptors.class).toInstance(descriptors);
int node = getNode();
var configObject = new ServiceConfiguration(id,
getNode(),
node,
getHost(),
getBasePort(),
getPrometheusPort(),
UUID.randomUUID()
);
bind(Integer.class).annotatedWith(Names.named("wmsa-system-node")).toInstance(node);
bind(ServiceConfiguration.class).toInstance(configObject);
}

View File

@ -4,6 +4,7 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
@ -25,7 +26,6 @@ import nu.marginalia.converting.processor.DomainProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
@ -49,6 +49,7 @@ public class ConverterMain {
public static void main(String... args) throws Exception {
Injector injector = Guice.createInjector(
new ConverterModule(),
new ProcessConfigurationModule("converter"),
new DatabaseModule()
);

View File

@ -8,8 +8,6 @@ import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import nu.marginalia.model.gson.GsonFactory;
import java.util.UUID;
public class ConverterModule extends AbstractModule {
public ConverterModule() {
@ -18,8 +16,6 @@ public class ConverterModule extends AbstractModule {
public void configure() {
bind(Gson.class).toInstance(createGson());
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("converter", 0, UUID.randomUUID()));
bind(Double.class).annotatedWith(Names.named("min-document-quality")).toInstance(-15.);
bind(Integer.class).annotatedWith(Names.named("min-document-length")).toInstance(250);
bind(Integer.class).annotatedWith(Names.named("max-title-length")).toInstance(128);

View File

@ -4,6 +4,7 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.CrawlDataReference;
@ -99,6 +100,7 @@ public class CrawlerMain {
Injector injector = Guice.createInjector(
new CrawlerModule(),
new ProcessConfigurationModule("crawler"),
new DatabaseModule()
);
var crawler = injector.getInstance(CrawlerMain.class);

View File

@ -2,6 +2,7 @@ package nu.marginalia.crawl;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.UserAgent;
@ -15,7 +16,6 @@ public class CrawlerModule extends AbstractModule {
public void configure() {
bind(Gson.class).toInstance(createGson());
bind(UserAgent.class).toInstance(WmsaHome.getUserAgent());
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("crawler", 0, UUID.randomUUID()));
}
private Gson createGson() {

View File

@ -3,6 +3,7 @@ package nu.marginalia.index;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageType;
@ -49,6 +50,7 @@ public class IndexConstructorMain {
var main = Guice.createInjector(
new IndexConstructorModule(),
new ProcessConfigurationModule("index-constructor"),
new DatabaseModule())
.getInstance(IndexConstructorMain.class);

View File

@ -8,7 +8,5 @@ import java.util.UUID;
public class IndexConstructorModule extends AbstractModule {
@Override
public void configure() {
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("index-constructor", 0, UUID.randomUUID()));
}
}

View File

@ -6,6 +6,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import lombok.Getter;
import lombok.SneakyThrows;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService;
@ -54,6 +55,7 @@ public class LoaderMain {
new org.mariadb.jdbc.Driver();
Injector injector = Guice.createInjector(
new ProcessConfigurationModule("loader"),
new LoaderModule(),
new DatabaseModule()
);

View File

@ -30,7 +30,6 @@ public class LoaderModule extends AbstractModule {
public void configure() {
bind(ServiceDescriptors.class).toInstance(SearchServiceDescriptors.descriptors);
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("loader", 0, UUID.randomUUID()));
bind(Gson.class).toProvider(this::createGson);
bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path", "/vol")));

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
@ -21,7 +21,7 @@ public class ApiMain extends MainClass {
Injector injector = Guice.createInjector(
new DatabaseModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Api));
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Api));
injector.getInstance(ApiMain.class);
injector.getInstance(Initialization.class).setReady();
}

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
import spark.Spark;
@ -26,7 +26,7 @@ public class DatingMain extends MainClass {
Injector injector = Guice.createInjector(
new DatingModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Dating),
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Dating),
new DatabaseModule()
);

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
import spark.Spark;
@ -25,7 +25,7 @@ public class ExplorerMain extends MainClass {
Spark.staticFileLocation("/static/explore/");
Injector injector = Guice.createInjector(
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Explorer),
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Explorer),
new DatabaseModule()
);

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
import spark.Spark;
@ -27,7 +27,7 @@ public class SearchMain extends MainClass {
Injector injector = Guice.createInjector(
new SearchModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Search),
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Search),
new DatabaseModule()
);

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
@ -23,7 +23,7 @@ public class AssistantMain extends MainClass {
Injector injector = Guice.createInjector(
new AssistantModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Assistant),
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Assistant),
new DatabaseModule()
);

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
@ -22,7 +22,7 @@ public class ControlMain extends MainClass {
Injector injector = Guice.createInjector(
new DatabaseModule(),
new ControlProcessModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Control));
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Control));
injector.getInstance(ControlMain.class);
injector.getInstance(Initialization.class).setReady();

View File

@ -117,6 +117,7 @@ public class ProcessService {
* along with WMSA_HOME, but it has special logic */
private final List<String> propagatedEnvironmentVariables = List.of(
"JAVA_HOME",
"WMSA_SERVICE_NODE",
"CONVERTER_PROCESS_OPTS",
"LOADER_PROCESS_OPTS",
"INDEX_CONSTRUCTION_PROCESS_OPTS",

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
@ -25,7 +25,7 @@ public class IndexMain extends MainClass {
new IndexTablesModule(),
new IndexModule(),
new DatabaseModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Index)
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Index)
);
injector.getInstance(IndexMain.class);

View File

@ -6,7 +6,7 @@ import com.google.inject.Injector;
import nu.marginalia.service.MainClass;
import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.module.ConfigurationModule;
import nu.marginalia.service.module.ServiceConfigurationModule;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.server.Initialization;
@ -24,7 +24,7 @@ public class QueryMain extends MainClass {
Injector injector = Guice.createInjector(
new QueryModule(),
new DatabaseModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Query)
new ServiceConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Query)
);
injector.getInstance(QueryMain.class);

View File

@ -93,6 +93,8 @@ services:
- "127.0.0.1:4090:5000"
- "127.0.0.1:7090:4000"
- "127.0.0.1:7099:4001"
environment:
- "WMSA_SERVICE_NODE=0"
depends_on:
- mariadb
mariadb: