From d7ab21fe3406b7d8172cbe483112c09bf4219d5e Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 17 Jul 2023 21:20:31 +0200 Subject: [PATCH] (*) Refactor Control Service and processes --- .../db/storage/model/FileStorageBaseType.java | 3 +- .../db/storage/model/FileStorageType.java | 1 + .../resources/sql/current/13-file-storage.sql | 4 +- .../java/nu/marginalia/mqsm/StateMachine.java | 24 +++++ .../nu/marginalia/process/log/WorkLog.java | 4 + .../service/module/DatabaseModule.java | 4 +- .../src/main/resources/log4j2.properties | 39 +++++++- .../forward/ForwardIndexConverterTest.java | 3 +- .../writer/IndexJournalWriterImpl.java | 3 +- .../ReverseIndexFullConverterTest.java | 3 +- .../ReverseIndexFullConverterTest2.java | 3 +- .../ReverseIndexPriorityConverterTest2.java | 3 +- .../nu/marginalia/dict/DictionaryMap.java | 10 ++ .../nu/marginalia/lexicon/KeywordLexicon.java | 43 +++++++- .../lexicon/KeywordLexiconReadOnlyView.java | 19 +++- .../journal/KeywordLexiconJournal.java | 66 ++++++++++--- .../KeywordLexiconJournalCommitQueue.java | 3 + .../journal/KeywordLexiconJournalFile.java | 1 - .../KeywordLexiconJournalFingerprint.java | 10 ++ .../journal/KeywordLexiconJournalMode.java | 6 ++ .../lexicon/KeywordLexiconTest.java | 5 +- .../src/main/java/plan/CrawlPlan.java | 2 +- .../marginalia/converting/ConverterMain.java | 8 +- .../nu/marginalia/loading/LoaderMain.java | 7 ++ .../nu/marginalia/loading/loader/Loader.java | 8 -- .../loader/LoaderIndexJournalWriter.java | 11 ++- .../java/nu/marginalia/index/IndexModule.java | 3 +- .../index/IndexServicesFactory.java | 27 ++--- .../marginalia/index/svc/IndexOpsService.java | 8 +- ...ndexQueryServiceIntegrationTestModule.java | 12 ++- .../nu/marginalia/control/ControlService.java | 91 ++++++++--------- .../marginalia/control/fsm/ControlFSMs.java | 33 ++++--- .../monitor/AbstractProcessSpawnerFSM.java | 2 +- .../control/fsm/task/ReconvertAndLoadFSM.java | 98 +++++++++---------- .../fsm/task/RepartitionReindexFSM.java 
| 20 ++-- .../control/model/ControlProcessState.java | 6 ++ .../control/model/FileStorageWithActions.java | 3 +- .../control/model/ProcessHeartbeat.java | 2 +- .../control/svc/ControlFsmService.java | 72 ++++++++++++++ .../control/svc/EventLogService.java | 62 ++++++++++++ .../control/svc/ProcessService.java | 11 ++- .../resources/templates/control/index.hdb | 2 +- .../templates/control/partials/nav.hdb | 6 +- .../control/partials/processes-table.hdb | 4 +- .../control/partials/services-table.hdb | 2 +- .../resources/templates/control/processes.hdb | 2 +- .../templates/control/service-by-id.hdb | 21 ++++ .../resources/templates/control/services.hdb | 2 +- .../resources/templates/control/storage.hdb | 4 +- run/env/service.env | 3 +- 50 files changed, 585 insertions(+), 204 deletions(-) create mode 100644 code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFingerprint.java create mode 100644 code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalMode.java create mode 100644 code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFsmService.java create mode 100644 code/services-satellite/control-service/src/main/resources/templates/control/service-by-id.hdb diff --git a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseType.java b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseType.java index df9f497f..08d67069 100644 --- a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseType.java +++ b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseType.java @@ -3,5 +3,6 @@ package nu.marginalia.db.storage.model; public enum FileStorageBaseType { SSD_INDEX, SSD_WORK, - SLOW + SLOW, + BACKUP } diff --git a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageType.java b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageType.java 
index 390262ec..97eef136 100644 --- a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageType.java +++ b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageType.java @@ -8,5 +8,6 @@ public enum FileStorageType { LEXICON_STAGING, INDEX_LIVE, LEXICON_LIVE, + BACKUP, SEARCH_SETS } diff --git a/code/common/db/src/main/resources/sql/current/13-file-storage.sql b/code/common/db/src/main/resources/sql/current/13-file-storage.sql index af111186..763f39a0 100644 --- a/code/common/db/src/main/resources/sql/current/13-file-storage.sql +++ b/code/common/db/src/main/resources/sql/current/13-file-storage.sql @@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS FILE_STORAGE_BASE ( ID BIGINT PRIMARY KEY AUTO_INCREMENT, NAME VARCHAR(255) NOT NULL UNIQUE, PATH VARCHAR(255) NOT NULL UNIQUE COMMENT 'The path to the storage base', - TYPE ENUM ('SSD_INDEX', 'SSD_WORK', 'SLOW') NOT NULL, + TYPE ENUM ('SSD_INDEX', 'SSD_WORK', 'SLOW', 'BACKUP') NOT NULL, MUST_CLEAN BOOLEAN NOT NULL DEFAULT FALSE COMMENT 'If true, the storage must be cleaned after use', PERMIT_TEMP BOOLEAN NOT NULL DEFAULT FALSE COMMENT 'If true, the storage can be used for temporary files' ) @@ -14,7 +14,7 @@ CREATE TABLE IF NOT EXISTS FILE_STORAGE ( BASE_ID BIGINT NOT NULL, PATH VARCHAR(255) NOT NULL COMMENT 'The path to the storage relative to the base', DESCRIPTION VARCHAR(255) NOT NULL, - TYPE ENUM ('CRAWL_SPEC', 'CRAWL_DATA', 'PROCESSED_DATA', 'INDEX_STAGING', 'LEXICON_STAGING', 'INDEX_LIVE', 'LEXICON_LIVE', 'SEARCH_SETS') NOT NULL, + TYPE ENUM ('CRAWL_SPEC', 'CRAWL_DATA', 'PROCESSED_DATA', 'INDEX_STAGING', 'LEXICON_STAGING', 'INDEX_LIVE', 'LEXICON_LIVE', 'SEARCH_SETS', 'BACKUP') NOT NULL, DO_PURGE BOOLEAN NOT NULL DEFAULT FALSE COMMENT 'If true, the storage may be cleaned', CREATE_DATE TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), CONSTRAINT CONS UNIQUE (BASE_ID, PATH), diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java 
b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java index 36ed81cf..c3b32cd6 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java @@ -129,6 +129,18 @@ public class StateMachine { smOutbox.notify(transition.state(), transition.message()); } + /** Initialize the state machine in the given state rather than in INITIAL. */ + public void initFrom(String firstState) throws Exception { + var transition = StateTransition.to(firstState); + + synchronized (this) { + this.state = allStates.get(transition.state()); + notifyAll(); + } + + smOutbox.notify(transition.state(), transition.message()); + } + /** Initialize the state machine. */ public void init(String jsonEncodedArgument) throws Exception { var transition = StateTransition.to("INITIAL", jsonEncodedArgument); @@ -141,6 +153,18 @@ public class StateMachine { smOutbox.notify(transition.state(), transition.message()); } + /** Initialize the state machine in the given state rather than in INITIAL, passing jsonEncodedArgument along with the transition. */ + public void initFrom(String state, String jsonEncodedArgument) throws Exception { + var transition = StateTransition.to(state, jsonEncodedArgument); + + synchronized (this) { + this.state = allStates.get(transition.state()); + notifyAll(); + } + + smOutbox.notify(transition.state(), transition.message()); + } + /** Resume the state machine from the last known state. 
*/ private void resume() { diff --git a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java index 86dd100c..b74ab5b4 100644 --- a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java +++ b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java @@ -93,4 +93,8 @@ public class WorkLog implements AutoCloseable { logWriter.flush(); logWriter.close(); } + + public int countFinishedJobs() { + return finishedJobs.size(); + } } diff --git a/code/common/service/src/main/java/nu/marginalia/service/module/DatabaseModule.java b/code/common/service/src/main/java/nu/marginalia/service/module/DatabaseModule.java index e3d660ad..70af3ed4 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/module/DatabaseModule.java +++ b/code/common/service/src/main/java/nu/marginalia/service/module/DatabaseModule.java @@ -90,8 +90,8 @@ public class DatabaseModule extends AbstractModule { config.addDataSourceProperty("prepStmtCacheSize", "250"); config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048"); - config.setMaximumPoolSize(100); - config.setMinimumIdle(10); + config.setMaximumPoolSize(20); + config.setMinimumIdle(2); config.setMaxLifetime(Duration.ofMinutes(9).toMillis()); diff --git a/code/common/service/src/main/resources/log4j2.properties b/code/common/service/src/main/resources/log4j2.properties index 66d688b0..96c73ea0 100644 --- a/code/common/service/src/main/resources/log4j2.properties +++ b/code/common/service/src/main/resources/log4j2.properties @@ -4,6 +4,22 @@ appender.console.type = Console appender.console.name = LogToConsole appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{HH:mm:ss,SSS} %style{%-8markerSimpleName}{FG_Cyan} %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %-24t %-20c{1} -- %msg{nolookups}%n +appender.console.filter.process.type = MarkerFilter 
+appender.console.filter.process.onMismatch=ACCEPT +appender.console.filter.process.onMatch=DENY +appender.console.filter.process.marker=PROCESS +appender.console.filter.http.type = MarkerFilter +appender.console.filter.http.onMismatch=ACCEPT +appender.console.filter.http.onMatch=DENY +appender.console.filter.http.marker=HTTP +appender.processconsole.type = Console +appender.processconsole.name = ProcessLogToConsole +appender.processconsole.layout.type = PatternLayout +appender.processconsole.layout.pattern = %msg{nolookups}%n +appender.processconsole.filter.process.type = MarkerFilter +appender.processconsole.filter.process.onMismatch=DENY +appender.processconsole.filter.process.onMatch=ACCEPT +appender.processconsole.filter.process.marker=PROCESS appender.rolling.type = RollingFile appender.rolling.name = RollingFile appender.rolling.fileName = /var/log/wmsa/wmsa-${sys:service-name}.log @@ -23,6 +39,27 @@ appender.rolling.filter.http.type = MarkerFilter appender.rolling.filter.http.onMismatch=ACCEPT appender.rolling.filter.http.onMatch=DENY appender.rolling.filter.http.marker=HTTP +appender.rolling.filter.process.type = MarkerFilter +appender.rolling.filter.process.onMismatch=ACCEPT +appender.rolling.filter.process.onMatch=DENY +appender.rolling.filter.process.marker=PROCESS +appender.process.type = RollingFile +appender.process.name = ProcessFile +appender.process.fileName = /var/log/wmsa/process.log +appender.process.filePattern = /var/log/wmsa/process-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.process.layout.pattern = %msg{nolookups}%n +appender.process.layout.type = PatternLayout +appender.process.policies.type = Policies +appender.process.policies.size.type = SizeBasedTriggeringPolicy +appender.process.policies.size.size=10MB +appender.process.strategy.type = DefaultRolloverStrategy +appender.process.strategy.max = 10 +appender.process.filter.process.type = MarkerFilter +appender.process.filter.process.onMismatch=DENY 
+appender.process.filter.process.onMatch=ACCEPT +appender.process.filter.process.marker=PROCESS rootLogger.level = info rootLogger.appenderRef.console.ref = LogToConsole -rootLogger.appenderRef.rolling.ref = RollingFile \ No newline at end of file +rootLogger.appenderRef.processconsole.ref = ProcessLogToConsole +rootLogger.appenderRef.rolling.ref = RollingFile +rootLogger.appenderRef.process.ref = ProcessFile \ No newline at end of file diff --git a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java index 8e8bc252..c2411575 100644 --- a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java +++ b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java @@ -5,6 +5,7 @@ import nu.marginalia.dict.OffHeapDictionaryHashMap; import nu.marginalia.index.journal.model.IndexJournalEntry; import nu.marginalia.index.journal.writer.IndexJournalWriterImpl; import nu.marginalia.index.journal.writer.IndexJournalWriter; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; @@ -45,7 +46,7 @@ class ForwardIndexConverterTest { dictionaryFile = Files.createTempFile("tmp", ".dict"); dictionaryFile.toFile().deleteOnExit(); - keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile())); + keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE)); keywordLexicon.getOrInsert("0"); indexFile = Files.createTempFile("tmp", ".idx"); diff --git a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/writer/IndexJournalWriterImpl.java 
b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/writer/IndexJournalWriterImpl.java index c9bf44cd..4406350f 100644 --- a/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/writer/IndexJournalWriterImpl.java +++ b/code/features-index/index-journal/src/main/java/nu.marginalia.index/journal/writer/IndexJournalWriterImpl.java @@ -27,7 +27,8 @@ public class IndexJournalWriterImpl implements IndexJournalWriter{ this.lexicon = lexicon; this.outputFile = outputFile; - var fileStream = Files.newOutputStream(outputFile, StandardOpenOption.CREATE); + var fileStream = Files.newOutputStream(outputFile, StandardOpenOption.CREATE, + StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); writeHeaderPlaceholder(fileStream); diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java index a61f2a91..01df3e2f 100644 --- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java +++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java @@ -8,6 +8,7 @@ import nu.marginalia.index.journal.model.IndexJournalEntry; import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; import nu.marginalia.index.journal.writer.IndexJournalWriterImpl; import nu.marginalia.index.journal.writer.IndexJournalWriter; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; @@ -42,7 +43,7 @@ class ReverseIndexFullConverterTest { dictionaryFile = Files.createTempFile("tmp", ".dict"); dictionaryFile.toFile().deleteOnExit(); - keywordLexicon = new KeywordLexicon(new 
KeywordLexiconJournal(dictionaryFile.toFile())); + keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE)); keywordLexicon.getOrInsert("0"); indexFile = Files.createTempFile("tmp", ".idx"); diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java index 5ce603c1..4488912b 100644 --- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java +++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java @@ -10,6 +10,7 @@ import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile import nu.marginalia.index.journal.writer.IndexJournalWriterImpl; import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.index.priority.ReverseIndexPriorityParameters; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; @@ -52,7 +53,7 @@ class ReverseIndexFullConverterTest2 { dictionaryFile = Files.createTempFile("tmp", ".dict"); dictionaryFile.toFile().deleteOnExit(); - keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile())); + keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE)); keywordLexicon.getOrInsert("0"); indexFile = Files.createTempFile("tmp", ".idx"); diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java index 
21d6198b..d634c175 100644 --- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java +++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java @@ -12,6 +12,7 @@ import nu.marginalia.index.priority.ReverseIndexPriorityConverter; import nu.marginalia.index.priority.ReverseIndexPriorityParameters; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.test.TestUtil; import org.junit.jupiter.api.AfterEach; @@ -52,7 +53,7 @@ class ReverseIndexPriorityConverterTest2 { dictionaryFile = Files.createTempFile("tmp", ".dict"); dictionaryFile.toFile().deleteOnExit(); - keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile())); + keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE)); keywordLexicon.getOrInsert("0"); indexFile = Files.createTempFile("tmp", ".idx"); diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java index 1f9525a2..260015be 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java @@ -1,9 +1,19 @@ package nu.marginalia.dict; +/** Backing store for the KeywordLexicon, available in on and off-heap versions. + *
<p>
+ * The off-heap version is necessary when loading a lexicon that is too large to fit in RAM, due + * to Java's 2GB limit on the size of a single array. It is slower and less optimized than the on-heap version. + *
<p>
+ * The off-heap version is on the precipice of being deprecated and its use is discouraged. + */ public interface DictionaryMap { int NO_VALUE = Integer.MIN_VALUE; static DictionaryMap create() { + // Default to on-heap version + // TODO: Make this configurable + return new OnHeapDictionaryMap(); } diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java index bd88efc8..4929b9c1 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java @@ -6,6 +6,7 @@ import io.prometheus.client.Gauge; import lombok.SneakyThrows; import nu.marginalia.dict.DictionaryMap; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalFingerprint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,6 +17,19 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +/** The keyword lexicon is used to map keywords to unique numeric IDs. + * This class is used to both construct the lexicon, and to read from it. + *
<p>
+ * Readers will want to use the KeywordLexiconReadOnlyView wrapper, as it + * only exposes read-only methods and hides the mutating methods. + *
<p>
+ * Between instances, the lexicon is stored in a journal file, exactly in the + * order they were received by the writer. The journal file is then replayed + * on startup to reconstruct the lexicon, giving each term an ID according to + * the order they are loaded. It is therefore important that the journal file + * is not tampered with, as this will cause the lexicon to be corrupted. + * */ + public class KeywordLexicon implements AutoCloseable { private final DictionaryMap reverseIndex; @@ -30,6 +44,8 @@ public class KeywordLexicon implements AutoCloseable { .register(); private final KeywordLexiconJournal journal; + private volatile KeywordLexiconJournalFingerprint fingerprint = null; + @SneakyThrows public KeywordLexicon(KeywordLexiconJournal keywordLexiconJournal) { @@ -42,21 +58,36 @@ public class KeywordLexicon implements AutoCloseable { logger.error("MULTIPLE LEXICON INSTANCES!"); } - journal.loadFile(bytes -> reverseIndex.put(hashFunction.hashBytes(bytes).padToLong())); + reload(); logger.info("Done creating dictionary writer"); } - public void reload() throws IOException { - logger.info("Reloading dictionary writer"); - journal.loadFile(bytes -> reverseIndex.put(hashFunction.hashBytes(bytes).padToLong())); - logger.info("Done reloading dictionary writer"); + public boolean needsReload() throws IOException { + var newFingerprint = journal.journalFingerprint(); + return !newFingerprint.equals(fingerprint); } + /** Reload the lexicon from the journal */ + public void reload() throws IOException { + var lock = memoryLock.writeLock(); + lock.lock(); + try { + reverseIndex.clear(); + journal.loadFile(bytes -> reverseIndex.put(hashFunction.hashBytes(bytes).padToLong())); + fingerprint = journal.journalFingerprint(); + } + finally { + lock.unlock(); + } + } + + /** Get method that inserts the word into the lexicon if it is not present */ public int getOrInsert(String macroWord) { return getOrInsert(macroWord.getBytes(StandardCharsets.UTF_8)); } + /** Get method 
that inserts the word into the lexicon if it is not present */ @SneakyThrows private int getOrInsert(byte[] bytes) { if (bytes.length >= Byte.MAX_VALUE) { @@ -96,11 +127,13 @@ public class KeywordLexicon implements AutoCloseable { } } + /** Get method that does not modify the lexicon if the word is not present */ public int getReadOnly(String word) { final byte[] bytes = word.getBytes(StandardCharsets.UTF_8); return getReadOnly(hashFunction.hashBytes(bytes).padToLong()); } + /** Get method that does not modify the lexicon if the word is not present */ public int getReadOnly(long hashedKey) { Lock lock = memoryLock.readLock(); try { diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java index ba7983a5..076cc84d 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java @@ -3,13 +3,19 @@ package nu.marginalia.lexicon; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import lombok.SneakyThrows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.TimeUnit; +/** A read-only view of a keyword lexicon. 
+ * + * @see KeywordLexicon + * */ public class KeywordLexiconReadOnlyView { private final KeywordLexicon writer; - + private final Logger logger = LoggerFactory.getLogger(getClass()); private final Cache cache = CacheBuilder.newBuilder().maximumSize(10_000).expireAfterAccess(60, TimeUnit.SECONDS).build(); @SneakyThrows @@ -22,8 +28,15 @@ public class KeywordLexiconReadOnlyView { return cache.get(word, () -> writer.getReadOnly(word)); } - public boolean reload() throws IOException { - writer.reload(); + public boolean suggestReload() throws IOException { + if (writer.needsReload()) { + logger.info("Reloading lexicon"); + writer.reload(); + cache.invalidateAll(); + } + else { + logger.info("Foregoing lexicon reload"); + } return true; } } diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java index 013f2c49..01ba412b 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java @@ -5,35 +5,70 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; import java.util.List; import java.util.function.Consumer; +/** The journal for the keyword lexicon. + * It's used both for writing the lexicon, but also for reconstructing it for reading later. 
+ */ public class KeywordLexiconJournal { private static final boolean noCommit = Boolean.getBoolean("DictionaryJournal.noCommit"); private final KeywordLexiconJournalCommitQueue commitQueue; - private final KeywordLexiconJournalFile journalFile; + private KeywordLexiconJournalFile journalFile; private final Logger logger = LoggerFactory.getLogger(getClass()); private final Thread commitToDiskThread; private volatile boolean running = true; + private final Path journalFilePath; - public KeywordLexiconJournal(File file) throws IOException { - commitQueue = new KeywordLexiconJournalCommitQueue(); - journalFile = new KeywordLexiconJournalFile(file); + /** Create a new journal. + * + * @param file The file to use for the journal. + * @param mode The mode to use for the journal. If READ_ONLY, the journal will be read-only and refuse + * to accept new entries. + */ + public KeywordLexiconJournal(File file, KeywordLexiconJournalMode mode) throws IOException { + journalFilePath = file.toPath(); - commitToDiskThread = new Thread(this::commitToDiskRunner, "CommitToDiskThread"); - commitToDiskThread.start(); + if (mode == KeywordLexiconJournalMode.READ_WRITE) { + commitQueue = new KeywordLexiconJournalCommitQueue(); + journalFile = new KeywordLexiconJournalFile(file); - Runtime.getRuntime().addShutdownHook(new Thread(this::commitToDisk)); + commitToDiskThread = new Thread(this::commitToDiskRunner, "CommitToDiskThread"); + commitToDiskThread.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(this::commitToDisk)); + } + else { + journalFile = new KeywordLexiconJournalFile(file); + + commitQueue = null; + commitToDiskThread = null; + } } public void enqueue(byte[] word) throws InterruptedException { + if (null == commitQueue) + throw new UnsupportedOperationException("Lexicon journal is read-only"); + commitQueue.enqueue(word); } + public KeywordLexiconJournalFingerprint journalFingerprint() throws IOException { + var attributes = Files.readAttributes(journalFilePath, 
BasicFileAttributes.class); + + long cTime = attributes.creationTime().toMillis(); + long mTime = attributes.lastModifiedTime().toMillis(); + long size = attributes.size(); + + return new KeywordLexiconJournalFingerprint(cTime, mTime, size); + } public void commitToDiskRunner() { if (noCommit) return; @@ -57,14 +92,23 @@ public class KeywordLexiconJournal { public void close() throws Exception { logger.info("Closing Journal"); running = false; - commitToDiskThread.join(); - commitToDisk(); - journalFile.close(); + if (commitToDiskThread != null) { + commitToDiskThread.join(); + commitToDisk(); + } + + if (journalFile != null) { + journalFile.close(); + } } public void loadFile(Consumer loadJournalEntry) throws IOException { - journalFile.rewind(); + if (journalFile != null) { + journalFile.close(); + } + + journalFile = new KeywordLexiconJournalFile(journalFilePath.toFile()); journalFile.loadFile(loadJournalEntry); } } diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalCommitQueue.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalCommitQueue.java index 7c6a460f..8ff12d6d 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalCommitQueue.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalCommitQueue.java @@ -7,6 +7,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +/** An in-memory queue for lexicon journal entries used to improve the performance of + * large bursts of insert-operations. 
+ */ class KeywordLexiconJournalCommitQueue { private final ArrayList commitQueue = new ArrayList<>(10_000); private final Logger logger = LoggerFactory.getLogger(getClass()); diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java index f7404296..81789891 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java @@ -1,6 +1,5 @@ package nu.marginalia.lexicon.journal; -import lombok.SneakyThrows; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFingerprint.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFingerprint.java new file mode 100644 index 00000000..a08d7124 --- /dev/null +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFingerprint.java @@ -0,0 +1,10 @@ +package nu.marginalia.lexicon.journal; + +/** Contains values used to assess whether the lexicon is in sync with the journal + * or if it has been replaced with a newer version and should be reloaded + * */ +public record KeywordLexiconJournalFingerprint(long createdTime, + long mTime, + long sizeBytes) +{ +} diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalMode.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalMode.java new file mode 100644 index 00000000..6208fc47 --- /dev/null +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalMode.java @@ -0,0 +1,6 @@ +package nu.marginalia.lexicon.journal; + +public enum 
KeywordLexiconJournalMode { + READ_ONLY, + READ_WRITE +} diff --git a/code/features-index/lexicon/src/test/java/nu/marginalia/lexicon/KeywordLexiconTest.java b/code/features-index/lexicon/src/test/java/nu/marginalia/lexicon/KeywordLexiconTest.java index ca044e5e..98249c27 100644 --- a/code/features-index/lexicon/src/test/java/nu/marginalia/lexicon/KeywordLexiconTest.java +++ b/code/features-index/lexicon/src/test/java/nu/marginalia/lexicon/KeywordLexiconTest.java @@ -2,6 +2,7 @@ package nu.marginalia.lexicon; import nu.marginalia.dict.OnHeapDictionaryMap; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -23,7 +24,7 @@ public class KeywordLexiconTest { public void setUp() throws IOException { journalFile = Files.createTempFile(getClass().getSimpleName(), ".dat"); - var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile()); + var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile(), KeywordLexiconJournalMode.READ_WRITE); lexicon = new KeywordLexicon(lexiconJournal); } @@ -64,7 +65,7 @@ public class KeywordLexiconTest { int c = lexicon.getOrInsert("ccc"); lexicon.commitToDisk(); - var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile()); + var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile(), KeywordLexiconJournalMode.READ_WRITE); try (var anotherLexicon = new KeywordLexicon(lexiconJournal)) { assertEquals(a, anotherLexicon.getReadOnly("aaa")); assertEquals(b, anotherLexicon.getReadOnly("bbb")); diff --git a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java index b425e29b..655525d6 100644 --- a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java +++ 
b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java @@ -109,7 +109,7 @@ public class CrawlPlan { return WorkLog.iterableMap(crawl.getLogFile(), entry -> { - if (!idPredicate.test(entry.path())) { + if (!idPredicate.test(entry.id())) { return Optional.empty(); } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index 36e5b558..5488a6c2 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX; @@ -135,7 +136,12 @@ public class ConverterMain { }; - for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id))) { + // Advance the progress bar to the current position if this is a resumption + processedDomains.set(processLog.countFinishedJobs()); + heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains); + + for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id))) + { pipe.accept(domain); } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index f6ccc79d..5dff9388 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -7,6 +7,7 @@ import com.google.inject.Injector; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; import nu.marginalia.db.storage.FileStorageService; 
+import nu.marginalia.loading.loader.IndexLoadKeywords; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.inbox.MqInboxResponse; @@ -40,6 +41,7 @@ public class LoaderMain { private final ProcessHeartbeat heartbeat; private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; + private final IndexLoadKeywords indexLoadKeywords; private final Gson gson; private volatile boolean running = true; @@ -65,6 +67,7 @@ public class LoaderMain { ProcessHeartbeat heartbeat, MessageQueueFactory messageQueueFactory, FileStorageService fileStorageService, + IndexLoadKeywords indexLoadKeywords, Gson gson ) { @@ -73,6 +76,7 @@ public class LoaderMain { this.heartbeat = heartbeat; this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; + this.indexLoadKeywords = indexLoadKeywords; this.gson = gson; heartbeat.start(); @@ -122,6 +126,9 @@ public class LoaderMain { running = false; processorThread.join(); instructions.ok(); + + // This needs to be done in order to have a readable index journal + indexLoadKeywords.close(); } catch (Exception ex) { logger.error("Failed to load", ex); diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java index 66eea626..21216b35 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java @@ -144,12 +144,4 @@ public class Loader implements Interpreter { sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList); } - public void close() { - try { - indexLoadKeywords.close(); - } - catch (Exception ex) { - logger.error("Error when closing the index loader", ex); - } - } } diff --git 
a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java index 35f8e79f..14962f9b 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java @@ -12,6 +12,7 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.keyword.model.DocumentKeywords; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.model.idx.DocumentMetadata; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; @@ -20,6 +21,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermissions; import java.sql.SQLException; import java.util.Arrays; @@ -38,7 +41,13 @@ public class LoaderIndexJournalWriter { var lexiconPath = lexiconArea.asPath().resolve("dictionary.dat"); var indexPath = indexArea.asPath().resolve("page-index.dat"); - lexicon = new KeywordLexicon(new KeywordLexiconJournal(lexiconPath.toFile())); + Files.deleteIfExists(lexiconPath); + Files.deleteIfExists(indexPath); + + Files.createFile(indexPath, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--"))); + Files.createFile(lexiconPath, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--"))); + + lexicon = new KeywordLexicon(new KeywordLexiconJournal(lexiconPath.toFile(), KeywordLexiconJournalMode.READ_WRITE)); indexWriter = new IndexJournalWriterImpl(lexicon, indexPath); } diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java 
b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java index a0bad25d..e0a3b2de 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java @@ -11,6 +11,7 @@ import nu.marginalia.WmsaHome; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.KeywordLexiconReadOnlyView; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.service.control.ServiceEventLog; import java.nio.file.Path; @@ -32,7 +33,7 @@ public class IndexModule extends AbstractModule { var area = fileStorageService.getStorageByType(FileStorageType.LEXICON_LIVE); var path = area.asPath().resolve("dictionary.dat"); - return new KeywordLexiconReadOnlyView(new KeywordLexicon(new KeywordLexiconJournal(path.toFile()))); + return new KeywordLexiconReadOnlyView(new KeywordLexicon(new KeywordLexiconJournal(path.toFile(), KeywordLexiconJournalMode.READ_ONLY))); } finally { eventLog.logEvent("INDEX-LEXICON-LOAD-OK", ""); diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java index 11008677..eafd3d57 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java @@ -38,7 +38,7 @@ public class IndexServicesFactory { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final PartitionedDataFile writerIndexFile; + private final Path writerIndexFile; private final PartitionedDataFile fwdIndexDocId; private final PartitionedDataFile fwdIndexDocData; @@ -67,7 +67,7 @@ public class IndexServicesFactory { Files.createDirectories(tmpFileDir); } - 
writerIndexFile = new PartitionedDataFile(stagingStorage, "page-index.dat"); + writerIndexFile = stagingStorage.resolve("page-index.dat"); fwdIndexDocId = new PartitionedDataFile(liveStorage, "fwd-doc-id.dat"); fwdIndexDocData = new PartitionedDataFile(liveStorage, "fwd-doc-data.dat"); @@ -85,7 +85,7 @@ public class IndexServicesFactory { public boolean isPreconvertedIndexPresent() { return Stream.of( - writerIndexFile.get(LIVE_PART).toPath() + writerIndexFile ).allMatch(Files::exists); } @@ -100,10 +100,6 @@ public class IndexServicesFactory { ).noneMatch(Files::exists); } - public IndexJournalWriter createIndexJournalWriter(KeywordLexicon lexicon) throws IOException { - return new IndexJournalWriterImpl(lexicon, writerIndexFile.get(LIVE_PART).toPath()); - } - public void convertIndex(DomainRankings domainRankings) throws IOException { convertForwardIndex(domainRankings); convertFullReverseIndex(domainRankings); @@ -111,11 +107,9 @@ public class IndexServicesFactory { } private void convertFullReverseIndex(DomainRankings domainRankings) throws IOException { - var source = writerIndexFile.get(0).toPath(); + logger.info("Converting full reverse index {}", writerIndexFile); - logger.info("Converting full reverse index {}", source); - - var journalReader = new IndexJournalReaderSingleCompressedFile(source); + var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile); var converter = new ReverseIndexFullConverter(tmpFileDir, journalReader, domainRankings, @@ -129,11 +123,9 @@ public class IndexServicesFactory { private void convertPriorityReverseIndex(DomainRankings domainRankings) throws IOException { - var source = writerIndexFile.get(0).toPath(); + logger.info("Converting priority reverse index {}", writerIndexFile); - logger.info("Converting priority reverse index {}", source); - - var journalReader = new IndexJournalReaderSingleCompressedFile(source, null, + var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile, null, 
ReverseIndexPriorityParameters::filterPriorityRecord); var converter = new ReverseIndexPriorityConverter(tmpFileDir, @@ -149,11 +141,10 @@ public class IndexServicesFactory { private void convertForwardIndex(DomainRankings domainRankings) throws IOException { - var source = writerIndexFile.get(0); - logger.info("Converting forward index data {}", source); + logger.info("Converting forward index data {}", writerIndexFile); - new ForwardIndexConverter(source, + new ForwardIndexConverter(writerIndexFile.toFile(), fwdIndexDocId.get(NEXT_PART).toPath(), fwdIndexDocData.get(NEXT_PART).toPath(), domainRankings) diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java index 31192d37..22e514d8 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java @@ -9,7 +9,6 @@ import spark.Response; import spark.Spark; import javax.annotation.CheckReturnValue; -import java.io.IOException; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantLock; @@ -39,10 +38,13 @@ public class IndexOpsService { return run(searchSetService::recalculateAll); } public boolean reindex() throws Exception { - return run(index::switchIndex).isPresent(); + return run(() -> { + return index.switchIndex() && lexicon.suggestReload(); + }).isPresent(); } + public boolean reloadLexicon() throws Exception { - return run(lexicon::reload).isPresent(); + return run(lexicon::suggestReload).isPresent(); } public Object repartitionEndpoint(Request request, Response response) throws Exception { diff --git a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java 
b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java index 77ea0a2e..997e2a74 100644 --- a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java +++ b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java @@ -6,9 +6,11 @@ import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.index.IndexServicesFactory; import nu.marginalia.index.journal.writer.IndexJournalWriter; +import nu.marginalia.index.journal.writer.IndexJournalWriterImpl; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.KeywordLexiconReadOnlyView; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; +import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.index.svc.searchset.SearchSetAny; import nu.marginalia.index.util.TestUtil; @@ -70,15 +72,19 @@ public class IndexQueryServiceIntegrationTestModule extends AbstractModule { when(setsServiceMock.getDomainRankings()).thenReturn(new DomainRankings()); bind(IndexSearchSetsService.class).toInstance(setsServiceMock); - var keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(slowDir.resolve("dictionary.dat").toFile())); + var keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal( + slowDir.resolve("dictionary.dat").toFile(), + KeywordLexiconJournalMode.READ_WRITE) + ); bind(KeywordLexicon.class).toInstance(keywordLexicon); bind(KeywordLexiconReadOnlyView.class).toInstance(new KeywordLexiconReadOnlyView(keywordLexicon)); - bind(IndexJournalWriter.class).toInstance(servicesFactory.createIndexJournalWriter(keywordLexicon)); - bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class)); bind(ServiceHeartbeat.class).toInstance(Mockito.mock(ServiceHeartbeat.class)); + 
bind(IndexJournalWriter.class).toInstance(new IndexJournalWriterImpl(keywordLexicon, + slowDir.resolve("page-index.dat"))); + bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration( ServiceId.Index, 0, diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index a3db382b..2c04fe6f 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -3,10 +3,7 @@ package nu.marginalia.control; import com.google.gson.Gson; import com.google.inject.Inject; import nu.marginalia.client.ServiceMonitors; -import nu.marginalia.control.model.ControlProcess; -import nu.marginalia.control.fsm.ControlFSMs; import nu.marginalia.control.svc.*; -import nu.marginalia.db.storage.model.FileStorageId; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.RendererFactory; @@ -26,11 +23,12 @@ public class ControlService extends Service { private final Gson gson = GsonFactory.get(); private final ServiceMonitors monitors; - private final MustacheRenderer indexRenderer; - private final MustacheRenderer> servicesRenderer; - private final MustacheRenderer> processesRenderer; - private final MustacheRenderer> storageRenderer; + private final HeartbeatService heartbeatService; + private final EventLogService eventLogService; + private final ControlFsmService controlFsmService; private final StaticResources staticResources; + private final MessageQueueViewService messageQueueViewService; + private final ControlFileStorageService controlFileStorageService; @Inject @@ -39,7 +37,7 @@ public class ControlService extends Service { HeartbeatService heartbeatService, EventLogService eventLogService, RendererFactory 
rendererFactory, - ControlFSMs controlFSMs, + ControlFsmService controlFsmService, StaticResources staticResources, MessageQueueViewService messageQueueViewService, ControlFileStorageService controlFileStorageService @@ -47,13 +45,20 @@ public class ControlService extends Service { super(params); this.monitors = monitors; + this.heartbeatService = heartbeatService; + this.eventLogService = eventLogService; - indexRenderer = rendererFactory.renderer("control/index"); - servicesRenderer = rendererFactory.renderer("control/services"); - processesRenderer = rendererFactory.renderer("control/processes"); - storageRenderer = rendererFactory.renderer("control/storage"); + var indexRenderer = rendererFactory.renderer("control/index"); + var servicesRenderer = rendererFactory.renderer("control/services"); + var serviceByIdRenderer = rendererFactory.renderer("control/service-by-id"); + var processesRenderer = rendererFactory.renderer("control/processes"); + var storageRenderer = rendererFactory.renderer("control/storage"); + + this.controlFsmService = controlFsmService; this.staticResources = staticResources; + this.messageQueueViewService = messageQueueViewService; + this.controlFileStorageService = controlFileStorageService; Spark.get("/public/heartbeats", (req, res) -> { res.type("application/json"); @@ -62,45 +67,21 @@ public class ControlService extends Service { Spark.get("/public/", (req, rsp) -> indexRenderer.render(Map.of())); - Spark.get("/public/services", - (req, rsp) -> Map.of("services", heartbeatService.getServiceHeartbeats(), - "events", eventLogService.getLastEntries(20)), - (map) -> servicesRenderer.render((Map) map)); - - Spark.get("/public/processes", - (req, rsp) -> Map.of("processes", heartbeatService.getProcessHeartbeats(), - "fsms", controlFSMs.getFsmStates(), - "messages", messageQueueViewService.getLastEntries(20)), - (map) -> processesRenderer.render((Map) map)); - - Spark.get("/public/storage", - (req, rsp) -> Map.of("storage", 
controlFileStorageService.getStorageList()), - (map) -> storageRenderer.render((Map) map)); + Spark.get("/public/services", this::servicesModel, servicesRenderer::render); + Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render); + Spark.get("/public/processes", this::processesModel, processesRenderer::render); + Spark.get("/public/storage", this::storageModel, storageRenderer::render); final HtmlRedirect redirectToServices = new HtmlRedirect("/services"); final HtmlRedirect redirectToProcesses = new HtmlRedirect("/processes"); final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage"); - Spark.post("/public/fsms/:fsm/start", (req, rsp) -> { - controlFSMs.start(ControlProcess.valueOf(req.params("fsm").toUpperCase())); - return ""; - }, redirectToProcesses); + Spark.post("/public/fsms/:fsm/start", controlFsmService::startFsm, redirectToProcesses); + Spark.post("/public/fsms/:fsm/stop", controlFsmService::stopFsm, redirectToProcesses); - Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> { - controlFSMs.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase())); - return ""; - }, redirectToProcesses); + Spark.post("/public/storage/:fid/process", controlFsmService::triggerProcessing, redirectToProcesses); + Spark.post("/public/storage/:fid/load", controlFsmService::loadProcessedData, redirectToProcesses); - // TODO: This should be a POST - Spark.get("/public/repartition", (req, rsp) -> { - controlFSMs.start(ControlProcess.REPARTITION_REINDEX); - return ""; - } , redirectToProcesses); - - Spark.post("/public/storage/:fid/process", (req, rsp) -> { - controlFSMs.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(Integer.parseInt(req.params("fid")))); - return ""; - }, redirectToProcesses); Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage); Spark.get("/public/:resource", this::serveStatic); @@ -108,6 +89,28 @@ public class ControlService extends Service { 
monitors.subscribe(this::logMonitorStateChange); } + private Object serviceModel(Request request, Response response) { + String serviceName = request.params("id"); + + return Map.of( + "id", serviceName, + "events", eventLogService.getLastEntriesForService(serviceName, 20)); + } + + private Object storageModel(Request request, Response response) { + return Map.of("storage", controlFileStorageService.getStorageList()); + } + + private Object servicesModel(Request request, Response response) { + return Map.of("services", heartbeatService.getServiceHeartbeats(), + "events", eventLogService.getLastEntries(20)); + } + + private Object processesModel(Request request, Response response) { + return Map.of("processes", heartbeatService.getProcessHeartbeats(), + "fsms", controlFsmService.getFsmStates(), + "messages", messageQueueViewService.getLastEntries(20)); + } private Object serveStatic(Request request, Response response) { String resource = request.params("resource"); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/ControlFSMs.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/ControlFSMs.java index 0c756114..4945c6d5 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/ControlFSMs.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/ControlFSMs.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; @Singleton public class ControlFSMs { @@ -68,33 +69,39 @@ public class ControlFSMs { eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state); } + public void startFrom(ControlProcess process, String state) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state); + } + public void start(ControlProcess process) throws Exception { 
eventLog.logEvent("FSM-START", process.id()); stateMachines.get(process).init(); } + public void startFrom(ControlProcess process, String state, Object arg) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state, gson.toJson(arg)); + } + public void start(ControlProcess process, Object arg) throws Exception { eventLog.logEvent("FSM-START", process.id()); stateMachines.get(process).init(gson.toJson(arg)); } - public List getFsmStates() { - return stateMachines.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> { - - final MachineState state = e.getValue().getState(); - - final String machineName = e.getKey().name(); - final String stateName = state.name(); - final boolean terminal = state.isFinal(); - - return new ControlProcessState(machineName, stateName, terminal); - }).toList(); - } - @SneakyThrows public void stop(ControlProcess fsm) { stateMachines.get(fsm).abortExecution(); } + + public Map getMachineStates() { + return stateMachines.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, e -> e.getValue().getState()) + ); + } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/AbstractProcessSpawnerFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/AbstractProcessSpawnerFSM.java index 75944553..90a704c9 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/AbstractProcessSpawnerFSM.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/AbstractProcessSpawnerFSM.java @@ -58,7 +58,7 @@ public class AbstractProcessSpawnerFSM extends AbstractStateGraph { } } - @GraphState(name = RUN) + @GraphState(name = RUN, resume = ResumeBehavior.RESTART) public void run(Integer attempts) throws Exception { try { processService.trigger(processId); diff --git 
a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java index 781882a5..f3e625a6 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java @@ -23,7 +23,13 @@ import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.search.client.SearchClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -32,23 +38,19 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph { // STATES - private static final String INITIAL = "INITIAL"; - private static final String RECONVERT = "RECONVERT"; - private static final String RECONVERT_WAIT = "RECONVERT_WAIT"; - private static final String LOAD = "LOAD"; - private static final String LOAD_WAIT = "LOAD_WAIT"; - private static final String MOVE_INDEX_FILES = "MOVE_INDEX_FILES"; - private static final String RELOAD_LEXICON = "RELOAD_LEXICON"; - private static final String RELOAD_LEXICON_WAIT = "RELOAD_LEXICON_WAIT"; - private static final String FLUSH_CACHES = "FLUSH_CACHES"; - private static final String END = "END"; + public static final String INITIAL = "INITIAL"; + public static final String RECONVERT = "RECONVERT"; + public static final String RECONVERT_WAIT = "RECONVERT-WAIT"; + public static final String LOAD = "LOAD"; + public static final String LOAD_WAIT = "LOAD-WAIT"; + public static final String SWAP_LEXICON = "SWAP-LEXICON"; + public static final 
String END = "END"; private final ProcessService processService; - private final MqOutbox mqIndexOutbox; - private final MqOutbox mqSearchOutbox; private final MqOutbox mqConverterOutbox; private final MqOutbox mqLoaderOutbox; private final FileStorageService storageService; private final Gson gson; + private final Logger logger = LoggerFactory.getLogger(getClass()); @AllArgsConstructor @With @NoArgsConstructor @@ -62,17 +64,13 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph { @Inject public ReconvertAndLoadFSM(StateFactory stateFactory, ProcessService processService, - IndexClient indexClient, ProcessOutboxFactory processOutboxFactory, - SearchClient searchClient, FileStorageService storageService, Gson gson ) { super(stateFactory); this.processService = processService; - this.mqIndexOutbox = indexClient.outbox(); - this.mqSearchOutbox = searchClient.outbox(); this.mqConverterOutbox = processOutboxFactory.createConverterOutbox(); this.mqLoaderOutbox = processOutboxFactory.createLoaderOutbox(); this.storageService = storageService; @@ -92,8 +90,12 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph { @GraphState(name = RECONVERT, next = RECONVERT_WAIT, resume = ResumeBehavior.ERROR) public Message reconvert(Message message) throws Exception { // Create processed data area + + var toProcess = storageService.getStorage(message.crawlStorageId); + var base = storageService.getStorageBase(FileStorageBaseType.SLOW); - var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data", "Processed Data"); + var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data", + "Processed Data; " + toProcess.description()); // Pre-send convert request var request = new ConvertRequest(message.crawlStorageId, processedArea.id()); @@ -124,7 +126,7 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph { } - @GraphState(name = LOAD_WAIT, next = END, 
resume = ResumeBehavior.RETRY) + @GraphState(name = LOAD_WAIT, next = SWAP_LEXICON, resume = ResumeBehavior.RETRY) public void loadWait(Message message) throws Exception { var rsp = waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId); @@ -132,6 +134,33 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph { error("Loader failed"); } + + + @GraphState(name = SWAP_LEXICON, next = END, resume = ResumeBehavior.RETRY) + public void swapLexicon(Message message) throws Exception { + var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE); + + var staging = storageService.getStorageByType(FileStorageType.LEXICON_STAGING); + var fromSource = staging.asPath().resolve("dictionary.dat"); + var liveDest = live.asPath().resolve("dictionary.dat"); + + // Backup live lexicon + var backupBase = storageService.getStorageBase(FileStorageBaseType.BACKUP); + var backup = storageService.allocateTemporaryStorage(backupBase, FileStorageType.BACKUP, + "lexicon", "Lexicon Backup; " + LocalDateTime.now()); + + Path backupDest = backup.asPath().resolve("dictionary.dat"); + + logger.info("Moving " + liveDest + " to " + backupDest); + Files.move(liveDest, backupDest); + + // Swap in new lexicon + logger.info("Moving " + fromSource + " to " + liveDest); + Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); + } + + + public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception { if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) { error("Process " + processId + " did not launch"); @@ -162,37 +191,4 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph { return false; } -// @GraphState(name = MOVE_INDEX_FILES, next = RELOAD_LEXICON, resume = ResumeBehavior.ERROR) -// public void moveIndexFiles(String crawlJob) throws Exception { -// Path indexData = Path.of("/vol/index.dat"); -// Path indexDest = Path.of("/vol/iw/0/page-index.dat"); 
-// -// if (!Files.exists(indexData)) -// error("Index data not found"); -// -// Files.move(indexData, indexDest, StandardCopyOption.REPLACE_EXISTING); -// } -// -// @GraphState(name = RELOAD_LEXICON, next = RELOAD_LEXICON_WAIT, resume = ResumeBehavior.ERROR) -// public long reloadLexicon() throws Exception { -// return mqIndexOutbox.sendAsync(IndexMqEndpoints.INDEX_RELOAD_LEXICON, ""); -// } -// -// @GraphState(name = RELOAD_LEXICON_WAIT, next = FLUSH_CACHES, resume = ResumeBehavior.RETRY) -// public void reloadLexiconWait(long id) throws Exception { -// var rsp = mqIndexOutbox.waitResponse(id); -// -// if (rsp.state() != MqMessageState.OK) { -// error("RELOAD_LEXICON failed"); -// } -// } -// -// @GraphState(name = FLUSH_CACHES, next = END, resume = ResumeBehavior.RETRY) -// public void flushCaches() throws Exception { -// var rsp = mqSearchOutbox.send(SearchMqEndpoints.FLUSH_CACHES, ""); -// -// if (rsp.state() != MqMessageState.OK) { -// error("FLUSH_CACHES failed"); -// } -// } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/RepartitionReindexFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/RepartitionReindexFSM.java index ed3aad0a..deb72004 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/RepartitionReindexFSM.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/RepartitionReindexFSM.java @@ -18,12 +18,12 @@ public class RepartitionReindexFSM extends AbstractStateGraph { // STATES - private static final String INITIAL = "INITIAL"; - private static final String REPARTITION = "REPARTITION"; - private static final String REPARTITION_REPLY = "REPARTITION-REPLY"; - private static final String REINDEX = "REINDEX"; - private static final String REINDEX_REPLY = "REINDEX-REPLY"; - private static final String END = "END"; + public static final String INITIAL = "INITIAL"; + public 
static final String REPARTITION = "REPARTITION"; + public static final String REPARTITION_WAIT = "REPARTITION-WAIT"; + public static final String REINDEX = "REINDEX"; + public static final String REINDEX_WAIT = "REINDEX-WAIT"; + public static final String END = "END"; @Inject @@ -43,12 +43,12 @@ public class RepartitionReindexFSM extends AbstractStateGraph { } } - @GraphState(name = REPARTITION, next = REPARTITION_REPLY) + @GraphState(name = REPARTITION, next = REPARTITION_WAIT) public Long repartition() throws Exception { return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""); } - @GraphState(name = REPARTITION_REPLY, next = REINDEX, resume = ResumeBehavior.RETRY) + @GraphState(name = REPARTITION_WAIT, next = REINDEX, resume = ResumeBehavior.RETRY) public void repartitionReply(Long id) throws Exception { var rsp = indexOutbox.waitResponse(id); @@ -57,12 +57,12 @@ public class RepartitionReindexFSM extends AbstractStateGraph { } } - @GraphState(name = REINDEX, next = REINDEX_REPLY) + @GraphState(name = REINDEX, next = REINDEX_WAIT) public Long reindex() throws Exception { return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, ""); } - @GraphState(name = REINDEX_REPLY, next = END, resume = ResumeBehavior.RETRY) + @GraphState(name = REINDEX_WAIT, next = END, resume = ResumeBehavior.RETRY) public void reindexReply(Long id) throws Exception { var rsp = indexOutbox.waitResponse(id); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcessState.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcessState.java index 39d69ebd..a7324164 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcessState.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcessState.java @@ -5,6 +5,12 @@ public record ControlProcessState(String name, String state, boolean 
terminal) { if (terminal) { return "\uD83D\uDE34"; } + else if (state.equals("MONITOR")) { + return "\uD83D\uDD26"; + } + else if (state.endsWith("WAIT") || state.endsWith("REPLY")) { + return "\uD83D\uDD59"; + } else { return "\uD83C\uDFC3"; } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java index 927262d2..674e92bc 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java @@ -11,6 +11,7 @@ public record FileStorageWithActions(FileStorage storage) { return storage.type() == FileStorageType.CRAWL_DATA; } public boolean isDeletable() { - return storage.type() == FileStorageType.PROCESSED_DATA; + return storage.type() == FileStorageType.PROCESSED_DATA + || storage.type() == FileStorageType.BACKUP; } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java index e92a2a1a..47640dde 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java @@ -31,7 +31,7 @@ public record ProcessHeartbeat( public String progressStyle() { if ("RUNNING".equals(status) && progress != null) { return """ - background: linear-gradient(90deg, #fff 0%%, #ccc %d%%, #fff %d%%) + background: linear-gradient(90deg, #ccc 0%%, #ccc %d%%, #fff %d%%) """.formatted(progress, progress, progress); } return ""; diff --git 
a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFsmService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFsmService.java new file mode 100644 index 00000000..24e5fd51 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFsmService.java @@ -0,0 +1,72 @@ +package nu.marginalia.control.svc; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.fsm.ControlFSMs; +import nu.marginalia.control.fsm.task.ReconvertAndLoadFSM; +import nu.marginalia.control.model.ControlProcess; +import nu.marginalia.control.model.ControlProcessState; +import nu.marginalia.db.storage.model.FileStorageId; +import nu.marginalia.mqsm.state.MachineState; +import spark.Request; +import spark.Response; + +import java.util.List; +import java.util.Map; + +@Singleton +public class ControlFsmService { + private final ControlFSMs controlFSMs; + + @Inject + public ControlFsmService(ControlFSMs controlFSMs) { + this.controlFSMs = controlFSMs; + } + + public Object startFsm(Request req, Response rsp) throws Exception { + controlFSMs.start( + ControlProcess.valueOf(req.params("fsm").toUpperCase()) + ); + return ""; + } + + public Object stopFsm(Request req, Response rsp) throws Exception { + controlFSMs.stop( + ControlProcess.valueOf(req.params("fsm").toUpperCase()) + ); + return ""; + } + + public Object triggerProcessing(Request request, Response response) throws Exception { + controlFSMs.start( + ControlProcess.RECONVERT_LOAD, + FileStorageId.of(Integer.parseInt(request.params("fid"))) + ); + return ""; + } + + public Object loadProcessedData(Request request, Response response) throws Exception { + var fid = FileStorageId.of(Integer.parseInt(request.params("fid"))); + + // Start the FSM from the intermediate state that triggers the load + controlFSMs.startFrom( + ControlProcess.RECONVERT_LOAD, + 
ReconvertAndLoadFSM.LOAD, + new ReconvertAndLoadFSM.Message(null, fid, 0L, 0L) + ); + + return ""; + } + + public Object getFsmStates() { + return controlFSMs.getMachineStates().entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> { + + final MachineState state = e.getValue(); + final String machineName = e.getKey().name(); + final String stateName = state.name(); + final boolean terminal = state.isFinal(); + + return new ControlProcessState(machineName, stateName, terminal); + }).toList(); + } +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/EventLogService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/EventLogService.java index 8167c71c..d2cd6bcb 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/EventLogService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/EventLogService.java @@ -45,4 +45,66 @@ public class EventLogService { } } + public List getLastEntriesForService(String serviceName, int n) { + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT SERVICE_NAME, INSTANCE, EVENT_TIME, EVENT_TYPE, EVENT_MESSAGE + FROM SERVICE_EVENTLOG + WHERE SERVICE_NAME = ? + ORDER BY ID DESC + LIMIT ? 
+ """)) { + + query.setString(1, serviceName); + query.setInt(2, n); + + List entries = new ArrayList<>(n); + var rs = query.executeQuery(); + while (rs.next()) { + entries.add(new EventLogEntry( + rs.getString("SERVICE_NAME"), + rs.getString("INSTANCE"), + rs.getTimestamp("EVENT_TIME").toLocalDateTime().toLocalTime().toString(), + rs.getString("EVENT_TYPE"), + rs.getString("EVENT_MESSAGE") + )); + } + return entries; + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + + public List getLastEntriesForInstance(String instance, int n) { + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT SERVICE_NAME, INSTANCE, EVENT_TIME, EVENT_TYPE, EVENT_MESSAGE + FROM SERVICE_EVENTLOG + WHERE INSTANCE = ? + ORDER BY ID DESC + LIMIT ? + """)) { + + query.setString(1, instance); + query.setInt(2, n); + + List entries = new ArrayList<>(n); + var rs = query.executeQuery(); + while (rs.next()) { + entries.add(new EventLogEntry( + rs.getString("SERVICE_NAME"), + rs.getString("INSTANCE"), + rs.getTimestamp("EVENT_TIME").toLocalDateTime().toLocalTime().toString(), + rs.getString("EVENT_TYPE"), + rs.getString("EVENT_MESSAGE") + )); + } + return entries; + } + catch (SQLException ex) { + throw new RuntimeException(ex); + } + } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java index e1034921..009ccdc8 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java @@ -5,6 +5,8 @@ import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.BaseServiceParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import 
spark.utils.IOUtils; import javax.inject.Inject; @@ -21,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap; @Singleton public class ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final Marker processMarker = MarkerFactory.getMarker("PROCESS"); + private final ServiceEventLog eventLog; private final Path distPath; @@ -74,9 +78,9 @@ public class ProcessService { while (process.isAlive()) { if (es.ready()) - logger.warn("{}:{}", processId, es.readLine()); + logger.warn(processMarker, es.readLine()); if (os.ready()) - logger.debug("{}:{}", processId, os.readLine()); + logger.info(processMarker, os.readLine()); } return 0 == process.waitFor(); @@ -116,6 +120,9 @@ public class ProcessService { } opts.put("WMSA_HOME", WMSA_HOME); opts.put("JAVA_HOME", System.getenv("JAVA_HOME")); + opts.put("CONVERTER_OPTS", System.getenv("CONVERTER_OPTS")); + opts.put("LOADER_OPTS", System.getenv("LOADER_OPTS")); + opts.put("CRAWLER_OPTS", System.getenv("CRAWLER_OPTS")); return opts.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toArray(String[]::new); } diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/index.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/index.hdb index 71647683..b1034529 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/index.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/index.hdb @@ -3,7 +3,7 @@ Control Service - + {{> control/partials/nav}} diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb index e3f38897..974502d5 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb +++ 
b/code/services-satellite/control-service/src/main/resources/templates/control/partials/nav.hdb @@ -1,8 +1,8 @@ \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/processes-table.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/processes-table.hdb index 47d7dc64..4547e76b 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/partials/processes-table.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/processes-table.hdb @@ -9,14 +9,14 @@ Last Seen (ms) {{#each processes}} - + {{processId}}    {{uuid}} {{status}} - {{#if progress}}{{progress}}%{{/if}} + {{#if progress}}{{progress}}%{{/if}} {{#unless isStopped}}{{lastSeenMillis}}{{/unless}} {{/each}} diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/partials/services-table.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/partials/services-table.hdb index 2137f1fe..5da46a83 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/partials/services-table.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/partials/services-table.hdb @@ -7,7 +7,7 @@ {{#each services}} - {{serviceId}} + {{serviceId}}    {{uuid}} diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb index 7d348be1..114b340d 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/processes.hdb @@ -3,7 +3,7 @@ Control Service - + {{> control/partials/nav}} diff --git 
a/code/services-satellite/control-service/src/main/resources/templates/control/service-by-id.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/service-by-id.hdb new file mode 100644 index 00000000..5b1fe6b4 --- /dev/null +++ b/code/services-satellite/control-service/src/main/resources/templates/control/service-by-id.hdb @@ -0,0 +1,21 @@ + + + + Control Service + + + + + {{> control/partials/nav}} +
+

Services/{{id}}

+ {{> control/partials/events-table }} +
+ + + + \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb index 2c0542b9..2e73dd92 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/services.hdb @@ -3,7 +3,7 @@ Control Service - + {{> control/partials/nav}} diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb index 68410646..1674d6f5 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb @@ -3,12 +3,12 @@ Control Service - + {{> control/partials/nav}}
- +

Storage

{{#each storage}} diff --git a/run/env/service.env b/run/env/service.env index db871699..dfa012b3 100644 --- a/run/env/service.env +++ b/run/env/service.env @@ -1,2 +1,3 @@ WMSA_HOME=run/ -CONTROL_SERVICE_OPTS="-DdistPath=/dist" \ No newline at end of file +CONTROL_SERVICE_OPTS="-DdistPath=/dist" +CONVERTER_OPTS="-ea -Xmx16G -XX:-CompactStrings -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15" \ No newline at end of file