mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 13:09:00 +00:00
(download-sample) Break apart actor for better error recovery
This change also adds logged events to give more feedback that something is happening.
This commit is contained in:
parent
d84a2c183f
commit
45d3e6aa71
@ -36,6 +36,9 @@ public class ServiceEventLog {
|
|||||||
logEvent("SVC-START", serviceName);
|
logEvent("SVC-START", serviceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void logEvent(Class<?> type, String message) {
|
||||||
|
logEvent(type.getSimpleName(), message);
|
||||||
|
}
|
||||||
public void logEvent(String type, String message) {
|
public void logEvent(String type, String message) {
|
||||||
|
|
||||||
try (var conn = dataSource.getConnection();
|
try (var conn = dataSource.getConnection();
|
||||||
|
@ -4,17 +4,21 @@ import com.google.gson.Gson;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.actor.prototype.RecordActorPrototype;
|
import nu.marginalia.actor.prototype.RecordActorPrototype;
|
||||||
|
import nu.marginalia.actor.state.ActorResumeBehavior;
|
||||||
import nu.marginalia.actor.state.ActorStep;
|
import nu.marginalia.actor.state.ActorStep;
|
||||||
|
import nu.marginalia.actor.state.Resume;
|
||||||
|
import nu.marginalia.service.control.ServiceEventLog;
|
||||||
import nu.marginalia.storage.FileStorageService;
|
import nu.marginalia.storage.FileStorageService;
|
||||||
import nu.marginalia.storage.model.*;
|
import nu.marginalia.storage.model.FileStorage;
|
||||||
|
import nu.marginalia.storage.model.FileStorageId;
|
||||||
|
import nu.marginalia.storage.model.FileStorageState;
|
||||||
|
import nu.marginalia.storage.model.FileStorageType;
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.*;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
@ -27,9 +31,20 @@ import java.nio.file.attribute.PosixFilePermissions;
|
|||||||
public class DownloadSampleActor extends RecordActorPrototype {
|
public class DownloadSampleActor extends RecordActorPrototype {
|
||||||
|
|
||||||
private final FileStorageService storageService;
|
private final FileStorageService storageService;
|
||||||
|
private final ServiceEventLog eventLog;
|
||||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
|
|
||||||
|
@Resume(behavior = ActorResumeBehavior.ERROR)
|
||||||
public record Run(String setName) implements ActorStep {}
|
public record Run(String setName) implements ActorStep {}
|
||||||
|
@Resume(behavior = ActorResumeBehavior.RETRY)
|
||||||
|
public record Download(FileStorageId fileStorageId, String url, String fileName) implements ActorStep {}
|
||||||
|
@Resume(behavior = ActorResumeBehavior.ERROR)
|
||||||
|
public record Extract(FileStorageId fileStorageId, String tarFile) implements ActorStep {}
|
||||||
|
@Resume(behavior = ActorResumeBehavior.ERROR)
|
||||||
|
public record SuccessCleanup(FileStorageId fileStorageId, String tarFile) implements ActorStep {}
|
||||||
|
@Resume(behavior = ActorResumeBehavior.ERROR)
|
||||||
|
public record ErrorCleanup(FileStorageId id) implements ActorStep {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActorStep transition(ActorStep self) throws Exception {
|
public ActorStep transition(ActorStep self) throws Exception {
|
||||||
return switch(self) {
|
return switch(self) {
|
||||||
@ -40,65 +55,97 @@ public class DownloadSampleActor extends RecordActorPrototype {
|
|||||||
"Sample " + setName);
|
"Sample " + setName);
|
||||||
|
|
||||||
storageService.setFileStorageState(newStorage.id(), FileStorageState.NEW);
|
storageService.setFileStorageState(newStorage.id(), FileStorageState.NEW);
|
||||||
|
String downloadURI = getDownloadURL(setName).toString();
|
||||||
|
String tarFileName = Files.createTempFile(newStorage.asPath(), "download", ".tar").toString();
|
||||||
|
|
||||||
URL downloadURI = getDownloadURL(setName);
|
yield new Download(newStorage.id(), downloadURI, tarFileName);
|
||||||
|
}
|
||||||
|
case Download(FileStorageId fileStorageId, String downloadURI, String tarFileName) -> {
|
||||||
|
|
||||||
try {
|
eventLog.logEvent(DownloadSampleActor.class, "Downloading sample from " + downloadURI);
|
||||||
downloadArchive(downloadURI, newStorage.asPath());
|
|
||||||
|
Files.deleteIfExists(Path.of(tarFileName));
|
||||||
|
|
||||||
|
try (var is = new BufferedInputStream(new URI(downloadURI).toURL().openStream());
|
||||||
|
var os = new BufferedOutputStream(Files.newOutputStream(Path.of(tarFileName), StandardOpenOption.CREATE))) {
|
||||||
|
is.transferTo(os);
|
||||||
}
|
}
|
||||||
catch (IOException ex) {
|
catch (Exception ex) {
|
||||||
|
eventLog.logEvent(DownloadSampleActor.class, "Error downloading sample");
|
||||||
logger.error("Error downloading sample", ex);
|
logger.error("Error downloading sample", ex);
|
||||||
storageService.flagFileForDeletion(newStorage.id());
|
|
||||||
yield new Error();
|
yield new Error();
|
||||||
}
|
}
|
||||||
finally {
|
|
||||||
storageService.setFileStorageState(newStorage.id(), FileStorageState.UNSET);
|
eventLog.logEvent(DownloadSampleActor.class, "Download complete");
|
||||||
|
yield new Extract(fileStorageId, tarFileName);
|
||||||
|
}
|
||||||
|
case Extract(FileStorageId fileStorageId, String tarFileName) -> {
|
||||||
|
Path outputPath = storageService.getStorage(fileStorageId).asPath();
|
||||||
|
|
||||||
|
eventLog.logEvent(getClass().getSimpleName(), "Extracting sample to " + outputPath);
|
||||||
|
try (var tar = new TarArchiveInputStream(Files.newInputStream(Path.of(tarFileName)))) {
|
||||||
|
TarArchiveEntry nextEntry;
|
||||||
|
byte[] buffer = new byte[8192];
|
||||||
|
|
||||||
|
while ((nextEntry = tar.getNextEntry()) != null) {
|
||||||
|
// Poll for interruption, to ensure this can be cancelled
|
||||||
|
if (Thread.interrupted()) {
|
||||||
|
throw new InterruptedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nextEntry.isDirectory()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Path outputFile = outputPath.resolve(nextEntry.getName());
|
||||||
|
Files.createDirectories(outputFile.getParent(),
|
||||||
|
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-xr-x"))
|
||||||
|
);
|
||||||
|
|
||||||
|
long size = nextEntry.getSize();
|
||||||
|
|
||||||
|
// Extract tar entry
|
||||||
|
try (var fos = Files.newOutputStream(outputFile, StandardOpenOption.CREATE)) {
|
||||||
|
transferBytes(tar, fos, buffer, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Files.isDirectory(outputFile)) {
|
||||||
|
Files.setPosixFilePermissions(outputFile, PosixFilePermissions.fromString("rwxr-xr-x"));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Files.setPosixFilePermissions(outputFile, PosixFilePermissions.fromString("rw-r--r--"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
logger.error("Error extracting sample", ex);
|
||||||
|
eventLog.logEvent(DownloadSampleActor.class, "Error extracting sample");
|
||||||
|
|
||||||
|
storageService.flagFileForDeletion(fileStorageId);
|
||||||
|
yield new ErrorCleanup(fileStorageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eventLog.logEvent(DownloadSampleActor.class, "Extraction complete");
|
||||||
|
yield new SuccessCleanup(fileStorageId, tarFileName);
|
||||||
|
}
|
||||||
|
case SuccessCleanup(FileStorageId fileStorageId, String tarFile) -> {
|
||||||
|
Files.deleteIfExists(Path.of(tarFile));
|
||||||
|
storageService.setFileStorageState(fileStorageId, FileStorageState.UNSET);
|
||||||
yield new End();
|
yield new End();
|
||||||
}
|
}
|
||||||
|
case ErrorCleanup(FileStorageId id) -> {
|
||||||
|
storageService.flagFileForDeletion(id);
|
||||||
|
yield new Error();
|
||||||
|
}
|
||||||
default -> new Error();
|
default -> new Error();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private void downloadArchive(URL downloadURI, Path outputPath) throws IOException, InterruptedException {
|
enum States {
|
||||||
// See the documentation for commons compress:
|
DOWNLOADING,
|
||||||
// https://commons.apache.org/proper/commons-compress/examples.html
|
EXTRACTING,
|
||||||
|
CLEANUP,
|
||||||
try (var tar = new TarArchiveInputStream(downloadURI.openStream())) {
|
END
|
||||||
TarArchiveEntry nextEntry;
|
|
||||||
byte[] buffer = new byte[8192];
|
|
||||||
|
|
||||||
while ((nextEntry = tar.getNextEntry()) != null) {
|
|
||||||
// Poll for interruption, to ensure this can be cancelled
|
|
||||||
if (Thread.interrupted()) {
|
|
||||||
throw new InterruptedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nextEntry.isDirectory()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Path outputFile = outputPath.resolve(nextEntry.getName());
|
|
||||||
Files.createDirectories(outputFile.getParent(),
|
|
||||||
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-xr-x"))
|
|
||||||
);
|
|
||||||
|
|
||||||
long size = nextEntry.getSize();
|
|
||||||
|
|
||||||
// Extract tar entry
|
|
||||||
try (var fos = Files.newOutputStream(outputFile, StandardOpenOption.CREATE)) {
|
|
||||||
transferBytes(tar, fos, buffer, size);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Files.isDirectory(outputPath)) {
|
|
||||||
Files.setPosixFilePermissions(outputPath, PosixFilePermissions.fromString("rwxr-xr-x"));
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Files.setPosixFilePermissions(outputPath, PosixFilePermissions.fromString("rw-r--r--"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void transferBytes(InputStream inputStream, OutputStream outputStream, byte[] buffer, long size)
|
private void transferBytes(InputStream inputStream, OutputStream outputStream, byte[] buffer, long size)
|
||||||
@ -119,7 +166,7 @@ public class DownloadSampleActor extends RecordActorPrototype {
|
|||||||
|
|
||||||
|
|
||||||
private URL getDownloadURL(String setName) throws MalformedURLException {
|
private URL getDownloadURL(String setName) throws MalformedURLException {
|
||||||
return URI.create(STR."https://downloads.marginalia.nu/samples/\{setName}.tar").toURL();
|
return URI.create("https://downloads.marginalia.nu/samples/" + setName + ".tar").toURL();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -129,10 +176,12 @@ public class DownloadSampleActor extends RecordActorPrototype {
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public DownloadSampleActor(Gson gson,
|
public DownloadSampleActor(Gson gson,
|
||||||
FileStorageService storageService)
|
FileStorageService storageService,
|
||||||
|
ServiceEventLog eventLog)
|
||||||
{
|
{
|
||||||
super(gson);
|
super(gson);
|
||||||
this.storageService = storageService;
|
this.storageService = storageService;
|
||||||
|
this.eventLog = eventLog;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user