(actor) Improve resilience for the migration actor

Viktor Lofgren 2025-01-29 14:43:09 +01:00
parent 39cd1c18f8
commit 6ece6a6cfb

@@ -60,30 +60,35 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {
 
         for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
 
-            var entry = item.getKey();
-            var path = item.getValue();
+            final WorkLogEntry entry = item.getKey();
+            final Path inputPath = item.getValue();
 
-            heartbeat.progress("Migrating" + path.toFile().getName(), entryIdx++, totalEntries);
+            Path outputPath = inputPath;
+            heartbeat.progress("Migrating" + inputPath.getFileName(), entryIdx++, totalEntries);
 
-            if (path.toFile().getName().endsWith(".parquet") && Files.exists(path)) {
-                try {
+            if (inputPath.toString().endsWith(".parquet")) {
                 String domain = entry.id();
                 String id = Integer.toHexString(domain.hashCode());
 
-                Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
+                outputPath = CrawlerOutputFile.createSlopPath(root, id, domain);
 
-                SlopCrawlDataRecord.convertFromParquet(path, outputFile);
-                workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
-                }
-                catch (Exception ex) {
-                    logger.error("Failed to convert " + path, ex);
+                if (Files.exists(inputPath)) {
+                    try {
+                        SlopCrawlDataRecord.convertFromParquet(inputPath, outputPath);
+                    } catch (Exception ex) {
+                        outputPath = inputPath; // don't update the work log on error
+                        logger.error("Failed to convert " + inputPath, ex);
+                    }
+                }
+                else if (!Files.exists(inputPath) && !Files.exists(outputPath)) {
+                    // if the input file is missing, and the output file is missing, we just write the log
+                    // record identical to the old one
+                    outputPath = inputPath;
                 }
             }
-            else {
-                workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
-            }
+
+            // Write a log entry for the (possibly) converted file
+            workLog.setJobToFinished(entry.id(), outputPath.toString(), entry.cnt());
         }
     }
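
For readability, this is a sketch of how the migration loop reads after this commit, pieced together from the added lines above. The surrounding fields (workLog, heartbeat, logger), the loop-local counters, and the enclosing method are assumed from context and not shown in full:

for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
    final WorkLogEntry entry = item.getKey();
    final Path inputPath = item.getValue();

    // By default the log entry written at the end of the iteration points at the original input file
    Path outputPath = inputPath;
    heartbeat.progress("Migrating" + inputPath.getFileName(), entryIdx++, totalEntries);

    if (inputPath.toString().endsWith(".parquet")) {
        String domain = entry.id();
        String id = Integer.toHexString(domain.hashCode());

        outputPath = CrawlerOutputFile.createSlopPath(root, id, domain);

        if (Files.exists(inputPath)) {
            try {
                SlopCrawlDataRecord.convertFromParquet(inputPath, outputPath);
            } catch (Exception ex) {
                outputPath = inputPath; // don't update the work log on error
                logger.error("Failed to convert " + inputPath, ex);
            }
        }
        else if (!Files.exists(inputPath) && !Files.exists(outputPath)) {
            // if the input file is missing, and the output file is missing, we just write the log
            // record identical to the old one
            outputPath = inputPath;
        }
    }

    // Write a log entry for the (possibly) converted file
    workLog.setJobToFinished(entry.id(), outputPath.toString(), entry.cnt());
}

Compared to the previous version, the work log is now updated exactly once per entry, after the conversion attempt, and a failed or impossible conversion falls back to recording the original path rather than leaving the entry unrecorded. That appears to be the resilience improvement the commit title refers to: the migration can be interrupted and re-run without losing track of entries.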