(actor) Reset NEW flag earlier when auto-deletion is disabled

Don't wait until the loader step is finished to reset the NEW flag, as this leaves manually processed (but not yet loaded) crawl data stuck in "CREATING" in the GUI.
This commit is contained in:
Viktor Lofgren 2024-07-31 10:31:03 +02:00
parent dc5c668940
commit 2ef66ce0ca

View File

@ -113,6 +113,12 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
if (rsp.state() != MqMessageState.OK) if (rsp.state() != MqMessageState.OK)
yield new Error("Converter failed"); yield new Error("Converter failed");
if (!shouldAutoClean()) {
// If we're not auto-cleaning, we need to clean the NEW flag for the processed storage
storageService.setFileStorageState(processedId, FileStorageState.UNSET);
// (if we do auto-clean, we skip this step and purge the items after loading)
}
yield new Load(List.of(processedId)); yield new Load(List.of(processedId));
} }
case Load(List<FileStorageId> processedIds, long msgId) when msgId < 0 -> { case Load(List<FileStorageId> processedIds, long msgId) when msgId < 0 -> {
@ -140,9 +146,20 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
if (rsp.state() != MqMessageState.OK) { if (rsp.state() != MqMessageState.OK) {
yield new Error("Loader failed"); yield new Error("Loader failed");
} else {
cleanProcessedStorage(processedIds);
} }
// If we're auto-cleaning, flag the processed files for deletion if they have the NEW flag,
// indicating they've recently been created. We need to check this, so we don't delete archived
// stuff that's being loaded manually
if (shouldAutoClean()) {
for (var id : processedIds) {
if (FileStorageState.NEW.equals(storageService.getStorage(id).state())) {
storageService.flagFileForDeletion(id);
}
}
}
yield new Backup(processedIds); yield new Backup(processedIds);
} }
case Backup(List<FileStorageId> processedIds) -> { case Backup(List<FileStorageId> processedIds) -> {
@ -204,6 +221,16 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
return mqIndexConstructorOutbox.sendAsync(new CreateIndexRequest(index)); return mqIndexConstructorOutbox.sendAsync(new CreateIndexRequest(index));
} }
private boolean shouldAutoClean() {
try {
return nodeConfigurationService.get(nodeId).autoClean();
}
catch (SQLException ex) {
logger.error("Error getting node configuration", ex);
return false; // safe dafault
}
}
@Override @Override
public String describe() { public String describe() {
@ -233,24 +260,5 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
this.nodeId = serviceConfiguration.node(); this.nodeId = serviceConfiguration.node();
} }
private void cleanProcessedStorage(List<FileStorageId> processedStorageId) {
try {
var config = nodeConfigurationService.get(nodeId);
for (var id : processedStorageId) {
if (FileStorageState.NEW.equals(storageService.getStorage(id).state())) {
if (config.autoClean()) {
storageService.flagFileForDeletion(id);
}
else {
storageService.setFileStorageState(id, FileStorageState.UNSET);
}
}
}
}
catch (SQLException ex) {
logger.error("Error in clean-up", ex);
}
}
} }