(heartbeat) Task heartbeats

This commit is contained in:
Viktor Lofgren 2023-08-04 14:40:06 +02:00
parent 1d0cea1d55
commit 624b78ec3a
22 changed files with 515 additions and 73 deletions

View File

@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS TASK_HEARTBEAT (
TASK_NAME VARCHAR(255) PRIMARY KEY COMMENT "Full name of the task, including node id if applicable, e.g. reconvert:0",
TASK_BASE VARCHAR(255) NOT NULL COMMENT "Base name of the task, e.g. reconvert",
INSTANCE VARCHAR(255) NOT NULL COMMENT "UUID of the task instance",
SERVICE_INSTANCE VARCHAR(255) NOT NULL COMMENT "UUID of the parent service",
STATUS ENUM ('STARTING', 'RUNNING', 'STOPPED') NOT NULL DEFAULT 'STARTING' COMMENT "Status of the task",
PROGRESS INT NOT NULL DEFAULT 0 COMMENT "Progress of the task",
STAGE_NAME VARCHAR(255) DEFAULT "",
HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT "Task was last seen at this point"
);

View File

@ -10,7 +10,8 @@ import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
/** This service sends a heartbeat to the database every 5 seconds.
/** This service sends a heartbeat to the database every 5 seconds,
* updating the control service with the liveness information for the service.
*/
@Singleton
public class ServiceHeartbeat {
@ -18,6 +19,7 @@ public class ServiceHeartbeat {
private final String serviceName;
private final String serviceBase;
private final String instanceUUID;
private final ServiceConfiguration configuration;
private final HikariDataSource dataSource;
@ -32,6 +34,7 @@ public class ServiceHeartbeat {
{
this.serviceName = configuration.serviceName() + ":" + configuration.node();
this.serviceBase = configuration.serviceName();
this.configuration = configuration;
this.dataSource = dataSource;
this.instanceUUID = configuration.instanceUuid().toString();
@ -41,6 +44,11 @@ public class ServiceHeartbeat {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown));
}
public <T extends Enum<T>> ServiceTaskHeartbeat<T> createServiceProcessHeartbeat(Class<T> steps, String processName) {
return new ServiceTaskHeartbeat<>(steps, configuration, processName, dataSource);
}
public void start() {
if (!running) {
runnerThread.start();
@ -142,4 +150,5 @@ public class ServiceHeartbeat {
}
}
}
}

View File

@ -0,0 +1,184 @@
package nu.marginalia.service.control;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/** This object sends a heartbeat to the database every few seconds,
* updating with the progress of a task within a service. Progress is tracked by providing
* enumerations corresponding to the steps in the task. It's important they're arranged in the same
* order as the steps in the task in order to get an accurate progress tracking.
*/
public class ServiceTaskHeartbeat<T extends Enum<T>> implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(ServiceTaskHeartbeat.class);
private final String taskName;
private final String taskBase;
private final String instanceUUID;
private final HikariDataSource dataSource;
private final Thread runnerThread;
private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 1);
private final String serviceInstanceUUID;
private final int stepCount;
private volatile boolean running = false;
private volatile int stepNum = 0;
private volatile String step = "-";
ServiceTaskHeartbeat(Class<T> stepClass,
ServiceConfiguration configuration,
String taskName,
HikariDataSource dataSource)
{
this.taskName = configuration.serviceName() + "." + taskName + ":" + configuration.node();
this.taskBase = configuration.serviceName() + "." + taskName;
this.dataSource = dataSource;
this.instanceUUID = UUID.randomUUID().toString();
this.serviceInstanceUUID = configuration.instanceUuid().toString();
this.stepCount = stepClass.getEnumConstants().length;
runnerThread = new Thread(this::run);
runnerThread.start();
}
/** Update the progress of the task. This is a fast function that doesn't block;
* the actual update is done in a separate thread.
*
* @param step The current step in the task.
*/
public void progress(T step) {
this.step = step.name();
// off by one since we calculate the progress based on the number of steps,
// and Enum.ordinal() is zero-based (so the 5th step in a 5 step task is 4, not 5; resulting in the
// final progress being 80% and not 100%)
this.stepNum = 1 + step.ordinal();
logger.info("ServiceTask {} progress: {}", taskBase, step.name());
}
public void shutDown() {
if (!running)
return;
running = false;
try {
runnerThread.join();
heartbeatStop();
}
catch (InterruptedException|SQLException ex) {
logger.warn("ServiceHeartbeat shutdown failed", ex);
}
}
private void run() {
if (!running)
running = true;
else
return;
try {
heartbeatInit();
while (running) {
try {
heartbeatUpdate();
}
catch (SQLException ex) {
logger.warn("ServiceHeartbeat failed to update", ex);
}
TimeUnit.SECONDS.sleep(heartbeatInterval);
}
}
catch (InterruptedException|SQLException ex) {
logger.error("ServiceHeartbeat caught irrecoverable exception, killing service", ex);
System.exit(255);
}
}
private void heartbeatInit() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
INSERT INTO TASK_HEARTBEAT (TASK_NAME, TASK_BASE, INSTANCE, SERVICE_INSTANCE, HEARTBEAT_TIME, STATUS)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP(6), 'STARTING')
ON DUPLICATE KEY UPDATE
INSTANCE = ?,
SERVICE_INSTANCE = ?,
HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS = 'STARTING'
"""
))
{
stmt.setString(1, taskName);
stmt.setString(2, taskBase);
stmt.setString(3, instanceUUID);
stmt.setString(4, serviceInstanceUUID);
stmt.setString(5, instanceUUID);
stmt.setString(6, serviceInstanceUUID);
stmt.executeUpdate();
}
}
}
private void heartbeatUpdate() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE TASK_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS = 'RUNNING',
PROGRESS = ?,
STAGE_NAME = ?
WHERE INSTANCE = ?
""")
)
{
stmt.setInt(1, (int) Math.round(100 * stepNum / (double) stepCount));
stmt.setString(2, step);
stmt.setString(3, instanceUUID);
stmt.executeUpdate();
}
}
}
private void heartbeatStop() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE TASK_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS='STOPPED',
PROGRESS = ?,
STAGE_NAME = ?
WHERE INSTANCE = ?
""")
)
{
stmt.setInt(1, (int) Math.round(100 * stepNum / (double) stepCount));
stmt.setString( 2, step);
stmt.setString( 3, instanceUUID);
stmt.executeUpdate();
}
}
}
@Override
public void close() {
shutDown();
}
}

View File

@ -18,6 +18,7 @@ dependencies {
implementation project(':code:features-index:index-journal')
implementation project(':code:features-index:lexicon')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation project(':third-party:uppend')

View File

@ -7,6 +7,7 @@ import nu.marginalia.array.LongArray;
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.service.control.ServiceHeartbeat;
import org.roaringbitmap.IntConsumer;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@ -19,6 +20,7 @@ import java.nio.file.Path;
public class ForwardIndexConverter {
private final ServiceHeartbeat heartbeat;
private final File inputFile;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -28,18 +30,27 @@ public class ForwardIndexConverter {
private final DomainRankings domainRankings;
public ForwardIndexConverter(
public ForwardIndexConverter(ServiceHeartbeat heartbeat,
File inputFile,
Path outputFileDocsId,
Path outputFileDocsData,
DomainRankings domainRankings
) {
this.heartbeat = heartbeat;
this.inputFile = inputFile;
this.outputFileDocsId = outputFileDocsId;
this.outputFileDocsData = outputFileDocsData;
this.domainRankings = domainRankings;
}
public enum TaskSteps {
GET_DOC_IDS,
GATHER_OFFSETS,
SUPPLEMENTAL_INDEXES,
FORCE,
FINISHED
}
public void convert() throws IOException {
deleteOldFiles();
@ -53,18 +64,21 @@ public class ForwardIndexConverter {
logger.info("Domain Rankings size = {}", domainRankings.size());
try {
try (var progress = heartbeat.createServiceProcessHeartbeat(TaskSteps.class, "forwardIndexConverter")) {
progress.progress(TaskSteps.GET_DOC_IDS);
LongArray docsFileId = getDocIds(outputFileDocsId, journalReader);
progress.progress(TaskSteps.GATHER_OFFSETS);
// doc ids -> sorted list of ids
logger.info("Gathering Offsets");
Long2IntOpenHashMap docIdToIdx = new Long2IntOpenHashMap((int) docsFileId.size());
docsFileId.forEach(0, docsFileId.size(), (pos, val) -> docIdToIdx.put(val, (int) pos));
// docIdToIdx -> file offset for id
progress.progress(TaskSteps.SUPPLEMENTAL_INDEXES);
logger.info("Creating Supplementary Indexes");
// docIdToIdx -> file offset for id
LongArray docFileData = LongArray.mmapForWriting(outputFileDocsData, ForwardIndexParameters.ENTRY_SIZE * docsFileId.size());
@ -78,11 +92,15 @@ public class ForwardIndexConverter {
docFileData.set(entryOffset + ForwardIndexParameters.DOMAIN_OFFSET, entry.domainId());
});
progress.progress(TaskSteps.FORCE);
docFileData.force();
docsFileId.force();
docFileData.advice(NativeIO.Advice.DontNeed);
docsFileId.advice(NativeIO.Advice.DontNeed);
progress.progress(TaskSteps.FINISHED);
} catch (IOException ex) {
logger.error("Failed to convert", ex);
throw ex;

View File

@ -20,6 +20,7 @@ dependencies {
implementation project(':code:features-index:index-journal')
implementation project(':code:features-index:lexicon')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation libs.lombok
annotationProcessor libs.lombok

View File

@ -21,11 +21,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import nu.marginalia.service.control.ServiceHeartbeat;
import static nu.marginalia.index.full.ReverseIndexFullParameters.bTreeContext;
public class ReverseIndexFullConverter {
private static final int RWF_BIN_SIZE = 10_000_000;
private final ServiceHeartbeat heartbeat;
private final Path tmpFileDir;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -36,11 +39,13 @@ public class ReverseIndexFullConverter {
private final Path outputFileDocs;
private final SortingContext sortingContext;
public ReverseIndexFullConverter(Path tmpFileDir,
public ReverseIndexFullConverter(ServiceHeartbeat heartbeat,
Path tmpFileDir,
IndexJournalReader journalReader,
DomainRankings domainRankings,
Path outputFileWords,
Path outputFileDocs) {
this.heartbeat = heartbeat;
this.tmpFileDir = tmpFileDir;
this.journalReader = journalReader;
this.domainRankings = domainRankings;
@ -49,6 +54,18 @@ public class ReverseIndexFullConverter {
this.sortingContext = new SortingContext(tmpFileDir, 64_000);
}
public enum TaskSteps {
ACCUMULATE_STATISTICS,
INCREMENT_OFFSETS,
COUNT_OFFSETS,
CREATE_INTERMEDIATE_DOCS,
SORT_INTERMEDIATE_DOCS,
SIZING,
FINALIZING_DOCS,
FORCE,
FINISHED,
}
public void convert() throws IOException {
deleteOldFiles();
@ -57,28 +74,32 @@ public class ReverseIndexFullConverter {
return;
}
final IndexJournalStatistics statistics = journalReader.getStatistics();
final Path intermediateUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat");
try (var progress = heartbeat.createServiceProcessHeartbeat(TaskSteps.class, "reverseIndexFullConverter")) {
progress.progress(TaskSteps.ACCUMULATE_STATISTICS);
try {
final IndexJournalStatistics statistics = journalReader.getStatistics();
final long wordsFileSize = statistics.highestWord() + 1;
progress.progress(TaskSteps.INCREMENT_OFFSETS);
logger.debug("Words file size: {}", wordsFileSize);
// Create a count of how many documents has contains each word
final LongArray wordsOffsets = LongArray.allocate(wordsFileSize);
logger.info("Gathering Offsets");
journalReader.forEachWordId(wordsOffsets::increment);
progress.progress(TaskSteps.COUNT_OFFSETS);
wordsOffsets.transformEach(0, wordsFileSize, new CountToOffsetTransformer(ReverseIndexFullParameters.ENTRY_SIZE));
progress.progress(TaskSteps.CREATE_INTERMEDIATE_DOCS);
// Construct an intermediate representation of the reverse documents index
try (FileChannel intermediateDocChannel =
(FileChannel) Files.newByteChannel(intermediateUrlsFile,
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))
{
logger.info("Creating Intermediate Docs File");
// Construct intermediate index
try (RandomWriteFunnel intermediateDocumentWriteFunnel = new RandomWriteFunnel(tmpFileDir, RWF_BIN_SIZE);
@ -89,8 +110,7 @@ public class ReverseIndexFullConverter {
intermediateDocumentWriteFunnel.write(intermediateDocChannel);
}
intermediateDocChannel.force(false);
logger.info("Sorting Intermediate Docs File");
progress.progress(TaskSteps.SORT_INTERMEDIATE_DOCS);
// Sort each segment of the intermediate file
{
@ -102,28 +122,29 @@ public class ReverseIndexFullConverter {
intermediateDocs.force();
}
logger.info("Sizing");
progress.progress(TaskSteps.SIZING);
IndexSizeEstimator sizeEstimator = new IndexSizeEstimator(
ReverseIndexFullParameters.bTreeContext,
ReverseIndexFullParameters.ENTRY_SIZE);
wordsOffsets.fold(0, 0, wordsOffsets.size(), sizeEstimator);
logger.info("Finalizing Docs File");
progress.progress(TaskSteps.FINALIZING_DOCS);
LongArray finalDocs = LongArray.mmapForWriting(outputFileDocs, sizeEstimator.size);
// Construct the proper reverse index
wordsOffsets.transformEachIO(0, wordsOffsets.size(), new ReverseIndexBTreeTransformer(finalDocs, ReverseIndexFullParameters.ENTRY_SIZE, bTreeContext, intermediateDocChannel));
wordsOffsets.write(outputFileWords);
progress.progress(TaskSteps.FORCE);
// Attempt to clean up before forcing (important disk space preservation)
Files.deleteIfExists(intermediateUrlsFile);
wordsOffsets.force();
finalDocs.force();
logger.info("Done");
progress.progress(TaskSteps.FINISHED);
}
} catch (IOException ex) {

View File

@ -12,6 +12,7 @@ import nu.marginalia.index.journal.model.IndexJournalStatistics;
import nu.marginalia.index.journal.reader.IndexJournalReader;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.rwf.RandomWriteFunnel;
import nu.marginalia.service.control.ServiceHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,9 +22,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import static nu.marginalia.index.priority.ReverseIndexPriorityParameters.bTreeContext;
public class ReverseIndexPriorityConverter {
private static final int RWF_BIN_SIZE = 10_000_000;
private final ServiceHeartbeat heartbeat;
private final Path tmpFileDir;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -34,11 +38,13 @@ public class ReverseIndexPriorityConverter {
private final Path outputFileDocs;
private final SortingContext sortingContext;
public ReverseIndexPriorityConverter(Path tmpFileDir,
public ReverseIndexPriorityConverter(ServiceHeartbeat heartbeat,
Path tmpFileDir,
IndexJournalReader journalReader,
DomainRankings domainRankings,
Path outputFileWords,
Path outputFileDocs) {
this.heartbeat = heartbeat;
this.tmpFileDir = tmpFileDir;
this.journalReader = journalReader;
this.domainRankings = domainRankings;
@ -47,6 +53,18 @@ public class ReverseIndexPriorityConverter {
this.sortingContext = new SortingContext(tmpFileDir, 64_000);
}
public enum TaskSteps {
ACCUMULATE_STATISTICS,
INCREMENT_OFFSETS,
COUNT_OFFSETS,
CREATE_INTERMEDIATE_DOCS,
SORT_INTERMEDIATE_DOCS,
SIZING,
FINALIZING_DOCS,
FORCE,
FINISHED,
}
public void convert() throws IOException {
deleteOldFiles();
@ -55,28 +73,32 @@ public class ReverseIndexPriorityConverter {
return;
}
final IndexJournalStatistics statistics = journalReader.getStatistics();
final Path intermediateUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat");
try (var progress = heartbeat.createServiceProcessHeartbeat(TaskSteps.class, "reverseIndexPriorityConverter")) {
progress.progress(TaskSteps.ACCUMULATE_STATISTICS);
try {
final IndexJournalStatistics statistics = journalReader.getStatistics();
final long wordsFileSize = statistics.highestWord() + 1;
progress.progress(TaskSteps.INCREMENT_OFFSETS);
logger.debug("Words file size: {}", wordsFileSize);
// Create a count of how many documents has contains each word
final LongArray wordsOffsets = LongArray.allocate(wordsFileSize);
logger.info("Gathering Offsets");
journalReader.forEachWordId(wordsOffsets::increment);
progress.progress(TaskSteps.COUNT_OFFSETS);
wordsOffsets.transformEach(0, wordsFileSize, new CountToOffsetTransformer(ReverseIndexPriorityParameters.ENTRY_SIZE));
progress.progress(TaskSteps.CREATE_INTERMEDIATE_DOCS);
// Construct an intermediate representation of the reverse documents index
try (FileChannel intermediateDocChannel =
(FileChannel) Files.newByteChannel(intermediateUrlsFile,
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))
{
logger.info("Creating Intermediate Docs File");
// Construct intermediate index
try (RandomWriteFunnel intermediateDocumentWriteFunnel = new RandomWriteFunnel(tmpFileDir, RWF_BIN_SIZE);
@ -87,8 +109,7 @@ public class ReverseIndexPriorityConverter {
intermediateDocumentWriteFunnel.write(intermediateDocChannel);
}
intermediateDocChannel.force(false);
logger.info("Sorting Intermediate Docs File");
progress.progress(TaskSteps.SORT_INTERMEDIATE_DOCS);
// Sort each segment of the intermediate file
{
@ -100,32 +121,29 @@ public class ReverseIndexPriorityConverter {
intermediateDocs.force();
}
progress.progress(TaskSteps.SIZING);
logger.info("Sizing");
IndexSizeEstimator indexSizeEstimator = new IndexSizeEstimator(
ReverseIndexPriorityParameters.bTreeContext,
IndexSizeEstimator sizeEstimator = new IndexSizeEstimator(
bTreeContext,
ReverseIndexPriorityParameters.ENTRY_SIZE);
wordsOffsets.fold(0, 0, wordsOffsets.size(), indexSizeEstimator);
wordsOffsets.fold(0, 0, wordsOffsets.size(), sizeEstimator);
progress.progress(TaskSteps.FINALIZING_DOCS);
logger.info("Finalizing Docs File");
LongArray finalDocs = LongArray.mmapForWriting(outputFileDocs, indexSizeEstimator.size);
LongArray finalDocs = LongArray.mmapForWriting(outputFileDocs, sizeEstimator.size);
// Construct the proper reverse index
wordsOffsets.transformEachIO(0, wordsOffsets.size(),
new ReverseIndexBTreeTransformer(finalDocs,
ReverseIndexPriorityParameters.ENTRY_SIZE,
ReverseIndexPriorityParameters.bTreeContext,
intermediateDocChannel));
wordsOffsets.transformEachIO(0, wordsOffsets.size(), new ReverseIndexBTreeTransformer(finalDocs, ReverseIndexPriorityParameters.ENTRY_SIZE, bTreeContext, intermediateDocChannel));
wordsOffsets.write(outputFileWords);
progress.progress(TaskSteps.FORCE);
// Attempt to clean up before forcing (important disk space preservation)
Files.deleteIfExists(intermediateUrlsFile);
wordsOffsets.force();
finalDocs.force();
logger.info("Done");
progress.progress(TaskSteps.FINISHED);
}
} catch (IOException ex) {

View File

@ -13,9 +13,11 @@ import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -84,7 +86,9 @@ class ReverseIndexFullConverterTest {
var docsFile = dataDir.resolve("docs.dat");
var journalReader = new IndexJournalReaderSingleCompressedFile(indexFile);
new ReverseIndexFullConverter(tmpDir, journalReader, new DomainRankings(), wordsFile, docsFile)
new ReverseIndexFullConverter(
Mockito.mock(ServiceHeartbeat.class),
tmpDir, journalReader, new DomainRankings(), wordsFile, docsFile)
.convert();
var reverseIndexReader = new ReverseIndexFullReader(wordsFile, docsFile);

View File

@ -14,10 +14,12 @@ import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -117,7 +119,7 @@ class ReverseIndexFullConverterTest2 {
Path tmpDir = Path.of("/tmp");
new ReverseIndexFullConverter(tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
new ReverseIndexFullConverter(Mockito.mock(ServiceHeartbeat.class), tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexFullReader(wordsFile, docsFile);
@ -142,7 +144,7 @@ class ReverseIndexFullConverterTest2 {
Path tmpDir = Path.of("/tmp");
new ReverseIndexFullConverter(tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
new ReverseIndexFullConverter(Mockito.mock(ServiceHeartbeat.class), tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexFullReader(wordsFile, docsFile);

View File

@ -14,10 +14,12 @@ import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -117,7 +119,7 @@ class ReverseIndexPriorityConverterTest2 {
Path tmpDir = Path.of("/tmp");
new ReverseIndexPriorityConverter(tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
new ReverseIndexPriorityConverter(Mockito.mock(ServiceHeartbeat.class), tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexPriorityReader(wordsFile, docsFile);
@ -142,7 +144,7 @@ class ReverseIndexPriorityConverterTest2 {
Path tmpDir = Path.of("/tmp");
new ReverseIndexPriorityConverter(tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
new ReverseIndexPriorityConverter(Mockito.mock(ServiceHeartbeat.class), tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexPriorityReader(wordsFile, docsFile);

View File

@ -266,11 +266,11 @@ public class ControlService extends Service {
}
private Object processesModel(Request request, Response response) {
var heartbeatsAll = heartbeatService.getProcessHeartbeats();
var byIsJob = heartbeatsAll.stream().collect(Collectors.partitioningBy(ProcessHeartbeat::isServiceJob));
var processes = heartbeatService.getProcessHeartbeats();
var jobs = heartbeatService.getTaskHeartbeats();
return Map.of("processes", byIsJob.get(false),
"jobs", byIsJob.get(true),
return Map.of("processes", processes,
"jobs", jobs,
"actors", controlActorService.getActorStates(),
"messages", messageQueueViewService.getLastEntries(20));
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.model.ProcessHeartbeat;
import nu.marginalia.control.model.ServiceHeartbeat;
import nu.marginalia.control.svc.HeartbeatService;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.mqsm.StateFactory;
@ -11,6 +12,7 @@ import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Singleton
public class ProcessLivenessMonitorActor extends AbstractStateGraph {
@ -46,12 +48,33 @@ public class ProcessLivenessMonitorActor extends AbstractStateGraph {
public void monitor() throws Exception {
for (;;) {
var processHeartbeats = heartbeatService.getProcessHeartbeats();
for (var heartbeat : heartbeatService.getProcessHeartbeats()) {
if (!heartbeat.isRunning()) {
continue;
}
var processId = heartbeat.getProcessId();
if (null == processId)
continue;
if (processService.isRunning(processId) && heartbeat.lastSeenMillis() < 10000) {
continue;
}
heartbeatService.flagProcessAsStopped(heartbeat);
}
var livingServices = heartbeatService.getServiceHeartbeats().stream()
.filter(ServiceHeartbeat::alive)
.map(ServiceHeartbeat::uuidFull)
.collect(Collectors.toSet());
for (var heartbeat : heartbeatService.getTaskHeartbeats()) {
if (!livingServices.contains(heartbeat.serviceUuuidFull())) {
heartbeatService.removeTaskHeartbeat(heartbeat);
}
}
processHeartbeats.stream()
.filter(ProcessHeartbeat::isRunning)
.filter(p -> !processService.isRunning(p.getProcessId()))
.forEach(heartbeatService::flagProcessAsStopped);
TimeUnit.SECONDS.sleep(60);
}

View File

@ -44,7 +44,17 @@ public record ProcessHeartbeat(
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "crawl-job-extractor" -> ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR;
default -> throw new RuntimeException("Unknown process base: " + processBase);
default -> null;
};
}
public String displayName() {
var pid = getProcessId();
if (pid != null) {
return pid.name();
}
else {
return processBase;
}
}
}

View File

@ -0,0 +1,29 @@
package nu.marginalia.control.model;
public record TaskHeartbeat(
String taskName,
String taskBase,
String serviceUuuidFull,
double lastSeenMillis,
Integer progress,
String stage,
String status
) {
public boolean isStopped() {
return "STOPPED".equals(status);
}
public boolean isRunning() {
return "RUNNING".equals(status);
}
public String progressStyle() {
if ("RUNNING".equals(status) && progress != null) {
return """
background: linear-gradient(90deg, #ccc 0%%, #ccc %d%%, #fff %d%%)
""".formatted(progress, progress, progress);
}
return "";
}
}

View File

@ -5,6 +5,7 @@ import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.control.model.ProcessHeartbeat;
import nu.marginalia.control.model.ServiceHeartbeat;
import nu.marginalia.control.model.TaskHeartbeat;
import nu.marginalia.service.control.ServiceEventLog;
import java.sql.SQLException;
@ -51,6 +52,49 @@ public class HeartbeatService {
return heartbeats;
}
public List<TaskHeartbeat> getTaskHeartbeats() {
List<TaskHeartbeat> heartbeats = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT TASK_NAME, TASK_BASE, SERVICE_INSTANCE, STATUS, STAGE_NAME, PROGRESS, TIMESTAMPDIFF(MICROSECOND, TASK_HEARTBEAT.HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) AS TSDIFF
FROM TASK_HEARTBEAT
INNER JOIN SERVICE_HEARTBEAT ON SERVICE_HEARTBEAT.`INSTANCE` = SERVICE_INSTANCE
""")) {
var rs = stmt.executeQuery();
while (rs.next()) {
int progress = rs.getInt("PROGRESS");
heartbeats.add(new TaskHeartbeat(
rs.getString("TASK_NAME"),
rs.getString("TASK_BASE"),
rs.getString("SERVICE_INSTANCE"),
rs.getLong("TSDIFF") / 1000.,
progress < 0 ? null : progress,
rs.getString("STAGE_NAME"),
rs.getString("STATUS")
));
}
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
return heartbeats;
}
public void removeTaskHeartbeat(TaskHeartbeat heartbeat) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
DELETE FROM TASK_HEARTBEAT
WHERE SERVICE_INSTANCE = ?
""")) {
stmt.setString(1, heartbeat.serviceUuuidFull());
stmt.executeUpdate();
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
public List<ProcessHeartbeat> getProcessHeartbeats() {
List<ProcessHeartbeat> heartbeats = new ArrayList<>();
@ -99,5 +143,4 @@ public class HeartbeatService {
throw new RuntimeException(ex);
}
}
}

View File

@ -32,8 +32,7 @@ public class ProcessService {
CONVERTER("converter-process/bin/converter-process"),
LOADER("loader-process/bin/loader-process"),
ADJACENCIES_CALCULATOR("website-adjacencies-calculator/bin/website-adjacencies-calculator"),
CRAWL_JOB_EXTRACTOR("crawl-job-extractor-process/bin/crawl-job-extractor-process"),
CRAWL_JOB_EXTRACTOR("crawl-job-extractor-process/bin/crawl-job-extractor-process")
;
public final String path;

View File

@ -16,7 +16,7 @@
<script src="/refresh.js"></script>
<script>
window.setInterval(() => {
refresh(["processes", "actors", "queue"]);
refresh(["processes", "jobs", "actors", "queue"]);
}, 2000);
</script>
</html>

View File

@ -1,5 +1,6 @@
<h1>Processes</h1>
<table id="processes">
<tr>
<th>Process ID</th>
@ -10,7 +11,7 @@
</tr>
{{#each processes}}
<tr class="{{#if isMissing}}missing{{/if}}">
<td>{{processId}}</td>
<td>{{displayName}}</td>
<td title="{{uuidFull}}">
<span style="background-color: {{uuidColor}}" class="uuidPip">&nbsp;</span><span style="background-color: {{uuidColor2}}" class="uuidPip">&nbsp;</span>
{{uuid}}
@ -21,3 +22,21 @@
</tr>
{{/each}}
</table>
<h1>Jobs</h1>
<table id="jobs">
<tr>
<th>Process ID</th>
<th>Status</th>
<th>Progress</th>
<th>Last Seen (ms)</th>
</tr>
{{#each jobs}}
<tr class="{{#if isMissing}}missing{{/if}}">
<td>{{taskBase}}</td>
<td>{{status}}</td>
<td style="{{progressStyle}}">{{#if progress}}{{progress}}%{{/if}} {{stage}}</td>
<td>{{#unless isStopped}}{{lastSeenMillis}}{{/unless}}</td>
</tr>
{{/each}}
</table>

View File

@ -15,9 +15,9 @@ import nu.marginalia.index.full.ReverseIndexFullConverter;
import nu.marginalia.index.priority.ReverseIndexPriorityReader;
import nu.marginalia.index.priority.ReverseIndexPriorityParameters;
import nu.marginalia.index.full.ReverseIndexFullReader;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.index.index.SearchIndexReader;
import nu.marginalia.service.control.ServiceHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,6 +33,7 @@ import java.util.stream.Stream;
@Singleton
public class IndexServicesFactory {
private final Path tmpFileDir;
private final ServiceHeartbeat heartbeat;
private final Path liveStorage;
private final Path stagingStorage;
@ -55,8 +56,10 @@ public class IndexServicesFactory {
@Inject
public IndexServicesFactory(
ServiceHeartbeat heartbeat,
FileStorageService fileStorageService
) throws IOException, SQLException {
this.heartbeat = heartbeat;
liveStorage = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE).asPath();
stagingStorage = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING).asPath();
@ -100,17 +103,34 @@ public class IndexServicesFactory {
).noneMatch(Files::exists);
}
enum ConvertSteps {
FORWARD_INDEX,
FULL_REVERSE_INDEX,
PRIORITY_REVERSE_INDEX,
FINISHED
}
public void convertIndex(DomainRankings domainRankings) throws IOException {
try (var hb = heartbeat.createServiceProcessHeartbeat(ConvertSteps.class, "index-conversion")) {
hb.progress(ConvertSteps.FORWARD_INDEX);
convertForwardIndex(domainRankings);
hb.progress(ConvertSteps.FULL_REVERSE_INDEX);
convertFullReverseIndex(domainRankings);
hb.progress(ConvertSteps.PRIORITY_REVERSE_INDEX);
convertPriorityReverseIndex(domainRankings);
hb.progress(ConvertSteps.FINISHED);
}
}
private void convertFullReverseIndex(DomainRankings domainRankings) throws IOException {
logger.info("Converting full reverse index {}", writerIndexFile);
var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile);
var converter = new ReverseIndexFullConverter(tmpFileDir,
var converter = new ReverseIndexFullConverter(
heartbeat,
tmpFileDir,
journalReader,
domainRankings,
revIndexWords.get(NEXT_PART).toPath(),
@ -128,7 +148,8 @@ public class IndexServicesFactory {
var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile, null,
ReverseIndexPriorityParameters::filterPriorityRecord);
var converter = new ReverseIndexPriorityConverter(tmpFileDir,
var converter = new ReverseIndexPriorityConverter(heartbeat,
tmpFileDir,
journalReader,
domainRankings,
revPrioIndexWords.get(NEXT_PART).toPath(),
@ -144,7 +165,8 @@ public class IndexServicesFactory {
logger.info("Converting forward index data {}", writerIndexFile);
new ForwardIndexConverter(writerIndexFile.toFile(),
new ForwardIndexConverter(heartbeat,
writerIndexFile.toFile(),
fwdIndexDocId.get(NEXT_PART).toPath(),
fwdIndexDocData.get(NEXT_PART).toPath(),
domainRankings)

View File

@ -21,6 +21,7 @@ import nu.marginalia.index.config.RankingSettings;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.index.client.model.query.SearchSetIdentifier;
import nu.marginalia.index.db.DbUpdateRanks;
import nu.marginalia.service.control.ServiceHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,6 +31,7 @@ import java.io.IOException;
public class IndexSearchSetsService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final DomainTypes domainTypes;
private final ServiceHeartbeat heartbeat;
private final DbUpdateRanks dbUpdateRanks;
private final RankingDomainFetcher similarityDomains;
private final RankingSettings rankingSettings;
@ -47,12 +49,14 @@ public class IndexSearchSetsService {
@Inject
public IndexSearchSetsService(DomainTypes domainTypes,
ServiceHeartbeat heartbeat,
RankingDomainFetcher rankingDomains,
RankingDomainFetcherForSimilarityData similarityDomains,
RankingSettings rankingSettings,
IndexServicesFactory servicesFactory,
DbUpdateRanks dbUpdateRanks) throws IOException {
this.domainTypes = domainTypes;
this.heartbeat = heartbeat;
this.dbUpdateRanks = dbUpdateRanks;
@ -90,12 +94,34 @@ public class IndexSearchSetsService {
};
}
enum RepartitionSteps {
UPDATE_ACADEMIA,
UPDATE_RETRO,
UPDATE_SMALL_WEB,
UPDATE_BLOGS,
UPDATE_RANKINGS,
FINISHED
}
public void recalculateAll() {
try (var processHeartbeat = heartbeat.createServiceProcessHeartbeat(RepartitionSteps.class, "repartitionAll")) {
processHeartbeat.progress(RepartitionSteps.UPDATE_ACADEMIA);
updateAcademiaDomainsSet();
processHeartbeat.progress(RepartitionSteps.UPDATE_RETRO);
updateRetroDomainsSet();
processHeartbeat.progress(RepartitionSteps.UPDATE_SMALL_WEB);
updateSmallWebDomainsSet();
processHeartbeat.progress(RepartitionSteps.UPDATE_BLOGS);
updateBlogsSet();
processHeartbeat.progress(RepartitionSteps.UPDATE_RANKINGS);
updateDomainRankings();
processHeartbeat.progress(RepartitionSteps.FINISHED);
}
}
private void updateDomainRankings() {

View File

@ -63,6 +63,7 @@ public class IndexQueryServiceIntegrationTestModule extends AbstractModule {
when(fileStorageServiceMock.getStorageByType(FileStorageType.INDEX_STAGING)).thenReturn(new FileStorage(null, null, null, slowDir.toString(), null));
var servicesFactory = new IndexServicesFactory(
Mockito.mock(ServiceHeartbeat.class),
fileStorageServiceMock
);
bind(IndexServicesFactory.class).toInstance(servicesFactory);