mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-23 13:09:00 +00:00
(control) Partial implementation of inspection utility for crawl data
Uses DuckDB and HTTP range queries to read the Parquet files directly from the index partitions. The UX is a bit rough, but it is in working order.
This commit is contained in:
parent
4fcd4a8197
commit
17dc00d05f
@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.*;
|
||||
@ -29,7 +30,8 @@ public class FileStorageService {
|
||||
private static final DateTimeFormatter dirNameDatePattern = DateTimeFormatter.ofPattern("__uu-MM-dd'T'HH_mm_ss.SSS"); // filesystem safe ISO8601
|
||||
|
||||
@Inject
|
||||
public FileStorageService(HikariDataSource dataSource, @Named("wmsa-system-node") Integer node) {
|
||||
public FileStorageService(HikariDataSource dataSource,
|
||||
@Named("wmsa-system-node") Integer node) {
|
||||
this.dataSource = dataSource;
|
||||
this.node = node;
|
||||
|
||||
@ -124,6 +126,7 @@ public class FileStorageService {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void relateFileStorages(FileStorageId source, FileStorageId target) {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var stmt = conn.prepareStatement("""
|
||||
|
@ -15,6 +15,7 @@ import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||
import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
@ -22,8 +23,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URLEncoder;
|
||||
import java.net.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
@ -161,17 +161,17 @@ public class ExecutorClient {
|
||||
}
|
||||
}
|
||||
|
||||
public void transferFile(int node, FileStorageId fileId, String path, OutputStream destOutputStream) {
|
||||
String uriPath = STR."/transfer/file/\{fileId.id()}";
|
||||
public URL remoteFileURL(FileStorage fileStorage, String path) {
|
||||
String uriPath = STR."/transfer/file/\{fileStorage.id()}";
|
||||
String uriQuery = STR."path=\{URLEncoder.encode(path, StandardCharsets.UTF_8)}";
|
||||
|
||||
var service = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, node))
|
||||
var service = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node()))
|
||||
.stream().findFirst().orElseThrow();
|
||||
|
||||
try (var urlStream = service.endpoint().toURL(uriPath, uriQuery).openStream()) {
|
||||
urlStream.transferTo(destOutputStream);
|
||||
try {
|
||||
return service.endpoint().toURL(uriPath, uriQuery);
|
||||
}
|
||||
catch (IOException | URISyntaxException ex) {
|
||||
catch (URISyntaxException|MalformedURLException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
@ -50,6 +50,8 @@ dependencies {
|
||||
implementation libs.zstd
|
||||
implementation libs.handlebars
|
||||
|
||||
implementation libs.duckdb
|
||||
|
||||
implementation libs.trove
|
||||
implementation libs.spark
|
||||
implementation libs.fastutil
|
||||
|
@ -1,6 +1,7 @@
|
||||
package nu.marginalia.control.node.model;
|
||||
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import nu.marginalia.storage.model.FileStorageType;
|
||||
|
||||
import java.util.List;
|
||||
@ -13,4 +14,7 @@ public record FileStorageWithRelatedEntries(FileStorageWithActions self,
|
||||
return self().storage().type();
|
||||
}
|
||||
|
||||
public FileStorageId getId() {
|
||||
return self.storage().id();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,198 @@
|
||||
package nu.marginalia.control.node.svc;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.executor.client.ExecutorClient;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/** Service for inspecting crawl data within the control service.
|
||||
*
|
||||
* Uses remote calls to the executor service to fetch information about the crawl data.
|
||||
* Both directly, when inspecting the crawler log, and indirectly via duckdb when
|
||||
* inspecting the parquet files. The duckdb calls rely on range queries to fetch
|
||||
* only the relevant data from the files, so that the UI remains responsive even when
|
||||
* dealing with large (100MB+ files).
|
||||
*/
|
||||
@Singleton
|
||||
public class ControlCrawlDataService {
|
||||
private final ExecutorClient executorClient;
|
||||
private final FileStorageService fileStorageService;
|
||||
private final NodeConfigurationService nodeConfigurationService;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ControlCrawlDataService.class);
|
||||
|
||||
@Inject
|
||||
public ControlCrawlDataService(ExecutorClient executorClient,
|
||||
FileStorageService fileStorageService, NodeConfigurationService nodeConfigurationService)
|
||||
{
|
||||
this.executorClient = executorClient;
|
||||
this.fileStorageService = fileStorageService;
|
||||
this.nodeConfigurationService = nodeConfigurationService;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public Object crawlParquetInfo(Request request, Response response) throws SQLException {
|
||||
int nodeId = Integer.parseInt(request.params("id"));
|
||||
var fsid = FileStorageId.parse(request.params("fid"));
|
||||
|
||||
String path = request.queryParams("path");
|
||||
|
||||
var url = executorClient.remoteFileURL(fileStorageService.getStorage(fsid), path).toString();
|
||||
|
||||
List<SummaryStatusCode> byStatusCode = new ArrayList<>();
|
||||
List<SummaryContentType> byContentType = new ArrayList<>();
|
||||
|
||||
List<CrawlDataRecordSummary> records = new ArrayList<>();
|
||||
|
||||
String domain;
|
||||
try (var conn = DriverManager.getConnection("jdbc:duckdb:");
|
||||
var stmt = conn.createStatement())
|
||||
{
|
||||
ResultSet rs;
|
||||
|
||||
rs = stmt.executeQuery(DUCKDB."SELECT domain FROM \{url} LIMIT 1");
|
||||
domain = rs.next() ? rs.getString(1) : "NO DOMAIN";
|
||||
|
||||
rs = stmt.executeQuery(DUCKDB."""
|
||||
SELECT httpStatus, COUNT(*) as cnt FROM \{url}
|
||||
GROUP BY httpStatus
|
||||
ORDER BY httpStatus
|
||||
""");
|
||||
while (rs.next()) {
|
||||
byStatusCode.add(new SummaryStatusCode(rs.getInt(1), rs.getInt(2)));
|
||||
}
|
||||
|
||||
rs = stmt.executeQuery(DUCKDB."""
|
||||
SELECT contentType, COUNT(*) as cnt
|
||||
FROM \{url}
|
||||
GROUP BY contentType
|
||||
ORDER BY contentType
|
||||
""");
|
||||
while (rs.next()) {
|
||||
byContentType.add(new SummaryContentType(rs.getString(1), rs.getInt(2)));
|
||||
}
|
||||
|
||||
rs = stmt.executeQuery(DUCKDB."""
|
||||
SELECT url, contentType, httpStatus, body != '', etagHeader, lastModifiedHeader
|
||||
FROM \{url} LIMIT 10
|
||||
""");
|
||||
while (rs.next()) {
|
||||
records.add(new CrawlDataRecordSummary(rs.getString(1), rs.getString(2), rs.getInt(3), rs.getBoolean(4), rs.getString(5), rs.getString(6)));
|
||||
}
|
||||
}
|
||||
|
||||
return Map.of(
|
||||
"tab", Map.of("storage", true),
|
||||
"view", Map.of("crawl", true),
|
||||
"node", nodeConfigurationService.get(nodeId),
|
||||
"storage", fileStorageService.getStorage(fsid),
|
||||
"path", path,
|
||||
"domain", domain,
|
||||
"byStatusCode", byStatusCode,
|
||||
"byContentType", byContentType,
|
||||
"records", records)
|
||||
;
|
||||
}
|
||||
|
||||
public ControlCrawlDataService.CrawlDataFileList getCrawlDataFiles(FileStorageId fsid,
|
||||
@Nullable String filterDomain,
|
||||
@Nullable String afterDomain) {
|
||||
List<ControlCrawlDataService.CrawlDataFile> crawlDataFiles = new ArrayList<>();
|
||||
|
||||
try (var br = new BufferedReader(new InputStreamReader((executorClient.remoteFileURL(fileStorageService.getStorage(fsid), "crawler.log").openStream())))) {
|
||||
Stream<CrawlDataFile> str = br.lines()
|
||||
.filter(s -> !s.isBlank())
|
||||
.filter(s -> !s.startsWith("#"))
|
||||
.map(s -> {
|
||||
String[] parts = s.split("\\s+");
|
||||
return new ControlCrawlDataService.CrawlDataFile(parts[0], parts[2], Integer.parseInt(parts[3]));
|
||||
});
|
||||
|
||||
if (!Strings.isNullOrEmpty(afterDomain)) {
|
||||
str = str.dropWhile(s -> !s.domain().equals(afterDomain)).skip(1);
|
||||
}
|
||||
|
||||
if (!Strings.isNullOrEmpty(filterDomain)) {
|
||||
str = str.filter(s -> s.domain().toLowerCase().contains(filterDomain.toLowerCase()));
|
||||
}
|
||||
|
||||
str.limit(10).forEach(crawlDataFiles::add);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to fetch crawler.log", ex);
|
||||
}
|
||||
|
||||
return new ControlCrawlDataService.CrawlDataFileList(
|
||||
crawlDataFiles,
|
||||
filterDomain,
|
||||
afterDomain);
|
||||
}
|
||||
|
||||
public record SummaryContentType(String contentType, int count) {}
|
||||
|
||||
public record SummaryStatusCode(int statusCode, int count) {}
|
||||
|
||||
public record CrawlDataRecordSummary(String url, String contentType, int httpStatus, boolean hasBody, String etag, String lastModified) {}
|
||||
|
||||
public record CrawlDataFile(String domain, String path, int count) {}
|
||||
|
||||
public record CrawlDataFileList(List<CrawlDataFile> files,
|
||||
@Nullable String filter,
|
||||
@Nullable String after)
|
||||
{
|
||||
|
||||
// Used by the template to determine if there are more files to show,
|
||||
// looks unused in the IDE but it's not
|
||||
public String nextAfter() {
|
||||
if (files.isEmpty())
|
||||
return "";
|
||||
if (files.size() < 10)
|
||||
return "";
|
||||
|
||||
return files.getLast().domain();
|
||||
}
|
||||
}
|
||||
|
||||
// DuckDB template processor that deals with quoting and escaping values
|
||||
// in the SQL query; this offers a very basic protection against accidental SQL injection
|
||||
static StringTemplate.Processor<String, IllegalArgumentException> DUCKDB = st -> {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Iterator<String> fragmentsIter = st.fragments().iterator();
|
||||
|
||||
for (Object value : st.values()) {
|
||||
sb.append(fragmentsIter.next());
|
||||
|
||||
if (value instanceof Number) { // don't quote numbers
|
||||
sb.append(value);
|
||||
} else {
|
||||
String valueStr = value.toString().replace("'", "''");
|
||||
sb.append("'").append(valueStr).append("'");
|
||||
}
|
||||
}
|
||||
|
||||
sb.append(fragmentsIter.next());
|
||||
|
||||
return sb.toString();
|
||||
};
|
||||
|
||||
}
|
@ -60,8 +60,7 @@ public class ControlFileStorageService {
|
||||
return redirectToOverview(request);
|
||||
}
|
||||
|
||||
public Object downloadFileFromStorage(Request request, Response response) throws IOException {
|
||||
int nodeId = Integer.parseInt(request.params("id"));
|
||||
public Object downloadFileFromStorage(Request request, Response response) throws IOException, SQLException {
|
||||
var fileStorageId = FileStorageId.parse(request.params("fid"));
|
||||
|
||||
String path = request.queryParams("path");
|
||||
@ -73,7 +72,11 @@ public class ControlFileStorageService {
|
||||
else
|
||||
response.type("application/octet-stream");
|
||||
|
||||
executorClient.transferFile(nodeId, fileStorageId, path, response.raw().getOutputStream());
|
||||
var storage = fileStorageService.getStorage(fileStorageId);
|
||||
|
||||
try (var urlStream = executorClient.remoteFileURL(storage, path).openStream()) {
|
||||
urlStream.transferTo(response.raw().getOutputStream());
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -100,6 +103,7 @@ public class ControlFileStorageService {
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
public Object flagFileForDeletionRequest(Request request, Response response) throws SQLException {
|
||||
FileStorageId fid = new FileStorageId(Long.parseLong(request.params(":fid")));
|
||||
fileStorageService.flagFileForDeletion(fid);
|
||||
|
@ -1,5 +1,6 @@
|
||||
package nu.marginalia.control.node.svc;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.inject.Inject;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import lombok.SneakyThrows;
|
||||
@ -23,10 +24,16 @@ import spark.Request;
|
||||
import spark.Response;
|
||||
import spark.Spark;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class ControlNodeService {
|
||||
private final FileStorageService fileStorageService;
|
||||
@ -39,6 +46,8 @@ public class ControlNodeService {
|
||||
private final RedirectControl redirectControl;
|
||||
private final NodeConfigurationService nodeConfigurationService;
|
||||
|
||||
private final ControlCrawlDataService crawlDataService;
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
@Inject
|
||||
@ -51,7 +60,7 @@ public class ControlNodeService {
|
||||
HikariDataSource dataSource,
|
||||
ServiceMonitors monitors,
|
||||
RedirectControl redirectControl,
|
||||
NodeConfigurationService nodeConfigurationService)
|
||||
NodeConfigurationService nodeConfigurationService, ControlCrawlDataService crawlDataService)
|
||||
{
|
||||
this.fileStorageService = fileStorageService;
|
||||
this.rendererFactory = rendererFactory;
|
||||
@ -62,6 +71,7 @@ public class ControlNodeService {
|
||||
this.monitors = monitors;
|
||||
this.redirectControl = redirectControl;
|
||||
this.nodeConfigurationService = nodeConfigurationService;
|
||||
this.crawlDataService = crawlDataService;
|
||||
}
|
||||
|
||||
public void register() throws IOException {
|
||||
@ -72,6 +82,7 @@ public class ControlNodeService {
|
||||
var storageConfRenderer = rendererFactory.renderer("control/node/node-storage-conf");
|
||||
var storageListRenderer = rendererFactory.renderer("control/node/node-storage-list");
|
||||
var storageDetailsRenderer = rendererFactory.renderer("control/node/node-storage-details");
|
||||
var storageCrawlParquetDetails = rendererFactory.renderer("control/node/node-storage-crawl-parquet-details");
|
||||
var configRenderer = rendererFactory.renderer("control/node/node-config");
|
||||
|
||||
|
||||
@ -84,6 +95,8 @@ public class ControlNodeService {
|
||||
Spark.get("/nodes/:id/storage/conf", this::nodeStorageConfModel, storageConfRenderer::render);
|
||||
Spark.get("/nodes/:id/storage/details", this::nodeStorageDetailsModel, storageDetailsRenderer::render);
|
||||
|
||||
Spark.get("/nodes/:id/storage/:fid/crawl-parquet-info", crawlDataService::crawlParquetInfo, storageCrawlParquetDetails::render);
|
||||
|
||||
Spark.post("/nodes/:id/process/:processBase/stop", this::stopProcess,
|
||||
redirectControl.renderRedirectAcknowledgement("Stopping", "../..")
|
||||
);
|
||||
@ -210,7 +223,8 @@ public class ControlNodeService {
|
||||
|
||||
private Object nodeStorageDetailsModel(Request request, Response response) throws SQLException {
|
||||
int nodeId = Integer.parseInt(request.params("id"));
|
||||
var storage = getFileStorageWithRelatedEntries(nodeId, FileStorageId.parse(request.queryParams("fid")));
|
||||
var fsid = FileStorageId.parse(request.queryParams("fid"));
|
||||
var storage = getFileStorageWithRelatedEntries(nodeId, fsid);
|
||||
|
||||
String view = switch(storage.type()) {
|
||||
case BACKUP -> "backup";
|
||||
@ -221,14 +235,26 @@ public class ControlNodeService {
|
||||
default -> throw new IllegalStateException(storage.type().toString());
|
||||
};
|
||||
|
||||
return Map.of(
|
||||
"tab", Map.of("storage", true),
|
||||
"view", Map.of(view, true),
|
||||
"node", nodeConfigurationService.get(nodeId),
|
||||
"storage", storage
|
||||
);
|
||||
var ret = new HashMap<>();
|
||||
|
||||
ret.put("tab", Map.of("storage", true));
|
||||
ret.put("view", Map.of(view, true));
|
||||
ret.put("node", nodeConfigurationService.get(nodeId));
|
||||
ret.put("storage", storage);
|
||||
|
||||
if (storage.type() == FileStorageType.CRAWL_DATA) {
|
||||
var cdFiles = crawlDataService.getCrawlDataFiles(fsid,
|
||||
request.queryParams("filterDomain"),
|
||||
request.queryParams("afterDomain")
|
||||
);
|
||||
ret.put("crawlDataFiles", cdFiles);
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
}
|
||||
|
||||
|
||||
private Object nodeConfigModel(Request request, Response response) throws SQLException {
|
||||
int nodeId = Integer.parseInt(request.params("id"));
|
||||
|
||||
|
@ -0,0 +1,89 @@
|
||||
<!doctype html>
|
||||
<html lang="en-US">
|
||||
{{> control/partials/head-includes }}
|
||||
<head><title>Control Service: Node {{node.id}}</title></head>
|
||||
<body>
|
||||
{{> control/partials/nav}}
|
||||
|
||||
<div class="container">
|
||||
|
||||
<nav aria-label="breadcrumb">
|
||||
<ol class="breadcrumb">
|
||||
<li class="breadcrumb-item"><a href="/nodes">nodes</a></li>
|
||||
<li class="breadcrumb-item">node-{{node.id}}</li>
|
||||
</ol>
|
||||
</nav>
|
||||
|
||||
{{> control/node/partial-node-nav }}
|
||||
|
||||
<div class="container">
|
||||
{{>control/partials/storage-types}}
|
||||
|
||||
<h1>Crawl Parquet Info</h1>
|
||||
|
||||
<h2>Summary</h2>
|
||||
<table class="table">
|
||||
<tr>
|
||||
<th>Domain</th><th>Filename</th>
|
||||
</tr>
|
||||
<td>{{domain}}</td>
|
||||
<td>
|
||||
<a href="/nodes/{{node.id}}/storage/{{storage.id}}/transfer?path={{{path}}}">{{path}}</a>
|
||||
</td>
|
||||
<tr>
|
||||
<th>HTTP Status</th>
|
||||
<th>Count</th>
|
||||
</tr>
|
||||
{{#each byStatusCode}}
|
||||
<tr>
|
||||
<td>{{statusCode}}</td>
|
||||
<td>{{count}}</td>
|
||||
</tr>
|
||||
{{/each}}
|
||||
<tr>
|
||||
<th>Content Type</th>
|
||||
<th>Count</th>
|
||||
</tr>
|
||||
{{#each byContentType}}
|
||||
<tr>
|
||||
<td>{{contentType}}</td>
|
||||
<td>{{count}}</td>
|
||||
</tr>
|
||||
{{/each}}
|
||||
</table>
|
||||
|
||||
<h2>Contents</h2>
|
||||
<table class="table">
|
||||
<tr>
|
||||
<th>URL</th>
|
||||
<th>Content Type</th>
|
||||
<th>HTTP Status</th>
|
||||
<th>Has Body</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<th colspan="2">ETag</th>
|
||||
<th colspan="2">Last Modified</th>
|
||||
</tr>
|
||||
{{#each records}}
|
||||
<tr>
|
||||
<td>
|
||||
<a href="{{url}}" rel="noreferrer noopener">{{url}}</a>
|
||||
</td>
|
||||
<td>{{contentType}}</td>
|
||||
<td>{{httpStatus}}</td>
|
||||
<td>{{#if hasBody}}✓{{/if}}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td colspan="2">{{etag}}</td>
|
||||
<td colspan="2">{{lastModified}}</td>
|
||||
</tr>
|
||||
{{/each}}
|
||||
</table>
|
||||
</div>
|
||||
|
||||
|
||||
|
||||
</div>
|
||||
</body>
|
||||
{{> control/partials/foot-includes }}
|
||||
</html>
|
@ -62,6 +62,39 @@
|
||||
{{/with}}
|
||||
</table>
|
||||
|
||||
{{#if view.crawl}}
|
||||
<h1 class="my-3">Crawl Data</h1>
|
||||
<table class="table">
|
||||
<tr>
|
||||
<th>Domain</th>
|
||||
<th>Count</th>
|
||||
</tr>
|
||||
<form method="get" action="/nodes/{{node.id}}/storage/details">
|
||||
<tr>
|
||||
<input type="hidden" name="fid" value="{{storage.id}}">
|
||||
<td> <input type="text" name="filterDomain" value="{{crawlDataFiles.filter}}" placeholder="Filter"> <button class="btn btn-secondary btn-sm" type="submit">Filter</button> </td>
|
||||
<td></td>
|
||||
</tr>
|
||||
</form>
|
||||
|
||||
{{#each crawlDataFiles.files}}
|
||||
<tr>
|
||||
<td><a href="/nodes/{{node.id}}/storage/{{storage.id}}/crawl-parquet-info?path={{{path}}}">{{domain}}</a></td>
|
||||
<td>{{count}}</td>
|
||||
</tr>
|
||||
{{/each}}
|
||||
|
||||
{{#if crawlDataFiles.nextAfter}}
|
||||
<tr>
|
||||
<td></td>
|
||||
<td>
|
||||
<a href="/nodes/{{node.id}}/storage/details?fid={{storage.id}}&filter={{crawlDataFiles.filter}}&afterDomain={{crawlDataFiles.nextAfter}}">Next</a>
|
||||
</td>
|
||||
</tr>
|
||||
{{/if}}
|
||||
|
||||
</table>
|
||||
{{/if}}
|
||||
|
||||
{{#with storage}}
|
||||
{{>control/partials/storage-details/related}}
|
||||
|
@ -0,0 +1,105 @@
|
||||
package nu.marginalia.executor;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
import spark.Spark;
|
||||
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;
|
||||
|
||||
public class ExecutorFileTransferService {
|
||||
private final FileStorageService fileStorageService;
|
||||
private static final Logger logger = LoggerFactory.getLogger(ExecutorFileTransferService.class);
|
||||
|
||||
@Inject
|
||||
public ExecutorFileTransferService(FileStorageService fileStorageService) {
|
||||
this.fileStorageService = fileStorageService;
|
||||
}
|
||||
|
||||
/** Allows transfer of files from each partition */
|
||||
public Object transferFile(Request request, Response response) throws SQLException, IOException {
|
||||
|
||||
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
|
||||
|
||||
var fileStorage = fileStorageService.getStorage(fileStorageId);
|
||||
|
||||
Path basePath = fileStorage.asPath();
|
||||
|
||||
String path = request.queryParams("path").replaceAll("%2F", "/");
|
||||
Path filePath = basePath.resolve(path).normalize();
|
||||
|
||||
// ensure filePath is within basePath
|
||||
// even though this is an internal API, it's better to be safe than sorry
|
||||
if (!filePath.startsWith(basePath)) {
|
||||
response.status(403);
|
||||
return "Forbidden";
|
||||
}
|
||||
|
||||
// Announce that we support byte ranges
|
||||
response.header("Accept-Ranges", "bytes");
|
||||
|
||||
// Set the content type to binary
|
||||
response.type("application/octet-stream");
|
||||
|
||||
String range = request.headers("Range");
|
||||
if (range != null) {
|
||||
String[] ranges = StringUtils.split(range, '=');
|
||||
if (ranges.length != 2 || !"bytes".equals(ranges[0])) {
|
||||
logger.warn("Invalid range header in {}: {}", filePath, range);
|
||||
Spark.halt(400, "Invalid range header");
|
||||
}
|
||||
|
||||
String[] rangeValues = StringUtils.split(ranges[1], '-');
|
||||
if (rangeValues.length != 2) {
|
||||
logger.warn("Invalid range header in {}: {}", filePath, range);
|
||||
Spark.halt(400, "Invalid range header");
|
||||
}
|
||||
|
||||
long start = Long.parseLong(rangeValues[0].trim());
|
||||
long end = Long.parseLong(rangeValues[1].trim());
|
||||
long contentLength = end - start + 1;
|
||||
response.header("Content-Range", "bytes " + start + "-" + end + "/" + Files.size(filePath));
|
||||
response.header("Content-Length", String.valueOf(contentLength));
|
||||
response.status(206);
|
||||
|
||||
if ("HEAD".equalsIgnoreCase(request.requestMethod())) {
|
||||
return "";
|
||||
}
|
||||
|
||||
serveFile(filePath, response.raw().getOutputStream(), start, end + 1);
|
||||
} else {
|
||||
response.header("Content-Length", String.valueOf(Files.size(filePath)));
|
||||
response.status(200);
|
||||
|
||||
if ("HEAD".equalsIgnoreCase(request.requestMethod())) {
|
||||
return "";
|
||||
}
|
||||
|
||||
serveFile(filePath, response.raw().getOutputStream());
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
private void serveFile(Path filePath, ServletOutputStream outputStream) throws IOException {
|
||||
try (var is = Files.newInputStream(filePath)) {
|
||||
IOUtils.copy(is, outputStream);
|
||||
}
|
||||
}
|
||||
|
||||
private void serveFile(Path filePath, ServletOutputStream outputStream, long start, long end) throws IOException {
|
||||
try (var is = Files.newInputStream(filePath)) {
|
||||
IOUtils.copyLarge(is, outputStream, start, end - start);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,18 +6,10 @@ import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import nu.marginalia.service.server.BaseServiceParams;
|
||||
import nu.marginalia.service.server.Service;
|
||||
import nu.marginalia.service.server.mq.MqRequest;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
import spark.Spark;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
|
||||
// Weird name for this one to not have clashes with java.util.concurrent.ExecutorService
|
||||
@ -25,7 +17,6 @@ public class ExecutorSvc extends Service {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ExecutorSvc.class);
|
||||
private final ExecutionInit executionInit;
|
||||
private final FileStorageService fileStorageService;
|
||||
|
||||
@Inject
|
||||
public ExecutorSvc(BaseServiceParams params,
|
||||
@ -34,7 +25,7 @@ public class ExecutorSvc extends Service {
|
||||
ExecutorSideloadGrpcService executorSideloadGrpcService,
|
||||
ExecutorExportGrpcService executorExportGrpcService,
|
||||
ExecutionInit executionInit,
|
||||
FileStorageService fileStorageService)
|
||||
ExecutorFileTransferService fileTransferService)
|
||||
{
|
||||
super(params,
|
||||
ServicePartition.partition(params.configuration.node()),
|
||||
@ -45,9 +36,9 @@ public class ExecutorSvc extends Service {
|
||||
);
|
||||
|
||||
this.executionInit = executionInit;
|
||||
this.fileStorageService = fileStorageService;
|
||||
|
||||
Spark.get("/transfer/file/:fid", this::transferFile);
|
||||
Spark.get("/transfer/file/:fid", fileTransferService::transferFile);
|
||||
Spark.head("/transfer/file/:fid", fileTransferService::transferFile);
|
||||
}
|
||||
|
||||
@MqRequest(endpoint="FIRST-BOOT")
|
||||
@ -57,19 +48,6 @@ public class ExecutorSvc extends Service {
|
||||
executionInit.initDefaultActors();
|
||||
}
|
||||
|
||||
/** Allows transfer of files from each partition */
|
||||
private Object transferFile(Request request, Response response) throws SQLException, IOException {
|
||||
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
|
||||
|
||||
var fileStorage = fileStorageService.getStorage(fileStorageId);
|
||||
|
||||
Path basePath = fileStorage.asPath();
|
||||
// This is not a public API so injection isn't a concern
|
||||
Path filePath = basePath.resolve(request.queryParams("path"));
|
||||
|
||||
response.type("application/octet-stream");
|
||||
FileUtils.copyFile(filePath.toFile(), response.raw().getOutputStream());
|
||||
return "";
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,54 @@
|
||||
package nu.marginalia.executor;
|
||||
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import spark.Spark;
|
||||
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
class ExecutorFileTransferServiceTest {
|
||||
|
||||
@Test
|
||||
public void test() throws SQLException, InterruptedException {
|
||||
var fileStorage = Mockito.mock(FileStorageService.class);
|
||||
|
||||
when(fileStorage.getStorage(Mockito.any(FileStorageId.class))).thenReturn(new FileStorage(null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"/tmp",
|
||||
null,
|
||||
null));
|
||||
|
||||
var svc = new ExecutorFileTransferService(fileStorage);
|
||||
|
||||
Spark.port(9998);
|
||||
Spark.get("/transfer/file/:fid", svc::transferFile);
|
||||
Spark.head("/transfer/file/:fid", svc::transferFile);
|
||||
|
||||
Spark.init();
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
|
||||
try (var conn = DriverManager.getConnection("jdbc:duckdb:");
|
||||
var stmt = conn.createStatement())
|
||||
{
|
||||
var rs = stmt.executeQuery("""
|
||||
SELECT COUNT(*) AS cnt, httpStatus
|
||||
FROM 'http://hostname:9998/transfer/file/0?path=crawl.parquet'
|
||||
GROUP BY httpStatus
|
||||
""");
|
||||
while (rs.next()) {
|
||||
System.out.println(rs.getInt("CNT") + " " + rs.getInt("httpStatus"));
|
||||
}
|
||||
}
|
||||
for(;;);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user