From 45d3e6aa716996f53c09a2f5cd9970708ae23ba9 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Fri, 4 Oct 2024 13:19:09 +0200 Subject: [PATCH] (download-sample) Break apart actor for better error recovery Change also adds logged events to give more feedback that something is happening. --- .../service/control/ServiceEventLog.java | 3 + .../actor/task/DownloadSampleActor.java | 151 ++++++++++++------ 2 files changed, 103 insertions(+), 51 deletions(-) diff --git a/code/common/service/java/nu/marginalia/service/control/ServiceEventLog.java b/code/common/service/java/nu/marginalia/service/control/ServiceEventLog.java index d5768917..c6b9321a 100644 --- a/code/common/service/java/nu/marginalia/service/control/ServiceEventLog.java +++ b/code/common/service/java/nu/marginalia/service/control/ServiceEventLog.java @@ -36,6 +36,9 @@ public class ServiceEventLog { logEvent("SVC-START", serviceName); } + public void logEvent(Class type, String message) { + logEvent(type.getSimpleName(), message); + } public void logEvent(String type, String message) { try (var conn = dataSource.getConnection(); diff --git a/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java b/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java index 48c5262c..e8f2d228 100644 --- a/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java +++ b/code/execution/java/nu/marginalia/actor/task/DownloadSampleActor.java @@ -4,17 +4,21 @@ import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.actor.state.Resume; +import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.*; +import nu.marginalia.storage.model.FileStorage; +import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.storage.model.FileStorageState; +import nu.marginalia.storage.model.FileStorageType; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; @@ -27,9 +31,20 @@ import java.nio.file.attribute.PosixFilePermissions; public class DownloadSampleActor extends RecordActorPrototype { private final FileStorageService storageService; + private final ServiceEventLog eventLog; private final Logger logger = LoggerFactory.getLogger(getClass()); + @Resume(behavior = ActorResumeBehavior.ERROR) public record Run(String setName) implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.RETRY) + public record Download(FileStorageId fileStorageId, String url, String fileName) implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.ERROR) + public record Extract(FileStorageId fileStorageId, String tarFile) implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.ERROR) + public record SuccessCleanup(FileStorageId fileStorageId, String tarFile) implements ActorStep {} + @Resume(behavior = ActorResumeBehavior.ERROR) + public record ErrorCleanup(FileStorageId id) implements ActorStep {} + @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { @@ -40,65 +55,97 @@ public class DownloadSampleActor extends RecordActorPrototype { "Sample " + setName); storageService.setFileStorageState(newStorage.id(), FileStorageState.NEW); + String downloadURI = getDownloadURL(setName).toString(); + String tarFileName = Files.createTempFile(newStorage.asPath(), "download", ".tar").toString(); - URL downloadURI = getDownloadURL(setName); + yield new Download(newStorage.id(), downloadURI, tarFileName); + } + case Download(FileStorageId fileStorageId, String downloadURI, String tarFileName) -> { - try { - downloadArchive(downloadURI, newStorage.asPath()); + eventLog.logEvent(DownloadSampleActor.class, "Downloading sample from " + downloadURI); + + Files.deleteIfExists(Path.of(tarFileName)); + + try (var is = new BufferedInputStream(new URI(downloadURI).toURL().openStream()); + var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) { + is.transferTo(os); } - catch (IOException ex) { + catch (Exception ex) { + eventLog.logEvent(DownloadSampleActor.class, "Error downloading sample"); logger.error("Error downloading sample", ex); - storageService.flagFileForDeletion(newStorage.id()); yield new Error(); } - finally { - storageService.setFileStorageState(newStorage.id(), FileStorageState.UNSET); + + eventLog.logEvent(DownloadSampleActor.class, "Download complete"); + yield new Extract(fileStorageId, tarFileName); + } + case Extract(FileStorageId fileStorageId, String tarFileName) -> { + Path outputPath = storageService.getStorage(fileStorageId).asPath(); + + eventLog.logEvent(getClass().getSimpleName(), "Extracting sample to " + outputPath); + try (var tar = new TarArchiveInputStream(Files.newInputStream(Path.of(tarFileName)))) { + TarArchiveEntry nextEntry; + byte[] buffer = new byte[8192]; + + while ((nextEntry = tar.getNextEntry()) != null) { + // Poll for interruption, to ensure this can be cancelled + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + if (nextEntry.isDirectory()) { + continue; + } + + Path outputFile = outputPath.resolve(nextEntry.getName()); + Files.createDirectories(outputFile.getParent(), + PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-xr-x")) + ); + + long size = nextEntry.getSize(); + + // Extract tar entry + try (var fos = Files.newOutputStream(outputFile, StandardOpenOption.CREATE)) { + transferBytes(tar, fos, buffer, size); + } + + if (Files.isDirectory(outputFile)) { + Files.setPosixFilePermissions(outputFile, PosixFilePermissions.fromString("rwxr-xr-x")); + } + else { + Files.setPosixFilePermissions(outputFile, PosixFilePermissions.fromString("rw-r--r--")); + } + } + } + catch (Exception ex) { + logger.error("Error extracting sample", ex); + eventLog.logEvent(DownloadSampleActor.class, "Error extracting sample"); + + storageService.flagFileForDeletion(fileStorageId); + yield new ErrorCleanup(fileStorageId); } + eventLog.logEvent(DownloadSampleActor.class, "Extraction complete"); + yield new SuccessCleanup(fileStorageId, tarFileName); + } + case SuccessCleanup(FileStorageId fileStorageId, String tarFile) -> { + Files.deleteIfExists(Path.of(tarFile)); + storageService.setFileStorageState(fileStorageId, FileStorageState.UNSET); yield new End(); } + case ErrorCleanup(FileStorageId id) -> { + storageService.flagFileForDeletion(id); + yield new Error(); + } default -> new Error(); }; } - private void downloadArchive(URL downloadURI, Path outputPath) throws IOException, InterruptedException { - // See the documentation for commons compress: - // https://commons.apache.org/proper/commons-compress/examples.html - - try (var tar = new TarArchiveInputStream(downloadURI.openStream())) { - TarArchiveEntry nextEntry; - byte[] buffer = new byte[8192]; - - while ((nextEntry = tar.getNextEntry()) != null) { - // Poll for interruption, to ensure this can be cancelled - if (Thread.interrupted()) { - throw new InterruptedException(); - } - - if (nextEntry.isDirectory()) { - continue; - } - - Path outputFile = outputPath.resolve(nextEntry.getName()); - Files.createDirectories(outputFile.getParent(), - PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-xr-x")) - ); - - long size = nextEntry.getSize(); - - // Extract tar entry - try (var fos = Files.newOutputStream(outputFile, StandardOpenOption.CREATE)) { - transferBytes(tar, fos, buffer, size); - } - - if (Files.isDirectory(outputPath)) { - Files.setPosixFilePermissions(outputPath, PosixFilePermissions.fromString("rwxr-xr-x")); - } - else { - Files.setPosixFilePermissions(outputPath, PosixFilePermissions.fromString("rw-r--r--")); - } - } - } + enum States { + DOWNLOADING, + EXTRACTING, + CLEANUP, + END } private void transferBytes(InputStream inputStream, OutputStream outputStream, byte[] buffer, long size) @@ -119,7 +166,7 @@ public class DownloadSampleActor extends RecordActorPrototype { private URL getDownloadURL(String setName) throws MalformedURLException { - return URI.create(STR."https://downloads.marginalia.nu/samples/\{setName}.tar").toURL(); + return URI.create("https://downloads.marginalia.nu/samples/" + setName + ".tar").toURL(); } @Override @@ -129,10 +176,12 @@ public class DownloadSampleActor extends RecordActorPrototype { @Inject public DownloadSampleActor(Gson gson, - FileStorageService storageService) + FileStorageService storageService, + ServiceEventLog eventLog) { super(gson); this.storageService = storageService; + this.eventLog = eventLog; } }