(actor) Improve logging and error handling for data migration actor

This commit is contained in:
Viktor Lofgren 2025-01-28 15:34:36 +01:00
parent fb673de370
commit 1e50e392c6
2 changed files with 33 additions and 12 deletions

View File

@ -10,7 +10,9 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.util.*;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
/** WorkLog is a journal of work done by a process,
@ -61,6 +63,12 @@ public class WorkLog implements AutoCloseable, Closeable {
return new WorkLoadIterable<>(logFile, mapper);
}
public static int countEntries(Path crawlerLog) throws IOException{
try (var linesStream = Files.lines(crawlerLog)) {
return (int) linesStream.filter(WorkLogEntry::isJobId).count();
}
}
// Use synchro over concurrent set to avoid competing writes
// - correct is better than fast here, it's sketchy enough to use
// a PrintWriter

View File

@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.slop.SlopCrawlDataRecord;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
@ -26,14 +27,15 @@ import java.util.function.Function;
public class MigrateCrawlDataActor extends RecordActorPrototype {
private final FileStorageService fileStorageService;
private final ServiceHeartbeat serviceHeartbeat;
private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class);
@Inject
public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) {
public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService, ServiceHeartbeat serviceHeartbeat) {
super(gson);
this.fileStorageService = fileStorageService;
this.serviceHeartbeat = serviceHeartbeat;
}
public record Run(long fileStorageId) implements ActorStep {}
@ -49,28 +51,39 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {
Path crawlerLog = root.resolve("crawler.log");
Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
int totalEntries = WorkLog.countEntries(crawlerLog);
try (WorkLog workLog = new WorkLog(newCrawlerLog);
var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Migrating")
) {
int entryIdx = 0;
for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
var entry = item.getKey();
var path = item.getValue();
logger.info("Converting {}", entry.id());
heartbeat.progress("Migrating" + path.toFile().getName(), entryIdx++, totalEntries);
if (path.toFile().getName().endsWith(".parquet") && Files.exists(path)) {
try {
String domain = entry.id();
String id = Integer.toHexString(domain.hashCode());
if (path.toFile().getName().endsWith(".parquet")) {
String domain = entry.id();
String id = Integer.toHexString(domain.hashCode());
Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
SlopCrawlDataRecord.convertFromParquet(path, outputFile);
SlopCrawlDataRecord.convertFromParquet(path, outputFile);
workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
}
catch (Exception ex) {
logger.error("Failed to convert " + path, ex);
}
}
else {
workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
}
}
}