(loader) Add heartbeat to update domain-ids step

This commit is contained in:
Viktor Lofgren 2024-07-23 15:14:25 +02:00
parent 2bb9f18411
commit 2ad564404e
2 changed files with 21 additions and 5 deletions

View File

@ -8,8 +8,6 @@ import lombok.Getter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration; import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.linkdb.docs.DocumentDbWriter; import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService; import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService; import nu.marginalia.loading.documents.KeywordLoaderService;
@ -22,7 +20,9 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.inbox.MqInboxResponse; import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -103,7 +103,7 @@ public class LoaderMain extends ProcessMainClass {
void run(LoadRequest instructions) { void run(LoadRequest instructions) {
LoaderInputData inputData = instructions.getInputData(); LoaderInputData inputData = instructions.getInputData();
DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(inputData); DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(heartbeat, inputData);
try { try {
var results = ForkJoinPool.commonPool() var results = ForkJoinPool.commonPool()

View File

@ -19,7 +19,9 @@ import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.*; import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@Singleton @Singleton
public class DomainLoaderService { public class DomainLoaderService {
@ -36,21 +38,29 @@ public class DomainLoaderService {
this.nodeId = processConfiguration.node(); this.nodeId = processConfiguration.node();
} }
/** Progress stages reported to the "DOMAIN_IDS" task heartbeat by
 * getOrCreateDomainIds, listed in the order in which they are reported.
 */
enum Steps {
// Reported right before the DomainInserter pass over the basic domain information
PREP_DATA,
// Reported right before the DomainAffinityAndIpUpdater pass over the same input
INSERT_NEW,
// Reported right before the per-domain ID lookup against EC_DOMAIN
FETCH_ALL,
// Reported after the ID lookup loop has finished
DONE
}
/** Read the domain names from each parquet file /** Read the domain names from each parquet file
* compare with SQL domain database, fetch those * compare with SQL domain database, fetch those
* that exist, insert those that don't. * that exist, insert those that don't.
*/ */
public DomainIdRegistry getOrCreateDomainIds(LoaderInputData inputData) public DomainIdRegistry getOrCreateDomainIds(ProcessHeartbeatImpl heartbeat, LoaderInputData inputData)
throws IOException, SQLException throws IOException, SQLException
{ {
Set<String> domainNamesAll = new HashSet<>(100_000); Set<String> domainNamesAll = new HashSet<>(100_000);
DomainIdRegistry ret = new DomainIdRegistry(); DomainIdRegistry ret = new DomainIdRegistry();
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var taskHeartbeat = heartbeat.createProcessTaskHeartbeat(Steps.class, "DOMAIN_IDS");
var selectStmt = conn.prepareStatement(""" var selectStmt = conn.prepareStatement("""
SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=? SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=?
""") """)
) { ) {
taskHeartbeat.progress(Steps.PREP_DATA);
try (var inserter = new DomainInserter(conn, nodeId)) { try (var inserter = new DomainInserter(conn, nodeId)) {
for (var domainWithIp : readBasicDomainInformation(inputData)) { for (var domainWithIp : readBasicDomainInformation(inputData)) {
@ -65,12 +75,16 @@ public class DomainLoaderService {
} }
} }
taskHeartbeat.progress(Steps.INSERT_NEW);
try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId)) { try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId)) {
for (var domainWithIp : readBasicDomainInformation(inputData)) { for (var domainWithIp : readBasicDomainInformation(inputData)) {
updater.accept(new EdgeDomain(domainWithIp.domain), domainWithIp.ip); updater.accept(new EdgeDomain(domainWithIp.domain), domainWithIp.ip);
} }
} }
taskHeartbeat.progress(Steps.FETCH_ALL);
selectStmt.setFetchSize(1000); selectStmt.setFetchSize(1000);
for (var domain : domainNamesAll) { for (var domain : domainNamesAll) {
selectStmt.setString(1, domain); selectStmt.setString(1, domain);
@ -82,6 +96,8 @@ public class DomainLoaderService {
logger.error("Unknown domain {}", domain); logger.error("Unknown domain {}", domain);
} }
} }
taskHeartbeat.progress(Steps.DONE);
} }
return ret; return ret;