diff --git a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java index ac1fc763..fb6af988 100644 --- a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java +++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java @@ -39,7 +39,7 @@ public class DomainLoaderService { enum Steps { PREP_DATA, - INSERT_NEW, + UPDATE_AFFINITY_AND_IP, FETCH_ALL, DONE } @@ -61,42 +61,68 @@ public class DomainLoaderService { ) { taskHeartbeat.progress(Steps.PREP_DATA); - // Add domain names from this data set with the current node affinity - for (SlopPageRef page : inputData.listDomainPages()) { + Collection> domainPageRefs = inputData.listDomainPages(); + Collection> domainLinkPageRefs = inputData.listDomainLinkPages(); - try (var inserter = new DomainInserter(conn, nodeId); - var reader = new SlopDomainRecord.DomainNameReader(page) - ) { - while (reader.hasMore()) { - String domainName = reader.next(); - inserter.accept(new EdgeDomain(domainName)); - domainNamesAll.add(domainName); + // Ensure that the domains we've just crawled are in the domain database to this node + try (var inserter = new DomainInserter(conn, nodeId); + var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_CRAWLED_DOMAINS")) { + // Add domain names from this data set with the current node affinity + int pageIdx = 0; + + for (SlopPageRef page : inputData.listDomainPages()) { + processHeartbeat.progress("INSERT", pageIdx++, domainPageRefs.size()); + + try (var reader = new SlopDomainRecord.DomainNameReader(page)) { + while (reader.hasMore()) { + String domainName = reader.next(); + if (domainNamesAll.add(domainName)) { + inserter.accept(new EdgeDomain(domainName)); + } + } } } } - // Add linked domains, but with -1 affinity meaning they can be grabbed by any index node - for (SlopPageRef page : inputData.listDomainLinkPages()) { - try (var inserter = new DomainInserter(conn, -1); - var reader = new SlopDomainLinkRecord.Reader(page)) { - while (reader.hasMore()) { - SlopDomainLinkRecord record = reader.next(); - inserter.accept(new EdgeDomain(record.dest())); - domainNamesAll.add(record.dest()); + // Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they + // can be grabbed by any index node + try (var inserter = new DomainInserter(conn, -1); + var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) { + // Add linked domains, but with -1 affinity meaning they can be grabbed by any index node + int pageIdx = 0; + + for (SlopPageRef page : inputData.listDomainLinkPages()) { + processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size()); + + try (var reader = new SlopDomainLinkRecord.Reader(page)) { + while (reader.hasMore()) { + SlopDomainLinkRecord record = reader.next(); + String domainName = record.dest(); + if (domainNamesAll.add(domainName)) { + inserter.accept(new EdgeDomain(domainName)); + } + } } } } - taskHeartbeat.progress(Steps.INSERT_NEW); + taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP); - // Update the node affinity and IP address for each domain - for (SlopPageRef page : inputData.listDomainPages()) { - try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId); - var reader = new SlopDomainRecord.DomainWithIpReader(page) - ) { - while (reader.hasMore()) { - var domainWithIp = reader.next(); - updater.accept(new EdgeDomain(domainWithIp.domain()), domainWithIp.ip()); + // Update the node affinity and IP address for each domain we have information about + try (var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("UPDATE_AFFINITY_AND_IP")) { + // Update the node affinity and IP address for each domain + int pageIdx = 0; + + for (SlopPageRef page : inputData.listDomainPages()) { + processHeartbeat.progress("UPDATE", pageIdx++, domainPageRefs.size()); + + try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId); + var reader = new SlopDomainRecord.DomainWithIpReader(page) + ) { + while (reader.hasMore()) { + var domainWithIp = reader.next(); + updater.accept(new EdgeDomain(domainWithIp.domain()), domainWithIp.ip()); + } } } }