diff --git a/code/common/config/java/nu/marginalia/storage/FileStorageService.java b/code/common/config/java/nu/marginalia/storage/FileStorageService.java index 0b675272..6917a654 100644 --- a/code/common/config/java/nu/marginalia/storage/FileStorageService.java +++ b/code/common/config/java/nu/marginalia/storage/FileStorageService.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import com.google.inject.Inject; import com.google.inject.Singleton; + import java.io.File; import java.io.IOException; import java.nio.file.*; @@ -29,7 +30,8 @@ public class FileStorageService { private static final DateTimeFormatter dirNameDatePattern = DateTimeFormatter.ofPattern("__uu-MM-dd'T'HH_mm_ss.SSS"); // filesystem safe ISO8601 @Inject - public FileStorageService(HikariDataSource dataSource, @Named("wmsa-system-node") Integer node) { + public FileStorageService(HikariDataSource dataSource, + @Named("wmsa-system-node") Integer node) { this.dataSource = dataSource; this.node = node; @@ -124,6 +126,7 @@ public class FileStorageService { } } + public void relateFileStorages(FileStorageId source, FileStorageId target) { try (var conn = dataSource.getConnection(); var stmt = conn.prepareStatement(""" diff --git a/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java b/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java index c09bdc27..a1636477 100644 --- a/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java @@ -15,6 +15,7 @@ import nu.marginalia.service.discovery.ServiceRegistryIf; import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServicePartition; import nu.marginalia.service.ServiceId; +import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; import org.slf4j.Logger; @@ -22,8 +23,7 @@ import org.slf4j.LoggerFactory; import 
java.io.IOException; import java.io.OutputStream; -import java.net.URISyntaxException; -import java.net.URLEncoder; +import java.net.*; import java.nio.charset.StandardCharsets; import java.util.List; @@ -161,17 +161,17 @@ public class ExecutorClient { } } - public void transferFile(int node, FileStorageId fileId, String path, OutputStream destOutputStream) { - String uriPath = STR."/transfer/file/\{fileId.id()}"; + public URL remoteFileURL(FileStorage fileStorage, String path) { + String uriPath = STR."/transfer/file/\{fileStorage.id()}"; String uriQuery = STR."path=\{URLEncoder.encode(path, StandardCharsets.UTF_8)}"; - var service = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, node)) + var service = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node())) .stream().findFirst().orElseThrow(); - try (var urlStream = service.endpoint().toURL(uriPath, uriQuery).openStream()) { - urlStream.transferTo(destOutputStream); + try { + return service.endpoint().toURL(uriPath, uriQuery); } - catch (IOException | URISyntaxException ex) { + catch (URISyntaxException|MalformedURLException ex) { throw new RuntimeException(ex); } } diff --git a/code/services-core/control-service/build.gradle b/code/services-core/control-service/build.gradle index ae793b89..6214672e 100644 --- a/code/services-core/control-service/build.gradle +++ b/code/services-core/control-service/build.gradle @@ -50,6 +50,8 @@ dependencies { implementation libs.zstd implementation libs.handlebars + implementation libs.duckdb + implementation libs.trove implementation libs.spark implementation libs.fastutil diff --git a/code/services-core/control-service/java/nu/marginalia/control/node/model/FileStorageWithRelatedEntries.java b/code/services-core/control-service/java/nu/marginalia/control/node/model/FileStorageWithRelatedEntries.java index cc648dd2..3a55cf9b 100644 --- 
a/code/services-core/control-service/java/nu/marginalia/control/node/model/FileStorageWithRelatedEntries.java +++ b/code/services-core/control-service/java/nu/marginalia/control/node/model/FileStorageWithRelatedEntries.java @@ -1,6 +1,7 @@ package nu.marginalia.control.node.model; import nu.marginalia.storage.model.FileStorage; +import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageType; import java.util.List; @@ -13,4 +14,7 @@ public record FileStorageWithRelatedEntries(FileStorageWithActions self, return self().storage().type(); } + public FileStorageId getId() { + return self.storage().id(); + } } diff --git a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlCrawlDataService.java b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlCrawlDataService.java new file mode 100644 index 00000000..659cf0a1 --- /dev/null +++ b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlCrawlDataService.java @@ -0,0 +1,198 @@ +package nu.marginalia.control.node.svc; + +import com.google.common.base.Strings; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.executor.client.ExecutorClient; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import spark.Request; +import spark.Response; + +import javax.annotation.Nullable; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +/** Service for inspecting crawl data within the control service. 
+ * + * Uses remote calls to the executor service to fetch information about the crawl data. + * Both directly, when inspecting the crawler log, and indirectly via duckdb when + * inspecting the parquet files. The duckdb calls rely on range queries to fetch + * only the relevant data from the files, so that the UI remains responsive even when + * dealing with large (100MB+) files. + */ +@Singleton +public class ControlCrawlDataService { + private final ExecutorClient executorClient; + private final FileStorageService fileStorageService; + private final NodeConfigurationService nodeConfigurationService; + + private static final Logger logger = LoggerFactory.getLogger(ControlCrawlDataService.class); + + @Inject + public ControlCrawlDataService(ExecutorClient executorClient, + FileStorageService fileStorageService, NodeConfigurationService nodeConfigurationService) + { + this.executorClient = executorClient; + this.fileStorageService = fileStorageService; + this.nodeConfigurationService = nodeConfigurationService; + } + + + + public Object crawlParquetInfo(Request request, Response response) throws SQLException { + int nodeId = Integer.parseInt(request.params("id")); + var fsid = FileStorageId.parse(request.params("fid")); + + String path = request.queryParams("path"); + + var url = executorClient.remoteFileURL(fileStorageService.getStorage(fsid), path).toString(); + + List byStatusCode = new ArrayList<>(); + List byContentType = new ArrayList<>(); + + List records = new ArrayList<>(); + + String domain; + try (var conn = DriverManager.getConnection("jdbc:duckdb:"); + var stmt = conn.createStatement()) + { + ResultSet rs; + + rs = stmt.executeQuery(DUCKDB."SELECT domain FROM \{url} LIMIT 1"); + domain = rs.next() ? 
rs.getString(1) : "NO DOMAIN"; + + rs = stmt.executeQuery(DUCKDB.""" + SELECT httpStatus, COUNT(*) as cnt FROM \{url} + GROUP BY httpStatus + ORDER BY httpStatus + """); + while (rs.next()) { + byStatusCode.add(new SummaryStatusCode(rs.getInt(1), rs.getInt(2))); + } + + rs = stmt.executeQuery(DUCKDB.""" + SELECT contentType, COUNT(*) as cnt + FROM \{url} + GROUP BY contentType + ORDER BY contentType + """); + while (rs.next()) { + byContentType.add(new SummaryContentType(rs.getString(1), rs.getInt(2))); + } + + rs = stmt.executeQuery(DUCKDB.""" + SELECT url, contentType, httpStatus, body != '', etagHeader, lastModifiedHeader + FROM \{url} LIMIT 10 + """); + while (rs.next()) { + records.add(new CrawlDataRecordSummary(rs.getString(1), rs.getString(2), rs.getInt(3), rs.getBoolean(4), rs.getString(5), rs.getString(6))); + } + } + + return Map.of( + "tab", Map.of("storage", true), + "view", Map.of("crawl", true), + "node", nodeConfigurationService.get(nodeId), + "storage", fileStorageService.getStorage(fsid), + "path", path, + "domain", domain, + "byStatusCode", byStatusCode, + "byContentType", byContentType, + "records", records) + ; + } + + public ControlCrawlDataService.CrawlDataFileList getCrawlDataFiles(FileStorageId fsid, + @Nullable String filterDomain, + @Nullable String afterDomain) { + List crawlDataFiles = new ArrayList<>(); + + try (var br = new BufferedReader(new InputStreamReader((executorClient.remoteFileURL(fileStorageService.getStorage(fsid), "crawler.log").openStream())))) { + Stream str = br.lines() + .filter(s -> !s.isBlank()) + .filter(s -> !s.startsWith("#")) + .map(s -> { + String[] parts = s.split("\\s+"); + return new ControlCrawlDataService.CrawlDataFile(parts[0], parts[2], Integer.parseInt(parts[3])); + }); + + if (!Strings.isNullOrEmpty(afterDomain)) { + str = str.dropWhile(s -> !s.domain().equals(afterDomain)).skip(1); + } + + if (!Strings.isNullOrEmpty(filterDomain)) { + str = str.filter(s -> 
s.domain().toLowerCase().contains(filterDomain.toLowerCase())); + } + + str.limit(10).forEach(crawlDataFiles::add); + } + catch (Exception ex) { + logger.warn("Failed to fetch crawler.log", ex); + } + + return new ControlCrawlDataService.CrawlDataFileList( + crawlDataFiles, + filterDomain, + afterDomain); + } + + public record SummaryContentType(String contentType, int count) {} + + public record SummaryStatusCode(int statusCode, int count) {} + + public record CrawlDataRecordSummary(String url, String contentType, int httpStatus, boolean hasBody, String etag, String lastModified) {} + + public record CrawlDataFile(String domain, String path, int count) {} + + public record CrawlDataFileList(List files, + @Nullable String filter, + @Nullable String after) + { + + // Used by the template to determine if there are more files to show, + // looks unused in the IDE but it's not + public String nextAfter() { + if (files.isEmpty()) + return ""; + if (files.size() < 10) + return ""; + + return files.getLast().domain(); + } + } + + // DuckDB template processor that deals with quoting and escaping values + // in the SQL query; this offers a very basic protection against accidental SQL injection + static StringTemplate.Processor DUCKDB = st -> { + StringBuilder sb = new StringBuilder(); + Iterator fragmentsIter = st.fragments().iterator(); + + for (Object value : st.values()) { + sb.append(fragmentsIter.next()); + + if (value instanceof Number) { // don't quote numbers + sb.append(value); + } else { + String valueStr = value.toString().replace("'", "''"); + sb.append("'").append(valueStr).append("'"); + } + } + + sb.append(fragmentsIter.next()); + + return sb.toString(); + }; + +} diff --git a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlFileStorageService.java b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlFileStorageService.java index ea1b8d47..3757f9c5 100644 --- 
a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlFileStorageService.java +++ b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlFileStorageService.java @@ -60,8 +60,7 @@ public class ControlFileStorageService { return redirectToOverview(request); } - public Object downloadFileFromStorage(Request request, Response response) throws IOException { - int nodeId = Integer.parseInt(request.params("id")); + public Object downloadFileFromStorage(Request request, Response response) throws IOException, SQLException { var fileStorageId = FileStorageId.parse(request.params("fid")); String path = request.queryParams("path"); @@ -73,7 +72,11 @@ public class ControlFileStorageService { else response.type("application/octet-stream"); - executorClient.transferFile(nodeId, fileStorageId, path, response.raw().getOutputStream()); + var storage = fileStorageService.getStorage(fileStorageId); + + try (var urlStream = executorClient.remoteFileURL(storage, path).openStream()) { + urlStream.transferTo(response.raw().getOutputStream()); + } return ""; } @@ -100,6 +103,7 @@ public class ControlFileStorageService { return ""; } + public Object flagFileForDeletionRequest(Request request, Response response) throws SQLException { FileStorageId fid = new FileStorageId(Long.parseLong(request.params(":fid"))); fileStorageService.flagFileForDeletion(fid); diff --git a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java index bc87a921..05d53f9f 100644 --- a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java +++ b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java @@ -1,5 +1,6 @@ package nu.marginalia.control.node.svc; +import com.google.common.base.Strings; import com.google.inject.Inject; import 
com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; @@ -23,10 +24,16 @@ import spark.Request; import spark.Response; import spark.Spark; +import javax.annotation.Nullable; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.file.Path; +import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; +import java.util.stream.Stream; public class ControlNodeService { private final FileStorageService fileStorageService; @@ -39,6 +46,8 @@ public class ControlNodeService { private final RedirectControl redirectControl; private final NodeConfigurationService nodeConfigurationService; + private final ControlCrawlDataService crawlDataService; + private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject @@ -51,7 +60,7 @@ public class ControlNodeService { HikariDataSource dataSource, ServiceMonitors monitors, RedirectControl redirectControl, - NodeConfigurationService nodeConfigurationService) + NodeConfigurationService nodeConfigurationService, ControlCrawlDataService crawlDataService) { this.fileStorageService = fileStorageService; this.rendererFactory = rendererFactory; @@ -62,6 +71,7 @@ public class ControlNodeService { this.monitors = monitors; this.redirectControl = redirectControl; this.nodeConfigurationService = nodeConfigurationService; + this.crawlDataService = crawlDataService; } public void register() throws IOException { @@ -72,6 +82,7 @@ public class ControlNodeService { var storageConfRenderer = rendererFactory.renderer("control/node/node-storage-conf"); var storageListRenderer = rendererFactory.renderer("control/node/node-storage-list"); var storageDetailsRenderer = rendererFactory.renderer("control/node/node-storage-details"); + var storageCrawlParquetDetails = rendererFactory.renderer("control/node/node-storage-crawl-parquet-details"); var configRenderer = rendererFactory.renderer("control/node/node-config"); @@ -84,6 +95,8 @@ 
public class ControlNodeService { Spark.get("/nodes/:id/storage/conf", this::nodeStorageConfModel, storageConfRenderer::render); Spark.get("/nodes/:id/storage/details", this::nodeStorageDetailsModel, storageDetailsRenderer::render); + Spark.get("/nodes/:id/storage/:fid/crawl-parquet-info", crawlDataService::crawlParquetInfo, storageCrawlParquetDetails::render); + Spark.post("/nodes/:id/process/:processBase/stop", this::stopProcess, redirectControl.renderRedirectAcknowledgement("Stopping", "../..") ); @@ -210,7 +223,8 @@ public class ControlNodeService { private Object nodeStorageDetailsModel(Request request, Response response) throws SQLException { int nodeId = Integer.parseInt(request.params("id")); - var storage = getFileStorageWithRelatedEntries(nodeId, FileStorageId.parse(request.queryParams("fid"))); + var fsid = FileStorageId.parse(request.queryParams("fid")); + var storage = getFileStorageWithRelatedEntries(nodeId, fsid); String view = switch(storage.type()) { case BACKUP -> "backup"; @@ -221,14 +235,26 @@ public class ControlNodeService { default -> throw new IllegalStateException(storage.type().toString()); }; - return Map.of( - "tab", Map.of("storage", true), - "view", Map.of(view, true), - "node", nodeConfigurationService.get(nodeId), - "storage", storage - ); + var ret = new HashMap<>(); + + ret.put("tab", Map.of("storage", true)); + ret.put("view", Map.of(view, true)); + ret.put("node", nodeConfigurationService.get(nodeId)); + ret.put("storage", storage); + + if (storage.type() == FileStorageType.CRAWL_DATA) { + var cdFiles = crawlDataService.getCrawlDataFiles(fsid, + request.queryParams("filterDomain"), + request.queryParams("afterDomain") + ); + ret.put("crawlDataFiles", cdFiles); + } + + return ret; + } + private Object nodeConfigModel(Request request, Response response) throws SQLException { int nodeId = Integer.parseInt(request.params("id")); diff --git 
a/code/services-core/control-service/resources/templates/control/node/node-storage-crawl-parquet-details.hdb b/code/services-core/control-service/resources/templates/control/node/node-storage-crawl-parquet-details.hdb new file mode 100644 index 00000000..aa0b230a --- /dev/null +++ b/code/services-core/control-service/resources/templates/control/node/node-storage-crawl-parquet-details.hdb @@ -0,0 +1,89 @@ + + +{{> control/partials/head-includes }} +Control Service: Node {{node.id}} + +{{> control/partials/nav}} + +
+ + + + {{> control/node/partial-node-nav }} + +
+ {{>control/partials/storage-types}} + +

Crawl Parquet Info

+ +

Summary

+ + + + + + + + + + + {{#each byStatusCode}} + + + + + {{/each}} + + + + + {{#each byContentType}} + + + + + {{/each}} +
DomainFilename
{{domain}} + {{path}} +
HTTP StatusCount
{{statusCode}}{{count}}
Content TypeCount
{{contentType}}{{count}}
+ +

Contents

+ + + + + + + + + + + + {{#each records}} + + + + + + + + + + + {{/each}} +
URLContent TypeHTTP StatusHas Body
ETagLast Modified
+ {{url}} + {{contentType}}{{httpStatus}}{{#if hasBody}}✓{{/if}}
{{etag}}{{lastModified}}
+
+ + + +
+ +{{> control/partials/foot-includes }} + \ No newline at end of file diff --git a/code/services-core/control-service/resources/templates/control/node/node-storage-details.hdb b/code/services-core/control-service/resources/templates/control/node/node-storage-details.hdb index 0bb1e14d..60b18bb8 100644 --- a/code/services-core/control-service/resources/templates/control/node/node-storage-details.hdb +++ b/code/services-core/control-service/resources/templates/control/node/node-storage-details.hdb @@ -62,6 +62,39 @@ {{/with}} + {{#if view.crawl}} +

Crawl Data

+ + + + + + + + + + + + + + {{#each crawlDataFiles.files}} + + + + + {{/each}} + + {{#if crawlDataFiles.nextAfter}} + + + + + {{/if}} + +
DomainCount
{{domain}}{{count}}
+ Next +
+ {{/if}} {{#with storage}} {{>control/partials/storage-details/related}} diff --git a/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorFileTransferService.java b/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorFileTransferService.java new file mode 100644 index 00000000..7111c3b3 --- /dev/null +++ b/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorFileTransferService.java @@ -0,0 +1,105 @@ +package nu.marginalia.executor; + +import com.google.inject.Inject; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageId; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import spark.Request; +import spark.Response; +import spark.Spark; + +import javax.servlet.ServletOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.SQLException; + +public class ExecutorFileTransferService { + private final FileStorageService fileStorageService; + private static final Logger logger = LoggerFactory.getLogger(ExecutorFileTransferService.class); + + @Inject + public ExecutorFileTransferService(FileStorageService fileStorageService) { + this.fileStorageService = fileStorageService; + } + + /** Allows transfer of files from each partition */ + public Object transferFile(Request request, Response response) throws SQLException, IOException { + + FileStorageId fileStorageId = FileStorageId.parse(request.params("fid")); + + var fileStorage = fileStorageService.getStorage(fileStorageId); + + Path basePath = fileStorage.asPath(); + + String path = request.queryParams("path").replaceAll("%2F", "/"); + Path filePath = basePath.resolve(path).normalize(); + + // ensure filePath is within basePath + // even though this is an internal API, it's better to be safe than sorry + if (!filePath.startsWith(basePath)) { + response.status(403); 
+ return "Forbidden"; + } + + // Announce that we support byte ranges + response.header("Accept-Ranges", "bytes"); + + // Set the content type to binary + response.type("application/octet-stream"); + + String range = request.headers("Range"); + if (range != null) { + String[] ranges = StringUtils.split(range, '='); + if (ranges.length != 2 || !"bytes".equals(ranges[0])) { + logger.warn("Invalid range header in {}: {}", filePath, range); + Spark.halt(400, "Invalid range header"); + } + + String[] rangeValues = StringUtils.split(ranges[1], '-'); + if (rangeValues.length != 2) { + logger.warn("Invalid range header in {}: {}", filePath, range); + Spark.halt(400, "Invalid range header"); + } + + long start = Long.parseLong(rangeValues[0].trim()); + long end = Long.parseLong(rangeValues[1].trim()); + long contentLength = end - start + 1; + response.header("Content-Range", "bytes " + start + "-" + end + "/" + Files.size(filePath)); + response.header("Content-Length", String.valueOf(contentLength)); + response.status(206); + + if ("HEAD".equalsIgnoreCase(request.requestMethod())) { + return ""; + } + + serveFile(filePath, response.raw().getOutputStream(), start, end + 1); + } else { + response.header("Content-Length", String.valueOf(Files.size(filePath))); + response.status(200); + + if ("HEAD".equalsIgnoreCase(request.requestMethod())) { + return ""; + } + + serveFile(filePath, response.raw().getOutputStream()); + } + + return ""; + } + + private void serveFile(Path filePath, ServletOutputStream outputStream) throws IOException { + try (var is = Files.newInputStream(filePath)) { + IOUtils.copy(is, outputStream); + } + } + + private void serveFile(Path filePath, ServletOutputStream outputStream, long start, long end) throws IOException { + try (var is = Files.newInputStream(filePath)) { + IOUtils.copyLarge(is, outputStream, start, end - start); + } + } +} diff --git a/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorSvc.java 
b/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorSvc.java index a84eebd3..6d7bda5c 100644 --- a/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorSvc.java +++ b/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorSvc.java @@ -6,18 +6,10 @@ import nu.marginalia.service.discovery.property.ServicePartition; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; import nu.marginalia.service.server.mq.MqRequest; -import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorageId; -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spark.Request; -import spark.Response; import spark.Spark; -import java.io.IOException; -import java.nio.file.Path; -import java.sql.SQLException; import java.util.List; // Weird name for this one to not have clashes with java.util.concurrent.ExecutorService @@ -25,7 +17,6 @@ public class ExecutorSvc extends Service { private static final Logger logger = LoggerFactory.getLogger(ExecutorSvc.class); private final ExecutionInit executionInit; - private final FileStorageService fileStorageService; @Inject public ExecutorSvc(BaseServiceParams params, @@ -34,7 +25,7 @@ public class ExecutorSvc extends Service { ExecutorSideloadGrpcService executorSideloadGrpcService, ExecutorExportGrpcService executorExportGrpcService, ExecutionInit executionInit, - FileStorageService fileStorageService) + ExecutorFileTransferService fileTransferService) { super(params, ServicePartition.partition(params.configuration.node()), @@ -45,9 +36,9 @@ public class ExecutorSvc extends Service { ); this.executionInit = executionInit; - this.fileStorageService = fileStorageService; - Spark.get("/transfer/file/:fid", this::transferFile); + Spark.get("/transfer/file/:fid", fileTransferService::transferFile); + Spark.head("/transfer/file/:fid", fileTransferService::transferFile); } 
@MqRequest(endpoint="FIRST-BOOT") @@ -57,19 +48,6 @@ public class ExecutorSvc extends Service { executionInit.initDefaultActors(); } - /** Allows transfer of files from each partition */ - private Object transferFile(Request request, Response response) throws SQLException, IOException { - FileStorageId fileStorageId = FileStorageId.parse(request.params("fid")); - var fileStorage = fileStorageService.getStorage(fileStorageId); - - Path basePath = fileStorage.asPath(); - // This is not a public API so injection isn't a concern - Path filePath = basePath.resolve(request.queryParams("path")); - - response.type("application/octet-stream"); - FileUtils.copyFile(filePath.toFile(), response.raw().getOutputStream()); - return ""; - } } diff --git a/code/services-core/executor-service/test/nu/marginalia/executor/ExecutorFileTransferServiceTest.java b/code/services-core/executor-service/test/nu/marginalia/executor/ExecutorFileTransferServiceTest.java new file mode 100644 index 00000000..4fa6d881 --- /dev/null +++ b/code/services-core/executor-service/test/nu/marginalia/executor/ExecutorFileTransferServiceTest.java @@ -0,0 +1,54 @@ +package nu.marginalia.executor; + +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorage; +import nu.marginalia.storage.model.FileStorageId; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import spark.Spark; + +import java.sql.DriverManager; +import java.sql.SQLException; + +import static org.mockito.Mockito.when; + +class ExecutorFileTransferServiceTest { + + @Test + public void test() throws SQLException, InterruptedException { + var fileStorage = Mockito.mock(FileStorageService.class); + + when(fileStorage.getStorage(Mockito.any(FileStorageId.class))).thenReturn(new FileStorage(null, + null, + null, + null, + "/tmp", + null, + null)); + + var svc = new ExecutorFileTransferService(fileStorage); + + Spark.port(9998); + Spark.get("/transfer/file/:fid", svc::transferFile); + 
Spark.head("/transfer/file/:fid", svc::transferFile); + + Spark.init(); + + Thread.sleep(1000); + + + try (var conn = DriverManager.getConnection("jdbc:duckdb:"); + var stmt = conn.createStatement()) + { + var rs = stmt.executeQuery(""" + SELECT COUNT(*) AS cnt, httpStatus + FROM 'http://hostname:9998/transfer/file/0?path=crawl.parquet' + GROUP BY httpStatus + """); + while (rs.next()) { + System.out.println(rs.getInt("CNT") + " " + rs.getInt("httpStatus")); + } + } + for(;;); + } +} \ No newline at end of file