From 2ad564404eafe67854930e49161f404d861754b7 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Tue, 23 Jul 2024 15:14:25 +0200 Subject: [PATCH] (loader) Add heartbeat to update domain-ids step --- .../nu/marginalia/loading/LoaderMain.java | 6 +++--- .../loading/domains/DomainLoaderService.java | 20 +++++++++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java index 43b22168..4171337f 100644 --- a/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java @@ -8,8 +8,6 @@ import lombok.Getter; import lombok.SneakyThrows; import nu.marginalia.ProcessConfiguration; import nu.marginalia.ProcessConfigurationModule; -import nu.marginalia.service.ProcessMainClass; -import nu.marginalia.storage.FileStorageService; import nu.marginalia.linkdb.docs.DocumentDbWriter; import nu.marginalia.loading.documents.DocumentLoaderService; import nu.marginalia.loading.documents.KeywordLoaderService; @@ -22,7 +20,9 @@ import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.inbox.MqInboxResponse; import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.process.control.ProcessHeartbeatImpl; +import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; +import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +103,7 @@ public class LoaderMain extends ProcessMainClass { void run(LoadRequest instructions) { LoaderInputData inputData = instructions.getInputData(); - DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(inputData); + DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(heartbeat, inputData); try { var results = ForkJoinPool.commonPool() diff --git 
a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java index 8d72a50a..342645dd 100644 --- a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java +++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.*; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; @Singleton public class DomainLoaderService { @@ -36,21 +38,29 @@ public class DomainLoaderService { this.nodeId = processConfiguration.node(); } + enum Steps { + PREP_DATA, + INSERT_NEW, + FETCH_ALL, + DONE + } /** Read the domain names from each parquet file * compare with SQL domain database, fetch those * that exist, insert those that don't. */ - public DomainIdRegistry getOrCreateDomainIds(LoaderInputData inputData) + public DomainIdRegistry getOrCreateDomainIds(ProcessHeartbeatImpl heartbeat, LoaderInputData inputData) throws IOException, SQLException { Set<String> domainNamesAll = new HashSet<>(100_000); DomainIdRegistry ret = new DomainIdRegistry(); try (var conn = dataSource.getConnection(); + var taskHeartbeat = heartbeat.createProcessTaskHeartbeat(Steps.class, "DOMAIN_IDS"); var selectStmt = conn.prepareStatement(""" SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=? 
""") ) { + taskHeartbeat.progress(Steps.PREP_DATA); try (var inserter = new DomainInserter(conn, nodeId)) { for (var domainWithIp : readBasicDomainInformation(inputData)) { @@ -65,12 +75,16 @@ public class DomainLoaderService { } } + taskHeartbeat.progress(Steps.INSERT_NEW); + try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId)) { for (var domainWithIp : readBasicDomainInformation(inputData)) { updater.accept(new EdgeDomain(domainWithIp.domain), domainWithIp.ip); } } + taskHeartbeat.progress(Steps.FETCH_ALL); + selectStmt.setFetchSize(1000); for (var domain : domainNamesAll) { selectStmt.setString(1, domain); @@ -82,6 +96,8 @@ public class DomainLoaderService { logger.error("Unknown domain {}", domain); } } + + taskHeartbeat.progress(Steps.DONE); } return ret;