(*) Refactor Control Service and processes

Viktor Lofgren 2023-07-17 21:20:31 +02:00
parent bca4bbb6c8
commit d7ab21fe34
50 changed files with 585 additions and 204 deletions

View File

@ -3,5 +3,6 @@ package nu.marginalia.db.storage.model;
public enum FileStorageBaseType {
SSD_INDEX,
SSD_WORK,
SLOW
SLOW,
BACKUP
}

View File

@ -8,5 +8,6 @@ public enum FileStorageType {
LEXICON_STAGING,
INDEX_LIVE,
LEXICON_LIVE,
BACKUP,
SEARCH_SETS
}

View File

@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS FILE_STORAGE_BASE (
ID BIGINT PRIMARY KEY AUTO_INCREMENT,
NAME VARCHAR(255) NOT NULL UNIQUE,
PATH VARCHAR(255) NOT NULL UNIQUE COMMENT 'The path to the storage base',
TYPE ENUM ('SSD_INDEX', 'SSD_WORK', 'SLOW') NOT NULL,
TYPE ENUM ('SSD_INDEX', 'SSD_WORK', 'SLOW', 'BACKUP') NOT NULL,
MUST_CLEAN BOOLEAN NOT NULL DEFAULT FALSE COMMENT 'If true, the storage must be cleaned after use',
PERMIT_TEMP BOOLEAN NOT NULL DEFAULT FALSE COMMENT 'If true, the storage can be used for temporary files'
)
@ -14,7 +14,7 @@ CREATE TABLE IF NOT EXISTS FILE_STORAGE (
BASE_ID BIGINT NOT NULL,
PATH VARCHAR(255) NOT NULL COMMENT 'The path to the storage relative to the base',
DESCRIPTION VARCHAR(255) NOT NULL,
TYPE ENUM ('CRAWL_SPEC', 'CRAWL_DATA', 'PROCESSED_DATA', 'INDEX_STAGING', 'LEXICON_STAGING', 'INDEX_LIVE', 'LEXICON_LIVE', 'SEARCH_SETS') NOT NULL,
TYPE ENUM ('CRAWL_SPEC', 'CRAWL_DATA', 'PROCESSED_DATA', 'INDEX_STAGING', 'LEXICON_STAGING', 'INDEX_LIVE', 'LEXICON_LIVE', 'SEARCH_SETS', 'BACKUP') NOT NULL,
DO_PURGE BOOLEAN NOT NULL DEFAULT FALSE COMMENT 'If true, the storage may be cleaned',
CREATE_DATE TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
CONSTRAINT CONS UNIQUE (BASE_ID, PATH),

View File

@ -129,6 +129,18 @@ public class StateMachine {
smOutbox.notify(transition.state(), transition.message());
}
/** Initialize the state machine, entering at the given state. */
public void initFrom(String firstState) throws Exception {
var transition = StateTransition.to(firstState);
synchronized (this) {
this.state = allStates.get(transition.state());
notifyAll();
}
smOutbox.notify(transition.state(), transition.message());
}
/** Initialize the state machine. */
public void init(String jsonEncodedArgument) throws Exception {
var transition = StateTransition.to("INITIAL", jsonEncodedArgument);
@ -141,6 +153,18 @@ public class StateMachine {
smOutbox.notify(transition.state(), transition.message());
}
/** Initialize the state machine, entering at the given state with a JSON-encoded argument. */
public void initFrom(String state, String jsonEncodedArgument) throws Exception {
var transition = StateTransition.to(state, jsonEncodedArgument);
synchronized (this) {
this.state = allStates.get(transition.state());
notifyAll();
}
smOutbox.notify(transition.state(), transition.message());
}
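A minimal caller-side sketch of the two new overloads (the state name and message value here are illustrative; ControlFSMs.startFrom() further down in this commit is the real call site, and a StateMachine and Gson instance are assumed to be in scope):

    // Enter at the beginning, as before:
    stateMachine.init();

    // Or skip directly to a named intermediate state, with a JSON-encoded argument:
    stateMachine.initFrom("LOAD", gson.toJson(message));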
/** Resume the state machine from the last known state. */
private void resume() {

View File

@ -93,4 +93,8 @@ public class WorkLog implements AutoCloseable {
logWriter.flush();
logWriter.close();
}
public int countFinishedJobs() {
return finishedJobs.size();
}
}

View File

@ -90,8 +90,8 @@ public class DatabaseModule extends AbstractModule {
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.setMaximumPoolSize(100);
config.setMinimumIdle(10);
config.setMaximumPoolSize(20);
config.setMinimumIdle(2);
config.setMaxLifetime(Duration.ofMinutes(9).toMillis());

View File

@ -4,6 +4,22 @@ appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %style{%-8markerSimpleName}{FG_Cyan} %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %-24t %-20c{1} -- %msg{nolookups}%n
appender.console.filter.process.type = MarkerFilter
appender.console.filter.process.onMismatch=ACCEPT
appender.console.filter.process.onMatch=DENY
appender.console.filter.process.marker=PROCESS
appender.console.filter.http.type = MarkerFilter
appender.console.filter.http.onMismatch=ACCEPT
appender.console.filter.http.onMatch=DENY
appender.console.filter.http.marker=HTTP
appender.processconsole.type = Console
appender.processconsole.name = ProcessLogToConsole
appender.processconsole.layout.type = PatternLayout
appender.processconsole.layout.pattern = %msg{nolookups}%n
appender.processconsole.filter.process.type = MarkerFilter
appender.processconsole.filter.process.onMismatch=DENY
appender.processconsole.filter.process.onMatch=ACCEPT
appender.processconsole.filter.process.marker=PROCESS
appender.rolling.type = RollingFile
appender.rolling.name = RollingFile
appender.rolling.fileName = /var/log/wmsa/wmsa-${sys:service-name}.log
@ -23,6 +39,27 @@ appender.rolling.filter.http.type = MarkerFilter
appender.rolling.filter.http.onMismatch=ACCEPT
appender.rolling.filter.http.onMatch=DENY
appender.rolling.filter.http.marker=HTTP
appender.rolling.filter.process.type = MarkerFilter
appender.rolling.filter.process.onMismatch=ACCEPT
appender.rolling.filter.process.onMatch=DENY
appender.rolling.filter.process.marker=PROCESS
appender.process.type = RollingFile
appender.process.name = ProcessFile
appender.process.fileName = /var/log/wmsa/process.log
appender.process.filePattern = /var/log/wmsa/process-log-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
appender.process.layout.pattern = %msg{nolookups}%n
appender.process.layout.type = PatternLayout
appender.process.policies.type = Policies
appender.process.policies.size.type = SizeBasedTriggeringPolicy
appender.process.policies.size.size=10MB
appender.process.strategy.type = DefaultRolloverStrategy
appender.process.strategy.max = 10
appender.process.filter.process.type = MarkerFilter
appender.process.filter.process.onMismatch=DENY
appender.process.filter.process.onMatch=ACCEPT
appender.process.filter.process.marker=PROCESS
rootLogger.level = info
rootLogger.appenderRef.console.ref = LogToConsole
rootLogger.appenderRef.rolling.ref = RollingFile
rootLogger.appenderRef.processconsole.ref = ProcessLogToConsole
rootLogger.appenderRef.process.ref = ProcessFile
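The MarkerFilter pairs split logging on the PROCESS marker: the ordinary console and rolling appenders DENY marked events, while processconsole and the process file ACCEPT only them. On the emitting side (see the ProcessService change further down), that looks roughly like the following; the class name here is illustrative:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Marker;
    import org.slf4j.MarkerFactory;

    class MarkerDemo {
        private static final Logger logger = LoggerFactory.getLogger(MarkerDemo.class);
        private static final Marker PROCESS = MarkerFactory.getMarker("PROCESS");

        static void demo() {
            logger.info("service message");               // console + rolling file only
            logger.info(PROCESS, "child process output"); // processconsole + process.log only
        }
    }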

View File

@ -5,6 +5,7 @@ import nu.marginalia.dict.OffHeapDictionaryHashMap;
import nu.marginalia.index.journal.model.IndexJournalEntry;
import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
@ -45,7 +46,7 @@ class ForwardIndexConverterTest {
dictionaryFile = Files.createTempFile("tmp", ".dict");
dictionaryFile.toFile().deleteOnExit();
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile()));
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE));
keywordLexicon.getOrInsert("0");
indexFile = Files.createTempFile("tmp", ".idx");

View File

@ -27,7 +27,8 @@ public class IndexJournalWriterImpl implements IndexJournalWriter{
this.lexicon = lexicon;
this.outputFile = outputFile;
var fileStream = Files.newOutputStream(outputFile, StandardOpenOption.CREATE);
var fileStream = Files.newOutputStream(outputFile, StandardOpenOption.CREATE,
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
writeHeaderPlaceholder(fileStream);

View File

@ -8,6 +8,7 @@ import nu.marginalia.index.journal.model.IndexJournalEntry;
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
@ -42,7 +43,7 @@ class ReverseIndexFullConverterTest {
dictionaryFile = Files.createTempFile("tmp", ".dict");
dictionaryFile.toFile().deleteOnExit();
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile()));
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE));
keywordLexicon.getOrInsert("0");
indexFile = Files.createTempFile("tmp", ".idx");

View File

@ -10,6 +10,7 @@ import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile
import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.index.priority.ReverseIndexPriorityParameters;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
@ -52,7 +53,7 @@ class ReverseIndexFullConverterTest2 {
dictionaryFile = Files.createTempFile("tmp", ".dict");
dictionaryFile.toFile().deleteOnExit();
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile()));
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE));
keywordLexicon.getOrInsert("0");
indexFile = Files.createTempFile("tmp", ".idx");

View File

@ -12,6 +12,7 @@ import nu.marginalia.index.priority.ReverseIndexPriorityConverter;
import nu.marginalia.index.priority.ReverseIndexPriorityParameters;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.AfterEach;
@ -52,7 +53,7 @@ class ReverseIndexPriorityConverterTest2 {
dictionaryFile = Files.createTempFile("tmp", ".dict");
dictionaryFile.toFile().deleteOnExit();
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile()));
keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(dictionaryFile.toFile(), KeywordLexiconJournalMode.READ_WRITE));
keywordLexicon.getOrInsert("0");
indexFile = Files.createTempFile("tmp", ".idx");

View File

@ -1,9 +1,19 @@
package nu.marginalia.dict;
/** Backing store for the KeywordLexicon, available in on- and off-heap versions.
* <p>
* The off-heap version is necessary when loading a lexicon that is too large to fit in RAM, due
* to Java's 2GB limit on the size of a single array. It is slower and less optimized than the on-heap version.
* <p>
* The off-heap version is on the precipice of being deprecated and its use is discouraged.
*/
public interface DictionaryMap {
int NO_VALUE = Integer.MIN_VALUE;
static DictionaryMap create() {
// Default to on-heap version
// TODO: Make this configurable
return new OnHeapDictionaryMap();
}
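For context on the 2GB remark: Java arrays are int-indexed, so a single on-heap array caps out at Integer.MAX_VALUE elements. The usual workaround, and presumably the idea behind the off-heap map, is to shard storage across multiple buffers. A generic sketch of that technique, not this repository's implementation:

    import java.nio.ByteBuffer;
    import java.nio.LongBuffer;

    // Illustrative only: a long "array" sharded across direct buffers so the
    // total element count can exceed Integer.MAX_VALUE.
    class ShardedLongArray {
        private static final int SHARD_BITS = 27;             // 2^27 longs = 1 GiB per shard
        private static final int SHARD_SIZE = 1 << SHARD_BITS;
        private final LongBuffer[] shards;

        ShardedLongArray(long size) {
            int n = (int) ((size + SHARD_SIZE - 1) >>> SHARD_BITS);
            shards = new LongBuffer[n];
            for (int i = 0; i < n; i++) {
                shards[i] = ByteBuffer.allocateDirect(SHARD_SIZE * 8).asLongBuffer();
            }
        }

        long get(long i)         { return shards[(int) (i >>> SHARD_BITS)].get((int) (i & (SHARD_SIZE - 1))); }
        void set(long i, long v) {        shards[(int) (i >>> SHARD_BITS)].put((int) (i & (SHARD_SIZE - 1)), v); }
    }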

View File

@ -6,6 +6,7 @@ import io.prometheus.client.Gauge;
import lombok.SneakyThrows;
import nu.marginalia.dict.DictionaryMap;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalFingerprint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -16,6 +17,19 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** The keyword lexicon is used to map keywords to unique numeric IDs.
* This class is used both to construct the lexicon and to read from it.
* <p>
* Readers will want to use the KeywordLexiconReadOnlyView wrapper, as it
* only exposes read-only methods and hides the mutating methods.
* <p>
* Between instances, the lexicon is stored in a journal file, with entries
* recorded exactly in the order they were received by the writer. The journal
* file is then replayed on startup to reconstruct the lexicon, giving each
* term an ID according to the order in which it is loaded. It is therefore
* important that the journal file is not tampered with, as this would
* corrupt the lexicon.
* */
public class KeywordLexicon implements AutoCloseable {
private final DictionaryMap reverseIndex;
@ -30,6 +44,8 @@ public class KeywordLexicon implements AutoCloseable {
.register();
private final KeywordLexiconJournal journal;
private volatile KeywordLexiconJournalFingerprint fingerprint = null;
@SneakyThrows
public KeywordLexicon(KeywordLexiconJournal keywordLexiconJournal) {
@ -42,21 +58,36 @@ public class KeywordLexicon implements AutoCloseable {
logger.error("MULTIPLE LEXICON INSTANCES!");
}
journal.loadFile(bytes -> reverseIndex.put(hashFunction.hashBytes(bytes).padToLong()));
reload();
logger.info("Done creating dictionary writer");
}
public void reload() throws IOException {
logger.info("Reloading dictionary writer");
journal.loadFile(bytes -> reverseIndex.put(hashFunction.hashBytes(bytes).padToLong()));
logger.info("Done reloading dictionary writer");
public boolean needsReload() throws IOException {
var newFingerprint = journal.journalFingerprint();
return !newFingerprint.equals(fingerprint);
}
/** Reload the lexicon from the journal */
public void reload() throws IOException {
var lock = memoryLock.writeLock();
lock.lock();
try {
reverseIndex.clear();
journal.loadFile(bytes -> reverseIndex.put(hashFunction.hashBytes(bytes).padToLong()));
fingerprint = journal.journalFingerprint();
}
finally {
lock.unlock();
}
}
/** Get method that inserts the word into the lexicon if it is not present */
public int getOrInsert(String macroWord) {
return getOrInsert(macroWord.getBytes(StandardCharsets.UTF_8));
}
/** Get method that inserts the word into the lexicon if it is not present */
@SneakyThrows
private int getOrInsert(byte[] bytes) {
if (bytes.length >= Byte.MAX_VALUE) {
@ -96,11 +127,13 @@ public class KeywordLexicon implements AutoCloseable {
}
}
/** Get method that does not modify the lexicon if the word is not present */
public int getReadOnly(String word) {
final byte[] bytes = word.getBytes(StandardCharsets.UTF_8);
return getReadOnly(hashFunction.hashBytes(bytes).padToLong());
}
/** Get method that does not modify the lexicon if the word is not present */
public int getReadOnly(long hashedKey) {
Lock lock = memoryLock.readLock();
try {

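A small usage sketch of the writer/reader split described in the class comment (the file name and class wrapper are illustrative):

    import java.io.File;
    import nu.marginalia.lexicon.KeywordLexicon;
    import nu.marginalia.lexicon.KeywordLexiconReadOnlyView;
    import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
    import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;

    class LexiconDemo {
        static void demo() throws Exception {
            // Writer side: terms receive IDs in insertion order, persisted via the journal.
            var journal = new KeywordLexiconJournal(new File("dictionary.dat"),
                    KeywordLexiconJournalMode.READ_WRITE);
            try (var lexicon = new KeywordLexicon(journal)) {
                int id = lexicon.getOrInsert("marginalia");     // ID assigned on first sight
                assert id == lexicon.getOrInsert("marginalia"); // same ID on repeat insertion
                lexicon.commitToDisk();
            }

            // Reader side: replaying the journal reproduces the same IDs; the
            // read-only view adds caching and the suggestReload() staleness check.
            var view = new KeywordLexiconReadOnlyView(new KeywordLexicon(
                    new KeywordLexiconJournal(new File("dictionary.dat"),
                            KeywordLexiconJournalMode.READ_ONLY)));
        }
    }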
View File

@ -3,13 +3,19 @@ package nu.marginalia.lexicon;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/** A read-only view of a keyword lexicon.
*
* @see KeywordLexicon
* */
public class KeywordLexiconReadOnlyView {
private final KeywordLexicon writer;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Cache<String, Integer> cache = CacheBuilder.newBuilder().maximumSize(10_000).expireAfterAccess(60, TimeUnit.SECONDS).build();
@SneakyThrows
@ -22,8 +28,15 @@ public class KeywordLexiconReadOnlyView {
return cache.get(word, () -> writer.getReadOnly(word));
}
public boolean reload() throws IOException {
writer.reload();
public boolean suggestReload() throws IOException {
if (writer.needsReload()) {
logger.info("Reloading lexicon");
writer.reload();
cache.invalidateAll();
}
else {
logger.info("Foregoing lexicon reload");
}
return true;
}
}

View File

@ -5,35 +5,70 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.function.Consumer;
/** The journal for the keyword lexicon.
* It's used both for writing the lexicon and for reconstructing it for reading later.
*/
public class KeywordLexiconJournal {
private static final boolean noCommit = Boolean.getBoolean("DictionaryJournal.noCommit");
private final KeywordLexiconJournalCommitQueue commitQueue;
private final KeywordLexiconJournalFile journalFile;
private KeywordLexiconJournalFile journalFile;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Thread commitToDiskThread;
private volatile boolean running = true;
private final Path journalFilePath;
public KeywordLexiconJournal(File file) throws IOException {
commitQueue = new KeywordLexiconJournalCommitQueue();
journalFile = new KeywordLexiconJournalFile(file);
/** Create a new journal.
*
* @param file The file to use for the journal.
* @param mode The mode to use for the journal. If READ_ONLY, the journal will be read-only and refuse
* to accept new entries.
*/
public KeywordLexiconJournal(File file, KeywordLexiconJournalMode mode) throws IOException {
journalFilePath = file.toPath();
commitToDiskThread = new Thread(this::commitToDiskRunner, "CommitToDiskThread");
commitToDiskThread.start();
if (mode == KeywordLexiconJournalMode.READ_WRITE) {
commitQueue = new KeywordLexiconJournalCommitQueue();
journalFile = new KeywordLexiconJournalFile(file);
Runtime.getRuntime().addShutdownHook(new Thread(this::commitToDisk));
commitToDiskThread = new Thread(this::commitToDiskRunner, "CommitToDiskThread");
commitToDiskThread.start();
Runtime.getRuntime().addShutdownHook(new Thread(this::commitToDisk));
}
else {
journalFile = new KeywordLexiconJournalFile(file);
commitQueue = null;
commitToDiskThread = null;
}
}
public void enqueue(byte[] word) throws InterruptedException {
if (null == commitQueue)
throw new UnsupportedOperationException("Lexicon journal is read-only");
commitQueue.enqueue(word);
}
public KeywordLexiconJournalFingerprint journalFingerprint() throws IOException {
var attributes = Files.readAttributes(journalFilePath, BasicFileAttributes.class);
long cTime = attributes.creationTime().toMillis();
long mTime = attributes.lastModifiedTime().toMillis();
long size = attributes.size();
return new KeywordLexiconJournalFingerprint(cTime, mTime, size);
}
public void commitToDiskRunner() {
if (noCommit) return;
@ -57,14 +92,23 @@ public class KeywordLexiconJournal {
public void close() throws Exception {
logger.info("Closing Journal");
running = false;
commitToDiskThread.join();
commitToDisk();
journalFile.close();
if (commitToDiskThread != null) {
commitToDiskThread.join();
commitToDisk();
}
if (journalFile != null) {
journalFile.close();
}
}
public void loadFile(Consumer<byte[]> loadJournalEntry) throws IOException {
journalFile.rewind();
if (journalFile != null) {
journalFile.close();
}
journalFile = new KeywordLexiconJournalFile(journalFilePath.toFile());
journalFile.loadFile(loadJournalEntry);
}
}

View File

@ -7,6 +7,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** An in-memory queue for lexicon journal entries, used to improve the performance of
* large bursts of insert operations.
*/
class KeywordLexiconJournalCommitQueue {
private final ArrayList<byte[]> commitQueue = new ArrayList<>(10_000);
private final Logger logger = LoggerFactory.getLogger(getClass());

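The queue internals aren't part of this diff, but the pattern the comment describes is a classic batched writer: producers append under a lock, and the CommitToDiskThread periodically swaps the batch out and flushes it in one write. A generic sketch under those assumptions; the names and initial capacity are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    // Generic batched commit queue, not this repository's implementation.
    class CommitQueue<T> {
        private ArrayList<T> queue = new ArrayList<>(10_000);

        synchronized void enqueue(T item) {
            queue.add(item);
        }

        /** Called by the writer thread: takes the whole batch, leaving an empty queue. */
        synchronized List<T> takeBatch() {
            var batch = queue;
            queue = new ArrayList<>(10_000);
            return batch;
        }
    }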
View File

@ -1,6 +1,5 @@
package nu.marginalia.lexicon.journal;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -0,0 +1,10 @@
package nu.marginalia.lexicon.journal;
/** Contains values used to assess whether the lexicon is in sync with the journal
* or if it has been replaced with a newer version and should be reloaded
* */
public record KeywordLexiconJournalFingerprint(long createdTime,
long mTime,
long sizeBytes)
{
}
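Being a record, it gets field-wise equals() for free, which is all needsReload() above requires (sketch, assuming a KeywordLexiconJournal in scope):

    // A reload is warranted exactly when creation time, mtime or size changed.
    var before = journal.journalFingerprint();
    // ... another process swaps dictionary.dat on disk ...
    var after = journal.journalFingerprint();
    boolean stale = !after.equals(before);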

View File

@ -0,0 +1,6 @@
package nu.marginalia.lexicon.journal;
public enum KeywordLexiconJournalMode {
READ_ONLY,
READ_WRITE
}

View File

@ -2,6 +2,7 @@ package nu.marginalia.lexicon;
import nu.marginalia.dict.OnHeapDictionaryMap;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -23,7 +24,7 @@ public class KeywordLexiconTest {
public void setUp() throws IOException {
journalFile = Files.createTempFile(getClass().getSimpleName(), ".dat");
var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile());
var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile(), KeywordLexiconJournalMode.READ_WRITE);
lexicon = new KeywordLexicon(lexiconJournal);
}
@ -64,7 +65,7 @@ public class KeywordLexiconTest {
int c = lexicon.getOrInsert("ccc");
lexicon.commitToDisk();
var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile());
var lexiconJournal = new KeywordLexiconJournal(journalFile.toFile(), KeywordLexiconJournalMode.READ_WRITE);
try (var anotherLexicon = new KeywordLexicon(lexiconJournal)) {
assertEquals(a, anotherLexicon.getReadOnly("aaa"));
assertEquals(b, anotherLexicon.getReadOnly("bbb"));

View File

@ -109,7 +109,7 @@ public class CrawlPlan {
return WorkLog.iterableMap(crawl.getLogFile(),
entry -> {
if (!idPredicate.test(entry.path())) {
if (!idPredicate.test(entry.id())) {
return Optional.empty();
}

View File

@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
@ -135,7 +136,12 @@ public class ConverterMain {
};
for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id))) {
// Advance the progress bar to the current position if this is a resumption
processedDomains.set(processLog.countFinishedJobs());
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id)))
{
pipe.accept(domain);
}

View File

@ -7,6 +7,7 @@ import com.google.inject.Injector;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.loading.loader.IndexLoadKeywords;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
@ -40,6 +41,7 @@ public class LoaderMain {
private final ProcessHeartbeat heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final IndexLoadKeywords indexLoadKeywords;
private final Gson gson;
private volatile boolean running = true;
@ -65,6 +67,7 @@ public class LoaderMain {
ProcessHeartbeat heartbeat,
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService,
IndexLoadKeywords indexLoadKeywords,
Gson gson
) {
@ -73,6 +76,7 @@ public class LoaderMain {
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.indexLoadKeywords = indexLoadKeywords;
this.gson = gson;
heartbeat.start();
@ -122,6 +126,9 @@ public class LoaderMain {
running = false;
processorThread.join();
instructions.ok();
// This needs to be done in order to have a readable index journal
indexLoadKeywords.close();
}
catch (Exception ex) {
logger.error("Failed to load", ex);

View File

@ -144,12 +144,4 @@ public class Loader implements Interpreter {
sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
}
public void close() {
try {
indexLoadKeywords.close();
}
catch (Exception ex) {
logger.error("Error when closing the index loader", ex);
}
}
}

View File

@ -12,6 +12,7 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
@ -20,6 +21,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.SQLException;
import java.util.Arrays;
@ -38,7 +41,13 @@ public class LoaderIndexJournalWriter {
var lexiconPath = lexiconArea.asPath().resolve("dictionary.dat");
var indexPath = indexArea.asPath().resolve("page-index.dat");
lexicon = new KeywordLexicon(new KeywordLexiconJournal(lexiconPath.toFile()));
Files.deleteIfExists(lexiconPath);
Files.deleteIfExists(indexPath);
Files.createFile(indexPath, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
Files.createFile(lexiconPath, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
lexicon = new KeywordLexicon(new KeywordLexiconJournal(lexiconPath.toFile(), KeywordLexiconJournalMode.READ_WRITE));
indexWriter = new IndexJournalWriterImpl(lexicon, indexPath);
}

View File

@ -11,6 +11,7 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.KeywordLexiconReadOnlyView;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.service.control.ServiceEventLog;
import java.nio.file.Path;
@ -32,7 +33,7 @@ public class IndexModule extends AbstractModule {
var area = fileStorageService.getStorageByType(FileStorageType.LEXICON_LIVE);
var path = area.asPath().resolve("dictionary.dat");
return new KeywordLexiconReadOnlyView(new KeywordLexicon(new KeywordLexiconJournal(path.toFile())));
return new KeywordLexiconReadOnlyView(new KeywordLexicon(new KeywordLexiconJournal(path.toFile(), KeywordLexiconJournalMode.READ_ONLY)));
}
finally {
eventLog.logEvent("INDEX-LEXICON-LOAD-OK", "");

View File

@ -38,7 +38,7 @@ public class IndexServicesFactory {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final PartitionedDataFile writerIndexFile;
private final Path writerIndexFile;
private final PartitionedDataFile fwdIndexDocId;
private final PartitionedDataFile fwdIndexDocData;
@ -67,7 +67,7 @@ public class IndexServicesFactory {
Files.createDirectories(tmpFileDir);
}
writerIndexFile = new PartitionedDataFile(stagingStorage, "page-index.dat");
writerIndexFile = stagingStorage.resolve("page-index.dat");
fwdIndexDocId = new PartitionedDataFile(liveStorage, "fwd-doc-id.dat");
fwdIndexDocData = new PartitionedDataFile(liveStorage, "fwd-doc-data.dat");
@ -85,7 +85,7 @@ public class IndexServicesFactory {
public boolean isPreconvertedIndexPresent() {
return Stream.of(
writerIndexFile.get(LIVE_PART).toPath()
writerIndexFile
).allMatch(Files::exists);
}
@ -100,10 +100,6 @@ public class IndexServicesFactory {
).noneMatch(Files::exists);
}
public IndexJournalWriter createIndexJournalWriter(KeywordLexicon lexicon) throws IOException {
return new IndexJournalWriterImpl(lexicon, writerIndexFile.get(LIVE_PART).toPath());
}
public void convertIndex(DomainRankings domainRankings) throws IOException {
convertForwardIndex(domainRankings);
convertFullReverseIndex(domainRankings);
@ -111,11 +107,9 @@ public class IndexServicesFactory {
}
private void convertFullReverseIndex(DomainRankings domainRankings) throws IOException {
var source = writerIndexFile.get(0).toPath();
logger.info("Converting full reverse index {}", writerIndexFile);
logger.info("Converting full reverse index {}", source);
var journalReader = new IndexJournalReaderSingleCompressedFile(source);
var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile);
var converter = new ReverseIndexFullConverter(tmpFileDir,
journalReader,
domainRankings,
@ -129,11 +123,9 @@ public class IndexServicesFactory {
private void convertPriorityReverseIndex(DomainRankings domainRankings) throws IOException {
var source = writerIndexFile.get(0).toPath();
logger.info("Converting priority reverse index {}", writerIndexFile);
logger.info("Converting priority reverse index {}", source);
var journalReader = new IndexJournalReaderSingleCompressedFile(source, null,
var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile, null,
ReverseIndexPriorityParameters::filterPriorityRecord);
var converter = new ReverseIndexPriorityConverter(tmpFileDir,
@ -149,11 +141,10 @@ public class IndexServicesFactory {
private void convertForwardIndex(DomainRankings domainRankings) throws IOException {
var source = writerIndexFile.get(0);
logger.info("Converting forward index data {}", source);
logger.info("Converting forward index data {}", writerIndexFile);
new ForwardIndexConverter(source,
new ForwardIndexConverter(writerIndexFile.toFile(),
fwdIndexDocId.get(NEXT_PART).toPath(),
fwdIndexDocData.get(NEXT_PART).toPath(),
domainRankings)

View File

@ -9,7 +9,6 @@ import spark.Response;
import spark.Spark;
import javax.annotation.CheckReturnValue;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock;
@ -39,10 +38,13 @@ public class IndexOpsService {
return run(searchSetService::recalculateAll);
}
public boolean reindex() throws Exception {
return run(index::switchIndex).isPresent();
return run(() -> {
return index.switchIndex() && lexicon.suggestReload();
}).isPresent();
}
public boolean reloadLexicon() throws Exception {
return run(lexicon::reload).isPresent();
return run(lexicon::suggestReload).isPresent();
}
public Object repartitionEndpoint(Request request, Response response) throws Exception {

View File

@ -6,9 +6,11 @@ import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.IndexServicesFactory;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.KeywordLexiconReadOnlyView;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.index.svc.searchset.SearchSetAny;
import nu.marginalia.index.util.TestUtil;
@ -70,15 +72,19 @@ public class IndexQueryServiceIntegrationTestModule extends AbstractModule {
when(setsServiceMock.getDomainRankings()).thenReturn(new DomainRankings());
bind(IndexSearchSetsService.class).toInstance(setsServiceMock);
var keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(slowDir.resolve("dictionary.dat").toFile()));
var keywordLexicon = new KeywordLexicon(new KeywordLexiconJournal(
slowDir.resolve("dictionary.dat").toFile(),
KeywordLexiconJournalMode.READ_WRITE)
);
bind(KeywordLexicon.class).toInstance(keywordLexicon);
bind(KeywordLexiconReadOnlyView.class).toInstance(new KeywordLexiconReadOnlyView(keywordLexicon));
bind(IndexJournalWriter.class).toInstance(servicesFactory.createIndexJournalWriter(keywordLexicon));
bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class));
bind(ServiceHeartbeat.class).toInstance(Mockito.mock(ServiceHeartbeat.class));
bind(IndexJournalWriter.class).toInstance(new IndexJournalWriterImpl(keywordLexicon,
slowDir.resolve("page-index.dat")));
bind(ServiceConfiguration.class).toInstance(new ServiceConfiguration(
ServiceId.Index,
0,

View File

@ -3,10 +3,7 @@ package nu.marginalia.control;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.fsm.ControlFSMs;
import nu.marginalia.control.svc.*;
import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.renderer.MustacheRenderer;
import nu.marginalia.renderer.RendererFactory;
@ -26,11 +23,12 @@ public class ControlService extends Service {
private final Gson gson = GsonFactory.get();
private final ServiceMonitors monitors;
private final MustacheRenderer<Object> indexRenderer;
private final MustacheRenderer<Map<?,?>> servicesRenderer;
private final MustacheRenderer<Map<?,?>> processesRenderer;
private final MustacheRenderer<Map<?,?>> storageRenderer;
private final HeartbeatService heartbeatService;
private final EventLogService eventLogService;
private final ControlFsmService controlFsmService;
private final StaticResources staticResources;
private final MessageQueueViewService messageQueueViewService;
private final ControlFileStorageService controlFileStorageService;
@Inject
@ -39,7 +37,7 @@ public class ControlService extends Service {
HeartbeatService heartbeatService,
EventLogService eventLogService,
RendererFactory rendererFactory,
ControlFSMs controlFSMs,
ControlFsmService controlFsmService,
StaticResources staticResources,
MessageQueueViewService messageQueueViewService,
ControlFileStorageService controlFileStorageService
@ -47,13 +45,20 @@ public class ControlService extends Service {
super(params);
this.monitors = monitors;
this.heartbeatService = heartbeatService;
this.eventLogService = eventLogService;
indexRenderer = rendererFactory.renderer("control/index");
servicesRenderer = rendererFactory.renderer("control/services");
processesRenderer = rendererFactory.renderer("control/processes");
storageRenderer = rendererFactory.renderer("control/storage");
var indexRenderer = rendererFactory.renderer("control/index");
var servicesRenderer = rendererFactory.renderer("control/services");
var serviceByIdRenderer = rendererFactory.renderer("control/service-by-id");
var processesRenderer = rendererFactory.renderer("control/processes");
var storageRenderer = rendererFactory.renderer("control/storage");
this.controlFsmService = controlFsmService;
this.staticResources = staticResources;
this.messageQueueViewService = messageQueueViewService;
this.controlFileStorageService = controlFileStorageService;
Spark.get("/public/heartbeats", (req, res) -> {
res.type("application/json");
@ -62,45 +67,21 @@ public class ControlService extends Service {
Spark.get("/public/", (req, rsp) -> indexRenderer.render(Map.of()));
Spark.get("/public/services",
(req, rsp) -> Map.of("services", heartbeatService.getServiceHeartbeats(),
"events", eventLogService.getLastEntries(20)),
(map) -> servicesRenderer.render((Map<?, ?>) map));
Spark.get("/public/processes",
(req, rsp) -> Map.of("processes", heartbeatService.getProcessHeartbeats(),
"fsms", controlFSMs.getFsmStates(),
"messages", messageQueueViewService.getLastEntries(20)),
(map) -> processesRenderer.render((Map<?, ?>) map));
Spark.get("/public/storage",
(req, rsp) -> Map.of("storage", controlFileStorageService.getStorageList()),
(map) -> storageRenderer.render((Map<?, ?>) map));
Spark.get("/public/services", this::servicesModel, servicesRenderer::render);
Spark.get("/public/services/:id", this::serviceModel, serviceByIdRenderer::render);
Spark.get("/public/processes", this::processesModel, processesRenderer::render);
Spark.get("/public/storage", this::storageModel, storageRenderer::render);
final HtmlRedirect redirectToServices = new HtmlRedirect("/services");
final HtmlRedirect redirectToProcesses = new HtmlRedirect("/processes");
final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage");
Spark.post("/public/fsms/:fsm/start", (req, rsp) -> {
controlFSMs.start(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
return "";
}, redirectToProcesses);
Spark.post("/public/fsms/:fsm/start", controlFsmService::startFsm, redirectToProcesses);
Spark.post("/public/fsms/:fsm/stop", controlFsmService::stopFsm, redirectToProcesses);
Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> {
controlFSMs.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
return "";
}, redirectToProcesses);
Spark.post("/public/storage/:fid/process", controlFsmService::triggerProcessing, redirectToProcesses);
Spark.post("/public/storage/:fid/load", controlFsmService::loadProcessedData, redirectToProcesses);
// TODO: This should be a POST
Spark.get("/public/repartition", (req, rsp) -> {
controlFSMs.start(ControlProcess.REPARTITION_REINDEX);
return "";
} , redirectToProcesses);
Spark.post("/public/storage/:fid/process", (req, rsp) -> {
controlFSMs.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(Integer.parseInt(req.params("fid"))));
return "";
}, redirectToProcesses);
Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage);
Spark.get("/public/:resource", this::serveStatic);
@ -108,6 +89,28 @@ public class ControlService extends Service {
monitors.subscribe(this::logMonitorStateChange);
}
private Object serviceModel(Request request, Response response) {
String serviceName = request.params("id");
return Map.of(
"id", serviceName,
"events", eventLogService.getLastEntriesForService(serviceName, 20));
}
private Object storageModel(Request request, Response response) {
return Map.of("storage", controlFileStorageService.getStorageList());
}
private Object servicesModel(Request request, Response response) {
return Map.of("services", heartbeatService.getServiceHeartbeats(),
"events", eventLogService.getLastEntries(20));
}
private Object processesModel(Request request, Response response) {
return Map.of("processes", heartbeatService.getProcessHeartbeats(),
"fsms", controlFsmService.getFsmStates(),
"messages", messageQueueViewService.getLastEntries(20));
}
private Object serveStatic(Request request, Response response) {
String resource = request.params("resource");

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@Singleton
public class ControlFSMs {
@ -68,33 +69,39 @@ public class ControlFSMs {
eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state);
}
public void startFrom(ControlProcess process, String state) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state);
}
public void start(ControlProcess process) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init();
}
public <T> void startFrom(ControlProcess process, String state, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, gson.toJson(arg));
}
public <T> void start(ControlProcess process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
public List<ControlProcessState> getFsmStates() {
return stateMachines.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> {
final MachineState state = e.getValue().getState();
final String machineName = e.getKey().name();
final String stateName = state.name();
final boolean terminal = state.isFinal();
return new ControlProcessState(machineName, stateName, terminal);
}).toList();
}
@SneakyThrows
public void stop(ControlProcess fsm) {
stateMachines.get(fsm).abortExecution();
}
public Map<ControlProcess, MachineState> getMachineStates() {
return stateMachines.entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().getState())
);
}
}

View File

@ -58,7 +58,7 @@ public class AbstractProcessSpawnerFSM extends AbstractStateGraph {
}
}
@GraphState(name = RUN)
@GraphState(name = RUN, resume = ResumeBehavior.RESTART)
public void run(Integer attempts) throws Exception {
try {
processService.trigger(processId);

View File

@ -23,7 +23,13 @@ import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import nu.marginalia.search.client.SearchClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -32,23 +38,19 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
// STATES
private static final String INITIAL = "INITIAL";
private static final String RECONVERT = "RECONVERT";
private static final String RECONVERT_WAIT = "RECONVERT_WAIT";
private static final String LOAD = "LOAD";
private static final String LOAD_WAIT = "LOAD_WAIT";
private static final String MOVE_INDEX_FILES = "MOVE_INDEX_FILES";
private static final String RELOAD_LEXICON = "RELOAD_LEXICON";
private static final String RELOAD_LEXICON_WAIT = "RELOAD_LEXICON_WAIT";
private static final String FLUSH_CACHES = "FLUSH_CACHES";
private static final String END = "END";
public static final String INITIAL = "INITIAL";
public static final String RECONVERT = "RECONVERT";
public static final String RECONVERT_WAIT = "RECONVERT-WAIT";
public static final String LOAD = "LOAD";
public static final String LOAD_WAIT = "LOAD-WAIT";
public static final String SWAP_LEXICON = "SWAP-LEXICON";
public static final String END = "END";
private final ProcessService processService;
private final MqOutbox mqIndexOutbox;
private final MqOutbox mqSearchOutbox;
private final MqOutbox mqConverterOutbox;
private final MqOutbox mqLoaderOutbox;
private final FileStorageService storageService;
private final Gson gson;
private final Logger logger = LoggerFactory.getLogger(getClass());
@AllArgsConstructor @With @NoArgsConstructor
@ -62,17 +64,13 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
@Inject
public ReconvertAndLoadFSM(StateFactory stateFactory,
ProcessService processService,
IndexClient indexClient,
ProcessOutboxFactory processOutboxFactory,
SearchClient searchClient,
FileStorageService storageService,
Gson gson
)
{
super(stateFactory);
this.processService = processService;
this.mqIndexOutbox = indexClient.outbox();
this.mqSearchOutbox = searchClient.outbox();
this.mqConverterOutbox = processOutboxFactory.createConverterOutbox();
this.mqLoaderOutbox = processOutboxFactory.createLoaderOutbox();
this.storageService = storageService;
@ -92,8 +90,12 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
@GraphState(name = RECONVERT, next = RECONVERT_WAIT, resume = ResumeBehavior.ERROR)
public Message reconvert(Message message) throws Exception {
// Create processed data area
var toProcess = storageService.getStorage(message.crawlStorageId);
var base = storageService.getStorageBase(FileStorageBaseType.SLOW);
var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data", "Processed Data");
var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Data; " + toProcess.description());
// Pre-send convert request
var request = new ConvertRequest(message.crawlStorageId, processedArea.id());
@ -124,7 +126,7 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
}
@GraphState(name = LOAD_WAIT, next = END, resume = ResumeBehavior.RETRY)
@GraphState(name = LOAD_WAIT, next = SWAP_LEXICON, resume = ResumeBehavior.RETRY)
public void loadWait(Message message) throws Exception {
var rsp = waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId);
@ -132,6 +134,33 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
error("Loader failed");
}
@GraphState(name = SWAP_LEXICON, next = END, resume = ResumeBehavior.RETRY)
public void swapLexicon(Message message) throws Exception {
var live = storageService.getStorageByType(FileStorageType.LEXICON_LIVE);
var staging = storageService.getStorageByType(FileStorageType.LEXICON_STAGING);
var fromSource = staging.asPath().resolve("dictionary.dat");
var liveDest = live.asPath().resolve("dictionary.dat");
// Backup live lexicon
var backupBase = storageService.getStorageBase(FileStorageBaseType.BACKUP);
var backup = storageService.allocateTemporaryStorage(backupBase, FileStorageType.BACKUP,
"lexicon", "Lexicon Backup; " + LocalDateTime.now());
Path backupDest = backup.asPath().resolve("dictionary.dat");
logger.info("Moving " + liveDest + " to " + backupDest);
Files.move(liveDest, backupDest);
// Swap in new lexicon
logger.info("Moving " + fromSource + " to " + liveDest);
Files.move(fromSource, liveDest, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
}
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long id) throws Exception {
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
error("Process " + processId + " did not launch");
@ -162,37 +191,4 @@ public class ReconvertAndLoadFSM extends AbstractStateGraph {
return false;
}
// @GraphState(name = MOVE_INDEX_FILES, next = RELOAD_LEXICON, resume = ResumeBehavior.ERROR)
// public void moveIndexFiles(String crawlJob) throws Exception {
// Path indexData = Path.of("/vol/index.dat");
// Path indexDest = Path.of("/vol/iw/0/page-index.dat");
//
// if (!Files.exists(indexData))
// error("Index data not found");
//
// Files.move(indexData, indexDest, StandardCopyOption.REPLACE_EXISTING);
// }
//
// @GraphState(name = RELOAD_LEXICON, next = RELOAD_LEXICON_WAIT, resume = ResumeBehavior.ERROR)
// public long reloadLexicon() throws Exception {
// return mqIndexOutbox.sendAsync(IndexMqEndpoints.INDEX_RELOAD_LEXICON, "");
// }
//
// @GraphState(name = RELOAD_LEXICON_WAIT, next = FLUSH_CACHES, resume = ResumeBehavior.RETRY)
// public void reloadLexiconWait(long id) throws Exception {
// var rsp = mqIndexOutbox.waitResponse(id);
//
// if (rsp.state() != MqMessageState.OK) {
// error("RELOAD_LEXICON failed");
// }
// }
//
// @GraphState(name = FLUSH_CACHES, next = END, resume = ResumeBehavior.RETRY)
// public void flushCaches() throws Exception {
// var rsp = mqSearchOutbox.send(SearchMqEndpoints.FLUSH_CACHES, "");
//
// if (rsp.state() != MqMessageState.OK) {
// error("FLUSH_CACHES failed");
// }
// }
}

View File

@ -18,12 +18,12 @@ public class RepartitionReindexFSM extends AbstractStateGraph {
// STATES
private static final String INITIAL = "INITIAL";
private static final String REPARTITION = "REPARTITION";
private static final String REPARTITION_REPLY = "REPARTITION-REPLY";
private static final String REINDEX = "REINDEX";
private static final String REINDEX_REPLY = "REINDEX-REPLY";
private static final String END = "END";
public static final String INITIAL = "INITIAL";
public static final String REPARTITION = "REPARTITION";
public static final String REPARTITION_WAIT = "REPARTITION-WAIT";
public static final String REINDEX = "REINDEX";
public static final String REINDEX_WAIT = "REINDEX-WAIT";
public static final String END = "END";
@Inject
@ -43,12 +43,12 @@ public class RepartitionReindexFSM extends AbstractStateGraph {
}
}
@GraphState(name = REPARTITION, next = REPARTITION_REPLY)
@GraphState(name = REPARTITION, next = REPARTITION_WAIT)
public Long repartition() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "");
}
@GraphState(name = REPARTITION_REPLY, next = REINDEX, resume = ResumeBehavior.RETRY)
@GraphState(name = REPARTITION_WAIT, next = REINDEX, resume = ResumeBehavior.RETRY)
public void repartitionReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
@ -57,12 +57,12 @@ public class RepartitionReindexFSM extends AbstractStateGraph {
}
}
@GraphState(name = REINDEX, next = REINDEX_REPLY)
@GraphState(name = REINDEX, next = REINDEX_WAIT)
public Long reindex() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, "");
}
@GraphState(name = REINDEX_REPLY, next = END, resume = ResumeBehavior.RETRY)
@GraphState(name = REINDEX_WAIT, next = END, resume = ResumeBehavior.RETRY)
public void reindexReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);

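After the rename, the graph reads INITIAL → REPARTITION → REPARTITION-WAIT → REINDEX → REINDEX-WAIT → END, with both WAIT states marked ResumeBehavior.RETRY, presumably so a restarted control service re-enters the wait rather than re-sending the request.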
View File

@ -5,6 +5,12 @@ public record ControlProcessState(String name, String state, boolean terminal) {
if (terminal) {
return "\uD83D\uDE34";
}
else if (state.equals("MONITOR")) {
return "\uD83D\uDD26";
}
else if (state.endsWith("WAIT") || state.endsWith("REPLY")) {
return "\uD83D\uDD59";
}
else {
return "\uD83C\uDFC3";
}

View File

@ -11,6 +11,7 @@ public record FileStorageWithActions(FileStorage storage) {
return storage.type() == FileStorageType.CRAWL_DATA;
}
public boolean isDeletable() {
return storage.type() == FileStorageType.PROCESSED_DATA;
return storage.type() == FileStorageType.PROCESSED_DATA
|| storage.type() == FileStorageType.BACKUP;
}
}

View File

@ -31,7 +31,7 @@ public record ProcessHeartbeat(
public String progressStyle() {
if ("RUNNING".equals(status) && progress != null) {
return """
background: linear-gradient(90deg, #fff 0%%, #ccc %d%%, #fff %d%%)
background: linear-gradient(90deg, #ccc 0%%, #ccc %d%%, #fff %d%%)
""".formatted(progress, progress, progress);
}
return "";

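For a concrete value, progress = 40 now formats to background: linear-gradient(90deg, #ccc 0%, #ccc 40%, #fff 40%), a hard-stop bar filling the left 40% of the cell in solid grey, where the old pattern faded from white to grey instead.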
View File

@ -0,0 +1,72 @@
package nu.marginalia.control.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.fsm.ControlFSMs;
import nu.marginalia.control.fsm.task.ReconvertAndLoadFSM;
import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.model.ControlProcessState;
import nu.marginalia.db.storage.model.FileStorageId;
import nu.marginalia.mqsm.state.MachineState;
import spark.Request;
import spark.Response;
import java.util.List;
import java.util.Map;
@Singleton
public class ControlFsmService {
private final ControlFSMs controlFSMs;
@Inject
public ControlFsmService(ControlFSMs controlFSMs) {
this.controlFSMs = controlFSMs;
}
public Object startFsm(Request req, Response rsp) throws Exception {
controlFSMs.start(
ControlProcess.valueOf(req.params("fsm").toUpperCase())
);
return "";
}
public Object stopFsm(Request req, Response rsp) throws Exception {
controlFSMs.stop(
ControlProcess.valueOf(req.params("fsm").toUpperCase())
);
return "";
}
public Object triggerProcessing(Request request, Response response) throws Exception {
controlFSMs.start(
ControlProcess.RECONVERT_LOAD,
FileStorageId.of(Integer.parseInt(request.params("fid")))
);
return "";
}
public Object loadProcessedData(Request request, Response response) throws Exception {
var fid = FileStorageId.of(Integer.parseInt(request.params("fid")));
// Start the FSM from the intermediate state that triggers the load
controlFSMs.startFrom(
ControlProcess.RECONVERT_LOAD,
ReconvertAndLoadFSM.LOAD,
new ReconvertAndLoadFSM.Message(null, fid, 0L, 0L)
);
return "";
}
public Object getFsmStates() {
return controlFSMs.getMachineStates().entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e -> {
final MachineState state = e.getValue();
final String machineName = e.getKey().name();
final String stateName = state.name();
final boolean terminal = state.isFinal();
return new ControlProcessState(machineName, stateName, terminal);
}).toList();
}
}

View File

@ -45,4 +45,66 @@ public class EventLogService {
}
}
public List<EventLogEntry> getLastEntriesForService(String serviceName, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT SERVICE_NAME, INSTANCE, EVENT_TIME, EVENT_TYPE, EVENT_MESSAGE
FROM SERVICE_EVENTLOG
WHERE SERVICE_NAME = ?
ORDER BY ID DESC
LIMIT ?
""")) {
query.setString(1, serviceName);
query.setInt(2, n);
List<EventLogEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery();
while (rs.next()) {
entries.add(new EventLogEntry(
rs.getString("SERVICE_NAME"),
rs.getString("INSTANCE"),
rs.getTimestamp("EVENT_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getString("EVENT_TYPE"),
rs.getString("EVENT_MESSAGE")
));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
public List<EventLogEntry> getLastEntriesForInstance(String instance, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT SERVICE_NAME, INSTANCE, EVENT_TIME, EVENT_TYPE, EVENT_MESSAGE
FROM SERVICE_EVENTLOG
WHERE INSTANCE = ?
ORDER BY ID DESC
LIMIT ?
""")) {
query.setString(1, instance);
query.setInt(2, n);
List<EventLogEntry> entries = new ArrayList<>(n);
var rs = query.executeQuery();
while (rs.next()) {
entries.add(new EventLogEntry(
rs.getString("SERVICE_NAME"),
rs.getString("INSTANCE"),
rs.getTimestamp("EVENT_TIME").toLocalDateTime().toLocalTime().toString(),
rs.getString("EVENT_TYPE"),
rs.getString("EVENT_MESSAGE")
));
}
return entries;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
}

View File

@ -5,6 +5,8 @@ import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import spark.utils.IOUtils;
import javax.inject.Inject;
@ -21,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Singleton
public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Marker processMarker = MarkerFactory.getMarker("PROCESS");
private final ServiceEventLog eventLog;
private final Path distPath;
@ -74,9 +78,9 @@ public class ProcessService {
while (process.isAlive()) {
if (es.ready())
logger.warn("{}:{}", processId, es.readLine());
logger.warn(processMarker, es.readLine());
if (os.ready())
logger.debug("{}:{}", processId, os.readLine());
logger.info(processMarker, os.readLine());
}
return 0 == process.waitFor();
@ -116,6 +120,9 @@ public class ProcessService {
}
opts.put("WMSA_HOME", WMSA_HOME);
opts.put("JAVA_HOME", System.getenv("JAVA_HOME"));
opts.put("CONVERTER_OPTS", System.getenv("CONVERTER_OPTS"));
opts.put("LOADER_OPTS", System.getenv("LOADER_OPTS"));
opts.put("CRAWLER_OPTS", System.getenv("CRAWLER_OPTS"));
return opts.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toArray(String[]::new);
}

View File

@ -3,7 +3,7 @@
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<link rel="stylesheet" href="/style.css" />
</head>
<body>
{{> control/partials/nav}}

View File

@ -1,8 +1,8 @@
<nav>
<ul>
<li><a href="/">Overview</a></li>
<li><a href="services">Services</a></li>
<li><a href="processes">Processes</a></li>
<li><a href="storage">Storage</a></li>
<li><a href="/services">Services</a></li>
<li><a href="/processes">Processes</a></li>
<li><a href="/storage">Storage</a></li>
</ul>
</nav>

View File

@ -9,14 +9,14 @@
<th>Last Seen (ms)</th>
</tr>
{{#each processes}}
<tr class="{{#if isMissing}}missing{{/if}}" style="{{progressStyle}}">
<tr class="{{#if isMissing}}missing{{/if}}">
<td>{{processId}}</td>
<td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>
{{uuid}}
</td>
<td>{{status}}</td>
<td>{{#if progress}}{{progress}}%{{/if}}</td>
<td style="{{progressStyle}}">{{#if progress}}{{progress}}%{{/if}}</td>
<td>{{#unless isStopped}}{{lastSeenMillis}}{{/unless}}</td>
</tr>
{{/each}}

View File

@ -7,7 +7,7 @@
</tr>
{{#each services}}
<tr class="{{#if isMissing}}missing{{/if}} {{#unless alive}}terminated{{/unless}}">
<td>{{serviceId}}</td>
<td><a href="/services/{{serviceId}}">{{serviceId}}</a></td>
<td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>
{{uuid}}

View File

@ -3,7 +3,7 @@
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<link rel="stylesheet" href="/style.css" />
</head>
<body>
{{> control/partials/nav}}

View File

@ -0,0 +1,21 @@
<!DOCTYPE html>
<html>
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="/style.css" />
</head>
<body>
{{> control/partials/nav}}
<section>
<h1>Services/{{id}}</h1>
{{> control/partials/events-table }}
</section>
</body>
<script src="/refresh.js"></script>
<script>
window.setInterval(() => {
refresh(["services", "events"]);
}, 5000);
</script>
</html>

View File

@ -3,7 +3,7 @@
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<link rel="stylesheet" href="/style.css" />
</head>
<body>
{{> control/partials/nav}}

View File

@ -3,12 +3,12 @@
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<link rel="stylesheet" href="/style.css" />
</head>
<body>
{{> control/partials/nav}}
<section>
<h1>Storage</h1>
<table>
{{#each storage}}
<tr>

run/env/service.env (vendored)
View File

@ -1,2 +1,3 @@
WMSA_HOME=run/
CONTROL_SERVICE_OPTS="-DdistPath=/dist"
CONVERTER_OPTS="-ea -Xmx16G -XX:-CompactStrings -XX:+UseParallelGC -XX:GCTimeRatio=14 -XX:ParallelGCThreads=15"