(*) Clean up code related to crawl parquet inspection

This commit is contained in:
Viktor Lofgren 2024-05-22 12:55:08 +02:00
parent 365229991b
commit 59ec70eb73
7 changed files with 74 additions and 36 deletions

View File

@ -53,7 +53,7 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
@Override @Override
public synchronized void onChange() { public synchronized void onChange() {
Set<InstanceAddress> newRoutes = serviceRegistryIf.getEndpoints(serviceKey); Set<InstanceAddress> newRoutes = new HashSet<>(serviceRegistryIf.getEndpoints(serviceKey));
Set<InstanceAddress> oldRoutes = new HashSet<>(channels.keySet()); Set<InstanceAddress> oldRoutes = new HashSet<>(channels.keySet());
// Find the routes that have been added or removed // Find the routes that have been added or removed

View File

@ -6,7 +6,7 @@ import static nu.marginalia.service.discovery.property.ServiceEndpoint.*;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
import java.util.Set; import java.util.List;
import java.util.UUID; import java.util.UUID;
/** A service registry that allows services to register themselves and /** A service registry that allows services to register themselves and
@ -42,7 +42,7 @@ public interface ServiceRegistryIf {
int requestPort(String externalHost, ServiceKey<?> key); int requestPort(String externalHost, ServiceKey<?> key);
/** Get all endpoints for the service on the specified node and schema. */ /** Get all endpoints for the service on the specified node and schema. */
Set<InstanceAddress> getEndpoints(ServiceKey<?> schema); List<InstanceAddress> getEndpoints(ServiceKey<?> schema);
/** Register a monitor to be notified when the service registry changes. /** Register a monitor to be notified when the service registry changes.
* <p></p> * <p></p>

View File

@ -177,9 +177,9 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
} }
@Override @Override
public Set<InstanceAddress> getEndpoints(ServiceKey<?> key) { public List<InstanceAddress> getEndpoints(ServiceKey<?> key) {
try { try {
Set<InstanceAddress> ret = new HashSet<>(); List<InstanceAddress> ret = new ArrayList<>();
for (var uuid : curatorFramework for (var uuid : curatorFramework
.getChildren() .getChildren()
.forPath(key.toPath())) { .forPath(key.toPath())) {
@ -204,7 +204,7 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
return ret; return ret;
} }
catch (Exception ex) { catch (Exception ex) {
return Set.of(); return List.of();
} }
} }

View File

@ -63,7 +63,7 @@ public class GrpcServerTest {
var mockRegistry = Mockito.mock(ServiceRegistryIf.class); var mockRegistry = Mockito.mock(ServiceRegistryIf.class);
when(mockRegistry.getEndpoints(any())).thenReturn( when(mockRegistry.getEndpoints(any())).thenReturn(
Set.of(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID))); List.of(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID)));
var client = createClient(mockRegistry); var client = createClient(mockRegistry);
client.onChange(); client.onChange();
@ -83,7 +83,7 @@ public class GrpcServerTest {
server1.start(); server1.start();
Set<ServiceEndpoint.InstanceAddress> endpoints = new HashSet<>(); List<ServiceEndpoint.InstanceAddress> endpoints = new ArrayList<>();
endpoints.add(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID1)); endpoints.add(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID1));
var mockRegistry = Mockito.mock(ServiceRegistryIf.class); var mockRegistry = Mockito.mock(ServiceRegistryIf.class);

View File

@ -21,8 +21,6 @@ import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.net.*; import java.net.*;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
@ -161,18 +159,24 @@ public class ExecutorClient {
} }
} }
/** Get the URL to download a file from a (possibly remote) file storage.
* The endpoint is compatible with range requests.
* */
public URL remoteFileURL(FileStorage fileStorage, String path) { public URL remoteFileURL(FileStorage fileStorage, String path) {
String uriPath = STR."/transfer/file/\{fileStorage.id()}"; String uriPath = STR."/transfer/file/\{fileStorage.id()}";
String uriQuery = STR."path=\{URLEncoder.encode(path, StandardCharsets.UTF_8)}"; String uriQuery = STR."path=\{URLEncoder.encode(path, StandardCharsets.UTF_8)}";
var service = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node())) var endpoints = registry.getEndpoints(ServiceKey.forRest(ServiceId.Executor, fileStorage.node()));
.stream().findFirst().orElseThrow(); if (endpoints.isEmpty()) {
throw new RuntimeException("No endpoints for node " + fileStorage.node());
}
var service = endpoints.getFirst();
try { try {
return service.endpoint().toURL(uriPath, uriQuery); return service.endpoint().toURL(uriPath, uriQuery);
} }
catch (URISyntaxException|MalformedURLException ex) { catch (URISyntaxException|MalformedURLException ex) {
throw new RuntimeException(ex); throw new RuntimeException("Failed to construct URL for path", ex);
} }
} }

View File

@ -22,12 +22,16 @@ import java.util.*;
import java.util.stream.Stream; import java.util.stream.Stream;
/** Service for inspecting crawl data within the control service. /** Service for inspecting crawl data within the control service.
* * <p></p>
* Uses remote calls to the executor service to fetch information about the crawl data. * Uses remote calls to the executor service to fetch information about the crawl data.
* Both directly, when inspecting the crawler log, and indirectly via duckdb when * Both directly, when inspecting the crawler log, and indirectly via duckdb when
* inspecting the parquet files. The duckdb calls rely on range queries to fetch * inspecting the parquet files. The duckdb calls rely on range queries to fetch
* only the relevant data from the files, so that the UI remains responsive even when * only the relevant data from the files, so that the UI remains responsive even when
* dealing with large (100MB+ files). * dealing with large (100MB+ files).
* <p></p>
* This service is built in a fairly "raw" manner, for the purpose of not adding architectural
* overhead by modelling the data in a more structured way through an API; instead the data is
* fetched and presented directly to the UI.
*/ */
@Singleton @Singleton
public class ControlCrawlDataService { public class ControlCrawlDataService {
@ -63,14 +67,16 @@ public class ControlCrawlDataService {
List<SummaryStatusCode> byStatusCode = new ArrayList<>(); List<SummaryStatusCode> byStatusCode = new ArrayList<>();
List<SummaryContentType> byContentType = new ArrayList<>(); List<SummaryContentType> byContentType = new ArrayList<>();
List<CrawlDataRecordSummary> records = new ArrayList<>(); List<CrawlDataRecordSummary> records = new ArrayList<>();
// Fetch the data from the parquet file using DuckDB
String domain; String domain;
try (var conn = DriverManager.getConnection("jdbc:duckdb:"); try (var conn = DriverManager.getConnection("jdbc:duckdb:");
var stmt = conn.createStatement()) { var stmt = conn.createStatement()) {
ResultSet rs; ResultSet rs;
// Summarize by status code
rs = stmt.executeQuery(DUCKDB."SELECT domain FROM \{url} LIMIT 1"); rs = stmt.executeQuery(DUCKDB."SELECT domain FROM \{url} LIMIT 1");
domain = rs.next() ? rs.getString(1) : "NO DOMAIN"; domain = rs.next() ? rs.getString(1) : "NO DOMAIN";
@ -80,9 +86,15 @@ public class ControlCrawlDataService {
ORDER BY httpStatus ORDER BY httpStatus
"""); """);
while (rs.next()) { while (rs.next()) {
byStatusCode.add(new SummaryStatusCode(rs.getInt(1), rs.getInt(2), selectedHttpStatus.equals(rs.getString(1)))); final boolean isCurrentFilter = selectedContentType.equals(rs.getString("httpStatus"));
final int status = rs.getInt("httpStatus");
final int cnt = rs.getInt("cnt");
byStatusCode.add(new SummaryStatusCode(status, cnt, isCurrentFilter));
} }
// Summarize by content type
rs = stmt.executeQuery(DUCKDB.""" rs = stmt.executeQuery(DUCKDB."""
SELECT contentType, COUNT(*) as cnt SELECT contentType, COUNT(*) as cnt
FROM \{url} FROM \{url}
@ -90,11 +102,16 @@ public class ControlCrawlDataService {
ORDER BY contentType ORDER BY contentType
"""); """);
while (rs.next()) { while (rs.next()) {
byContentType.add(new SummaryContentType(rs.getString(1), rs.getInt(2), selectedContentType.equals(rs.getString(1)))); final boolean isCurrentFilter = selectedContentType.equals(rs.getString("contentType"));
final String contentType = rs.getString("contentType");
final int cnt = rs.getInt("cnt");
byContentType.add(new SummaryContentType(contentType, cnt, isCurrentFilter));
} }
// Extract the document data
var query = DUCKDB."SELECT url, contentType, httpStatus, body != '', etagHeader, lastModifiedHeader FROM \{url} WHERE 1=1"; var query = DUCKDB."SELECT url, contentType, httpStatus, body != '' as bodied, etagHeader, lastModifiedHeader FROM \{url} WHERE 1=1";
if (!urlGlob.isBlank()) if (!urlGlob.isBlank())
query += DUCKDB." AND url LIKE \{urlGlob.replace('*', '%')}"; query += DUCKDB." AND url LIKE \{urlGlob.replace('*', '%')}";
if (!selectedContentType.equals("ALL")) if (!selectedContentType.equals("ALL"))
@ -105,7 +122,14 @@ public class ControlCrawlDataService {
rs = stmt.executeQuery(query); rs = stmt.executeQuery(query);
while (rs.next()) { while (rs.next()) {
records.add(new CrawlDataRecordSummary(rs.getString(1), rs.getString(2), rs.getInt(3), rs.getBoolean(4), rs.getString(5), rs.getString(6)));
records.add(new CrawlDataRecordSummary(
rs.getString("url"),
rs.getString("contentType"),
rs.getInt("httpStatus"),
rs.getBoolean("bodied"),
rs.getString("etagHeader"),
rs.getString("lastModifiedHeader")));
} }
} }
@ -113,19 +137,21 @@ public class ControlCrawlDataService {
ret.put("tab", Map.of("storage", true)); ret.put("tab", Map.of("storage", true));
ret.put("view", Map.of("crawl", true)); ret.put("view", Map.of("crawl", true));
ret.put("contentType", selectedContentType);
ret.put("httpStatus", selectedHttpStatus);
ret.put("urlGlob", urlGlob);
ret.put("pagination", new Pagination(after + 10, after - 10, records.size()));
ret.put("node", nodeConfigurationService.get(nodeId)); ret.put("node", nodeConfigurationService.get(nodeId));
ret.put("storage", fileStorageService.getStorage(fsid)); ret.put("storage", fileStorageService.getStorage(fsid));
ret.put("path", path); ret.put("path", path);
ret.put("domain", domain); ret.put("domain", domain);
ret.put("contentType", selectedContentType);
ret.put("httpStatus", selectedHttpStatus);
ret.put("urlGlob", urlGlob);
ret.put("byStatusCode", byStatusCode); ret.put("byStatusCode", byStatusCode);
ret.put("byContentType", byContentType); ret.put("byContentType", byContentType);
ret.put("records", records); ret.put("records", records);
ret.put("pagination", new Pagination(after + 10, after - 10, records.size()));
return ret; return ret;
} }
@ -206,6 +232,7 @@ public class ControlCrawlDataService {
// DuckDB template processor that deals with quoting and escaping values // DuckDB template processor that deals with quoting and escaping values
// in the SQL query; this offers a very basic protection against accidental SQL injection // in the SQL query; this offers a very basic protection against accidental SQL injection
@SuppressWarnings("preview")
static StringTemplate.Processor<String, IllegalArgumentException> DUCKDB = st -> { static StringTemplate.Processor<String, IllegalArgumentException> DUCKDB = st -> {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
Iterator<String> fragmentsIter = st.fragments().iterator(); Iterator<String> fragmentsIter = st.fragments().iterator();

View File

@ -7,6 +7,8 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import spark.Spark; import spark.Spark;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
@ -16,6 +18,11 @@ class ExecutorFileTransferServiceTest {
@Test @Test
public void test() throws SQLException, InterruptedException { public void test() throws SQLException, InterruptedException {
// Test requires this file to exist
if (!Files.exists(Path.of("/tmp/crawl.parquet"))) {
return;
}
var fileStorage = Mockito.mock(FileStorageService.class); var fileStorage = Mockito.mock(FileStorageService.class);
when(fileStorage.getStorage(Mockito.any(FileStorageId.class))).thenReturn(new FileStorage(null, when(fileStorage.getStorage(Mockito.any(FileStorageId.class))).thenReturn(new FileStorage(null,
@ -38,17 +45,17 @@ class ExecutorFileTransferServiceTest {
try (var conn = DriverManager.getConnection("jdbc:duckdb:"); try (var conn = DriverManager.getConnection("jdbc:duckdb:");
var stmt = conn.createStatement()) var stmt = conn.createStatement()) {
{
var rs = stmt.executeQuery(""" var rs = stmt.executeQuery("""
SELECT COUNT(*) AS cnt, httpStatus SELECT COUNT(*) AS cnt, httpStatus
FROM 'http://hostname:9998/transfer/file/0?path=crawl.parquet' FROM 'http://localhost:9998/transfer/file/0?path=crawl.parquet'
GROUP BY httpStatus GROUP BY httpStatus
"""); """);
while (rs.next()) { while (rs.next()) {
System.out.println(rs.getInt("CNT") + " " + rs.getInt("httpStatus")); System.out.println(rs.getInt("CNT") + " " + rs.getInt("httpStatus"));
} }
} }
for(;;);
Spark.stop();
} }
} }