(loader) Additional tracking for the control GUI

Viktor Lofgren 2024-07-28 21:19:45 +02:00
parent 314a901bf0
commit 261dcdadc8


@@ -39,7 +39,7 @@ public class DomainLoaderService {
     enum Steps {
         PREP_DATA,
-        INSERT_NEW,
+        UPDATE_AFFINITY_AND_IP,
         FETCH_ALL,
         DONE
     }
@@ -61,36 +61,61 @@ public class DomainLoaderService {
         ) {
             taskHeartbeat.progress(Steps.PREP_DATA);
 
-            // Add domain names from this data set with the current node affinity
-            for (SlopPageRef<SlopDomainRecord> page : inputData.listDomainPages()) {
-                try (var inserter = new DomainInserter(conn, nodeId);
-                     var reader = new SlopDomainRecord.DomainNameReader(page)
-                ) {
-                    while (reader.hasMore()) {
-                        String domainName = reader.next();
-                        inserter.accept(new EdgeDomain(domainName));
-                        domainNamesAll.add(domainName);
-                    }
-                }
-            }
-
-            // Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
-            for (SlopPageRef<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
-                try (var inserter = new DomainInserter(conn, -1);
-                     var reader = new SlopDomainLinkRecord.Reader(page)) {
-                    while (reader.hasMore()) {
-                        SlopDomainLinkRecord record = reader.next();
-                        inserter.accept(new EdgeDomain(record.dest()));
-                        domainNamesAll.add(record.dest());
-                    }
-                }
-            }
-
-            taskHeartbeat.progress(Steps.INSERT_NEW);
-
-            // Update the node affinity and IP address for each domain
-            for (SlopPageRef<SlopDomainRecord> page : inputData.listDomainPages()) {
-                try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId);
-                     var reader = new SlopDomainRecord.DomainWithIpReader(page)
-                ) {
+            Collection<SlopPageRef<SlopDomainRecord>> domainPageRefs = inputData.listDomainPages();
+            Collection<SlopPageRef<SlopDomainLinkRecord>> domainLinkPageRefs = inputData.listDomainLinkPages();
+
+            // Ensure that the domains we've just crawled are in the domain database to this node
+            try (var inserter = new DomainInserter(conn, nodeId);
+                 var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_CRAWLED_DOMAINS")) {
+                // Add domain names from this data set with the current node affinity
+                int pageIdx = 0;
+
+                for (SlopPageRef<SlopDomainRecord> page : inputData.listDomainPages()) {
+                    processHeartbeat.progress("INSERT", pageIdx++, domainPageRefs.size());
+
+                    try (var reader = new SlopDomainRecord.DomainNameReader(page)) {
+                        while (reader.hasMore()) {
+                            String domainName = reader.next();
+                            if (domainNamesAll.add(domainName)) {
+                                inserter.accept(new EdgeDomain(domainName));
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Add domains that are linked to from the domains we've just crawled, but with -1 affinity meaning they
+            // can be grabbed by any index node
+            try (var inserter = new DomainInserter(conn, -1);
+                 var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_LINKED_DOMAINS")) {
+                // Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
+                int pageIdx = 0;
+
+                for (SlopPageRef<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
+                    processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
+
+                    try (var reader = new SlopDomainLinkRecord.Reader(page)) {
+                        while (reader.hasMore()) {
+                            SlopDomainLinkRecord record = reader.next();
+                            String domainName = record.dest();
+                            if (domainNamesAll.add(domainName)) {
+                                inserter.accept(new EdgeDomain(domainName));
+                            }
+                        }
+                    }
+                }
+            }
+
+            taskHeartbeat.progress(Steps.UPDATE_AFFINITY_AND_IP);
+
+            // Update the node affinity and IP address for each domain we have information about
+            try (var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("UPDATE_AFFINITY_AND_IP")) {
+                // Update the node affinity and IP address for each domain
+                int pageIdx = 0;
+
+                for (SlopPageRef<SlopDomainRecord> page : inputData.listDomainPages()) {
+                    processHeartbeat.progress("UPDATE", pageIdx++, domainPageRefs.size());
+
+                    try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId);
+                         var reader = new SlopDomainRecord.DomainWithIpReader(page)
+                    ) {
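Note on the hunk above: the per-page progress reporting follows the same pattern in all three blocks. The coarse task heartbeat still advances through the Steps enum, while a short-lived ad-hoc heartbeat reports a stage label, the current page index, and the total page count before each page is processed, which is what the control GUI can now display. The sketch below shows that pattern in isolation; the Heartbeat and AdHocTaskHeartbeat interfaces are hypothetical stand-ins inferred from the calls visible in the diff (createAdHocTaskHeartbeat returning an AutoCloseable with a progress(stage, index, total) method), not the actual Marginalia heartbeat API.

import java.util.List;

// Minimal sketch of the progress-reporting pattern in the hunk above.
// The interfaces are hypothetical stand-ins, not the real heartbeat classes.
public class ProgressReportingSketch {

    interface AdHocTaskHeartbeat extends AutoCloseable {
        void progress(String stage, int progress, int total);
        @Override
        void close();
    }

    interface Heartbeat {
        AdHocTaskHeartbeat createAdHocTaskHeartbeat(String taskName);
    }

    // Reports "INSERT pageIdx/total" before each page is processed, so a
    // long-running step is no longer a single opaque unit of work in the GUI.
    static void insertPages(Heartbeat heartbeat, List<String> pages) {
        try (var processHeartbeat = heartbeat.createAdHocTaskHeartbeat("INSERT_CRAWLED_DOMAINS")) {
            int pageIdx = 0;
            for (String page : pages) {
                processHeartbeat.progress("INSERT", pageIdx++, pages.size());
                // ... read the page and insert its domains here ...
            }
        }
    }
}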
@@ -100,6 +125,7 @@ public class DomainLoaderService {
                         }
                     }
                 }
+            }
 
             taskHeartbeat.progress(Steps.FETCH_ALL);
             selectStmt.setFetchSize(1000);
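Besides the finer-grained progress tracking, the commit also changes how duplicate domains are handled: both insert loops now call inserter.accept(...) only when domainNamesAll.add(...) returns true, so a domain that occurs in several pages, or both as a crawled and as a linked domain, is handed to the inserter at most once. Below is a small self-contained illustration of that Set-based guard, with a plain Consumer standing in for DomainInserter and made-up domain names as input.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Demonstrates the Set-based insert guard: Set.add returns true only the first
// time a value is added, so each domain name is inserted at most once.
// The Consumer is a stand-in for DomainInserter; the domains are example data.
public class DedupSketch {
    public static void main(String[] args) {
        Set<String> domainNamesAll = new HashSet<>();
        Consumer<String> inserter = name -> System.out.println("insert " + name);

        List<String> crawledDomains = List.of("example.com", "marginalia.nu", "example.com");
        List<String> linkedDomains = List.of("example.com", "wikipedia.org");

        for (String domainName : crawledDomains) {
            if (domainNamesAll.add(domainName)) {   // false for the duplicate example.com
                inserter.accept(domainName);
            }
        }
        for (String domainName : linkedDomains) {
            if (domainNamesAll.add(domainName)) {   // domains already seen as crawled are skipped
                inserter.accept(domainName);
            }
        }
        // Output: insert example.com, insert marginalia.nu, insert wikipedia.org
    }
}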