Refactoring

* Encyclopedia sideloader: permit providing a base URL.
* Storage base shows node id in GUI
* ProcessLivenessMonitorActor restarts automatically
* Clean-up of outbox code
This commit is contained in:
Viktor Lofgren 2023-10-25 18:50:32 +02:00
parent b8855afd10
commit d7686b665e
23 changed files with 152 additions and 77 deletions

View File

@ -62,9 +62,9 @@ public class ExecutorClient extends AbstractDynamicClient {
// FIXME this shouldn't be done in the executor
}
public void sideloadEncyclopedia(Context ctx, int node, Path sourcePath) {
public void sideloadEncyclopedia(Context ctx, int node, Path sourcePath, String baseUrl) {
post(ctx, node,
"/sideload/encyclopedia?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8),
"/sideload/encyclopedia?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8) + "&baseUrl=" + URLEncoder.encode(baseUrl, StandardCharsets.UTF_8),
"").blockingSubscribe();
}

View File

@ -3,10 +3,47 @@ package nu.marginalia.mqapi.converting;
import lombok.AllArgsConstructor;
import nu.marginalia.storage.model.FileStorageId;
import java.nio.file.Path;
/**
 * Describes a single conversion job: which action to run, where the input
 * comes from, and which storage receives the processed data.
 * <p>
 * Instances are normally obtained through the static factories below, which
 * fill in {@code null} for the fields that do not apply to a given action.
 */
@AllArgsConstructor
public class ConvertRequest {
    // NOTE: keep the declaration order of these fields; the Lombok-generated
    // all-args constructor takes its parameters in this order.

    /** The kind of conversion to perform. */
    public final ConvertAction action;
    /** Source path as a string for the sideload actions; null for crawl data. */
    public final String inputSource;
    /** Crawl data storage to convert; null for the sideload actions. */
    public final FileStorageId crawlStorage;
    /** Storage that receives the processed output. */
    public final FileStorageId processedDataStorage;
    /** Base URL, only set by {@link #forEncyclopedia}; null otherwise. */
    public final String baseUrl;

    /** Builds a request for converting crawl data from {@code sourceId} into {@code destId}. */
    public static ConvertRequest forCrawlData(FileStorageId sourceId, FileStorageId destId) {
        return new ConvertRequest(ConvertAction.ConvertCrawlData, null, sourceId, destId, null);
    }

    /** Builds an encyclopedia sideload request reading {@code sourcePath}, carrying {@code baseUrl}. */
    public static ConvertRequest forEncyclopedia(Path sourcePath, String baseUrl, FileStorageId destId) {
        return sideload(ConvertAction.SideloadEncyclopedia, sourcePath, destId, baseUrl);
    }

    /** Builds a dirtree sideload request reading {@code sourcePath}. */
    public static ConvertRequest forDirtree(Path sourcePath, FileStorageId destId) {
        return sideload(ConvertAction.SideloadDirtree, sourcePath, destId, null);
    }

    /** Builds a stackexchange sideload request reading {@code sourcePath}. */
    public static ConvertRequest forStackexchange(Path sourcePath, FileStorageId destId) {
        return sideload(ConvertAction.SideloadStackexchange, sourcePath, destId, null);
    }

    /** Shared shape of all sideload requests: a string input path and no crawl storage. */
    private static ConvertRequest sideload(ConvertAction action,
                                           Path sourcePath,
                                           FileStorageId destId,
                                           String baseUrl) {
        final String inputSource = sourcePath.toString();
        return new ConvertRequest(action, inputSource, null, destId, baseUrl);
    }
}

View File

@ -49,17 +49,18 @@ public class FileStorageService {
public FileStorageBase getStorageBase(FileStorageBaseId id) throws SQLException {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT ID, NAME, PATH, TYPE
SELECT ID, NAME, NODE, PATH, TYPE
FROM FILE_STORAGE_BASE WHERE ID = ?
""")) {
stmt.setLong(1, id.id());
try (var rs = stmt.executeQuery()) {
if (rs.next()) {
return new FileStorageBase(
new FileStorageBaseId(rs.getLong(1)),
FileStorageBaseType.valueOf(rs.getString(4)),
rs.getString(2),
rs.getString(3)
new FileStorageBaseId(rs.getLong("ID")),
FileStorageBaseType.valueOf(rs.getString("TYPE")),
rs.getInt("NODE"),
rs.getString("NAME"),
rs.getString("PATH")
);
}
}
@ -154,7 +155,7 @@ public class FileStorageService {
public FileStorageBase getStorageBase(FileStorageBaseType type, int node) throws SQLException {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT ID, NAME, PATH, TYPE
SELECT ID, NAME, NODE, PATH, TYPE
FROM FILE_STORAGE_BASE WHERE TYPE = ? AND NODE = ?
""")) {
stmt.setString(1, type.name());
@ -162,10 +163,11 @@ public class FileStorageService {
try (var rs = stmt.executeQuery()) {
if (rs.next()) {
return new FileStorageBase(
new FileStorageBaseId(rs.getLong(1)),
FileStorageBaseType.valueOf(rs.getString(4)),
rs.getString(2),
rs.getString(3)
new FileStorageBaseId(rs.getLong("ID")),
FileStorageBaseType.valueOf(rs.getString("TYPE")),
rs.getInt("NODE"),
rs.getString("NAME"),
rs.getString("PATH")
);
}
}

View File

@ -30,6 +30,7 @@ public record FileStorage (
var mockBase = new FileStorageBase(
new FileStorageBaseId(-1),
baseType,
-1,
"OVERRIDE:" + type.name(),
"INVALIDINVALIDINVALID"
);
@ -45,6 +46,9 @@ public record FileStorage (
);
}
/** Convenience accessor: returns {@code base.node()}, the node value of this storage's base. */
public int node() {
return base.node();
}
/** Returns this storage's {@code path} string converted to a {@link Path} via {@code Path.of}. */
public Path asPath() {
return Path.of(path);
}

View File

@ -12,6 +12,7 @@ import java.nio.file.Path;
*/
public record FileStorageBase(FileStorageBaseId id,
FileStorageBaseType type,
int node,
String name,
String path
) {

View File

@ -1,5 +1,6 @@
package nu.marginalia.mq.outbox;
import com.google.gson.Gson;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
@ -25,7 +26,7 @@ public class MqOutbox {
private final int pollIntervalMs = Integer.getInteger("mq.outbox.poll-interval-ms", 1000);
private final int maxPollCount = Integer.getInteger("mq.outbox.max-poll-count", 10);
private final Thread pollThread;
private final Gson gson;
private volatile boolean run = true;
public MqOutbox(MqPersistence persistence,
@ -35,6 +36,7 @@ public class MqOutbox {
int outboxNode,
UUID instanceUUID) {
this.persistence = persistence;
this.gson = persistence.getGson();
this.inboxName = inboxName + ":" + inboxNode;
this.replyInboxName = String.format("%s:%d//%s:%d", outboxName, outboxNode, inboxName, inboxNode);
@ -97,6 +99,13 @@ public class MqOutbox {
return waitResponse(id);
}
/** Sends the given request object and then waits for the response to it.
 * <br>
 * Equivalent to calling sendAsync(object) followed by waitResponse(id)
 * on the returned message id.
 *
 * @param object the request to send
 * @return the response message
 */
public MqMessage send(Object object) throws Exception {
    return waitResponse(sendAsync(object));
}
/** Send a message asynchronously, without waiting for a response.
* <br>
* Use waitResponse(id) or pollResponse(id) to fetch the response. */
@ -104,6 +113,15 @@ public class MqOutbox {
return persistence.sendNewMessage(inboxName, replyInboxName, null, function, payload, null);
}
/** Sends a request object without waiting for a response.
 * <br>
 * The message's function is the simple class name of the request, and its
 * payload is the request serialized to JSON with this outbox's Gson instance.
 * <br>
 * Use waitResponse(id) or pollResponse(id) to fetch the response.
 *
 * @param request the object to serialize and send
 * @return the id of the newly sent message
 */
public long sendAsync(Object request) throws Exception {
    final String function = request.getClass().getSimpleName();
    final String payload = gson.toJson(request);

    return persistence.sendNewMessage(inboxName, replyInboxName, null, function, payload, null);
}
/** Blocks until a response arrives for the given message id (possibly forever) */
public MqMessage waitResponse(long id) throws Exception {
synchronized (pendingResponses) {
@ -156,13 +174,27 @@ public class MqOutbox {
return Optional.ofNullable(response);
}
public long sendNotice(String function, String payload) throws Exception {
return persistence.sendNewMessage(inboxName, null, null, function, payload, null);
}
/** Sends a message to this outbox's inbox without a reply inbox (null is passed in its place),
 * forwarding {@code relatedId} along with the function and payload.
 * NOTE(review): relatedId presumably identifies a message this notice relates to; confirm against MqPersistence.
 *
 * @return the value returned by persistence.sendNewMessage, presumably the new message id
 */
public long sendNotice(long relatedId, String function, String payload) throws Exception {
return persistence.sendNewMessage(inboxName, null, relatedId, function, payload, null);
}
public long sendNotice(String function, String payload) throws Exception {
return persistence.sendNewMessage(inboxName, null, null, function, payload, null);
}
/** Sends a notice (a message without a reply inbox) built from an object, tagged with {@code relatedId}.
 * <br>
 * The function is the simple class name of the object; the payload is its Gson JSON form.
 *
 * @param relatedId forwarded unchanged to the persistence layer
 * @param object    the object to serialize as the payload
 */
public long sendNotice(long relatedId, Object object) throws Exception {
    final String function = object.getClass().getSimpleName();
    final String payload = gson.toJson(object);

    return persistence.sendNewMessage(inboxName, null, relatedId, function, payload, null);
}
/** Sends a notice (a message without a reply inbox or related id) built from an object.
 * <br>
 * The function is the simple class name of the object; the payload is its Gson JSON form.
 *
 * @param object the object to serialize as the payload
 */
public long sendNotice(Object object) throws Exception {
    final String function = object.getClass().getSimpleName();
    final String payload = gson.toJson(object);

    return persistence.sendNewMessage(inboxName, null, null, function, payload, null);
}
/** Marks the message with the given id as failed by setting its state to {@code MqMessageState.ERR}.
 *
 * @param id the id of the message to update
 * @throws SQLException propagated from the persistence layer
 */
public void flagAsBad(long id) throws SQLException {
persistence.updateMessageState(id, MqMessageState.ERR);
}

View File

@ -1,6 +1,7 @@
package nu.marginalia.mq.persistence;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
@ -21,10 +22,16 @@ import static nu.marginalia.mq.MqMessageState.NEW;
@Singleton
public class MqPersistence {
private final HikariDataSource dataSource;
private final Gson gson;
@Inject
public MqPersistence(HikariDataSource dataSource) {
this.dataSource = dataSource;
this.gson = null;
}
@Inject
public MqPersistence(HikariDataSource dataSource, Gson gson) {
this.dataSource = dataSource;
this.gson = gson;
}
/**
@ -484,4 +491,8 @@ public class MqPersistence {
return ret;
}
}
/** Returns the Gson instance this object was constructed with.
 * <br>
 * May be {@code null}: the constructor that takes only a data source sets the field to null.
 */
public Gson getGson() {
return gson;
}
}

View File

@ -244,7 +244,7 @@ public class ConverterMain {
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(Path.of(request.inputSource)),
yield new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(Path.of(request.inputSource), request.baseUrl),
processData.asPath(),
msg, inbox);
}

View File

@ -34,8 +34,8 @@ public class SideloadSourceFactory {
this.dirtreeSideloaderFactory = dirtreeSideloaderFactory;
}
public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile) throws SQLException {
return new EncyclopediaMarginaliaNuSideloader(pathToDbFile, gson, sideloaderProcessing);
public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile, String baseUrl) throws SQLException {
return new EncyclopediaMarginaliaNuSideloader(pathToDbFile, baseUrl, gson, sideloaderProcessing);
}
public Collection<? extends SideloadSource> sideloadDirtree(Path pathToYamlFile) throws IOException {

View File

@ -8,7 +8,7 @@ import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.sideload.SideloaderProcessing;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.DomainIndexingState;
import java.io.ByteArrayInputStream;
@ -35,12 +35,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoCloseable {
private final Connection connection;
private final EdgeUrl baseUrl;
private final Gson gson;
private final SideloaderProcessing sideloaderProcessing;
public EncyclopediaMarginaliaNuSideloader(Path pathToDbFile,
String baseUrl,
Gson gson,
SideloaderProcessing sideloaderProcessing) throws SQLException {
this.baseUrl = EdgeUrl.parse(baseUrl).orElseThrow(AssertionError::new);
this.gson = gson;
this.sideloaderProcessing = sideloaderProcessing;
String sqliteDbString = "jdbc:sqlite:" + pathToDbFile.toString();
@ -53,7 +56,7 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
public ProcessedDomain getDomain() {
var ret = new ProcessedDomain();
ret.domain = new EdgeDomain("encyclopedia.marginalia.nu");
ret.domain = baseUrl.getDomain();
ret.ip = "0.0.0.0";
ret.state = DomainIndexingState.ACTIVE;
@ -138,7 +141,7 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
}
private ProcessedDocument convertDocument(List<String> parts, String title, String url) throws URISyntaxException, DisqualifiedException {
String fullUrl = "https://encyclopedia.marginalia.nu/article/"+url;
String fullUrl = baseUrl.toString() + url;
StringBuilder fullHtml = new StringBuilder();
fullHtml.append("<!DOCTYPE html><html><head><title>").append(title).append("</title></head><body>");

View File

@ -32,7 +32,7 @@ class LoaderIndexJournalWriterTest {
tempDir = Files.createTempDirectory(getClass().getSimpleName());
FileStorageService storageService = Mockito.mock(FileStorageService.class);
Mockito.when(storageService.getStorageBase(FileStorageBaseType.CURRENT)).thenReturn(new FileStorageBase(null, null, null, tempDir.toString()));
Mockito.when(storageService.getStorageBase(FileStorageBaseType.CURRENT)).thenReturn(new FileStorageBase(null, null, 1,null, tempDir.toString()));
writer = new LoaderIndexJournalWriter(storageService);
}

View File

@ -55,12 +55,13 @@ public class ControlNodeActionsService {
Spark.halt(404);
return "No such file " + sourcePath;
}
String baseUrl = request.queryParams("baseUrl");
final int nodeId = Integer.parseInt(request.params("node"));
eventLog.logEvent("USER-ACTION", "SIDELOAD ENCYCLOPEDIA " + nodeId);
executorClient.sideloadEncyclopedia(Context.fromRequest(request), nodeId, sourcePath);
executorClient.sideloadEncyclopedia(Context.fromRequest(request), nodeId, sourcePath, baseUrl);
return "";
}

View File

@ -331,7 +331,7 @@ public class ControlNodeService {
for (var type : FileStorageBaseType.values()) {
var base = fileStorageService.getStorageBase(type, nodeId);
bases.add(Objects.requireNonNullElseGet(base,
() -> new FileStorageBase(new FileStorageBaseId(-1), type, "MISSING", "MISSING"))
() -> new FileStorageBase(new FileStorageBaseId(-1), type, -1, "MISSING", "MISSING"))
);
}

View File

@ -51,7 +51,13 @@
<form method="post" action="actions/sideload-encyclopedia" onsubmit="return confirm('Confirm sideloading')">
<div class="my-3 py-3">
<label for="source" class="form-label">articles.db location accessible from the node on the server</label>
<label for="baseUrl" class="form-label">Base URL</label>
<div class="col mb-3">
<input id="baseUrl" name="baseUrl" class="form-control" value="https://en.wikipedia.org/wiki/">
</div>
<label for="source" class="form-label">Path to articles.db accessible from the node on the server</label>
<div class="row">
<div class="col">

View File

@ -89,13 +89,15 @@
{{#each storage}}
<tr>
<th>Type</th>
<th>Node</th>
<th>Path</th>
<th>Name</th>
<th colspan="2">Path</th>
</tr>
<tr>
<td>{{base.type}}</td>
<td>{{base.node}}</td>
<td>{{base.path}}</td>
<td>{{base.name}}</td>
<td colspan="2">{{base.path}}</td>
</tr>
<tr>
<th>Created</th>

View File

@ -36,6 +36,7 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
// Actor steps. Each record is one step; its components are the data carried by that step.

// Step with no data. NOTE(review): presumably the entry step; confirm in the transition logic.
public record Initial() implements ActorStep {}
// Carries an errorAttempts counter; annotated to resume with RETRY behavior.
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
// Carries an attempts counter; annotated to resume with RESTART behavior.
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Run(int attempts) implements ActorStep {}
// Step with no data, marked as terminal.
@Terminal
public record Aborted() implements ActorStep {}

View File

@ -5,7 +5,9 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.module.ServiceConfiguration;
@ -24,6 +26,7 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
private final int node;
// Step with no data. NOTE(review): presumably the entry step; confirm in the transition logic.
public record Initial() implements ActorStep {}
// Step with no data; annotated to resume with RESTART behavior.
@Resume(behavior = ActorResumeBehavior.RESTART)
public record Monitor() implements ActorStep {}
@Override

View File

@ -30,7 +30,7 @@ public class ConvertActor extends RecordActorPrototype {
private final Gson gson;
public record Convert(FileStorageId fid) implements ActorStep {};
public record ConvertEncyclopedia(String source) implements ActorStep {};
public record ConvertEncyclopedia(String source, String baseUrl) implements ActorStep {};
public record ConvertDirtree(String source) implements ActorStep {};
public record ConvertStackexchange(String source) implements ActorStep {};
@Resume(behavior = ActorResumeBehavior.RETRY)
@ -50,15 +50,9 @@ public class ConvertActor extends RecordActorPrototype {
storageService.relateFileStorages(toProcess.id(), processedArea.id());
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
null,
fid,
processedArea.id());
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(fid, processedArea.id()))
);
}
case ConvertDirtree(String source) -> {
@ -75,18 +69,12 @@ public class ConvertActor extends RecordActorPrototype {
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadDirtree,
sourcePath.toString(),
null,
processedArea.id());
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
mqConverterOutbox.sendAsync(ConvertRequest.forDirtree(sourcePath, processedArea.id()))
);
}
case ConvertEncyclopedia(String source) -> {
case ConvertEncyclopedia(String source, String baseUrl) -> {
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
@ -101,16 +89,9 @@ public class ConvertActor extends RecordActorPrototype {
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadEncyclopedia,
sourcePath.toString(),
null,
processedArea.id());
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
mqConverterOutbox.sendAsync(ConvertRequest.forEncyclopedia(sourcePath, baseUrl, processedArea.id()))
);
}
case ConvertStackexchange(String source) -> {
@ -129,14 +110,10 @@ public class ConvertActor extends RecordActorPrototype {
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadStackexchange,
sourcePath.toString(),
null,
processedArea.id());
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
mqConverterOutbox.sendAsync(ConvertRequest.forStackexchange(sourcePath, processedArea.id()))
);
}
case ConvertWait(FileStorageId destFid, long msgId) -> {

View File

@ -116,14 +116,8 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
yield new Convert(fid, processedArea.id());
}
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 -> {
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
null,
crawlId,
processedId);
yield new Convert(crawlId, processedId,
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)));
}
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 ->
new Convert(crawlId, processedId, mqConverterOutbox.sendAsync(ConvertRequest.forCrawlData(crawlId, processedId)));
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> {
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
@ -133,8 +127,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
yield new Load(List.of(processedId));
}
case Load(List<FileStorageId> processedIds, long msgId) when msgId < 0 -> {
var request = new LoadRequest(processedIds);
long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request));
long id = mqLoaderOutbox.sendAsync(new LoadRequest(processedIds));
yield new Load(processedIds, id);
}
@ -201,8 +194,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
}
private long createIndex(IndexName index) throws Exception {
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(),
gson.toJson(new CreateIndexRequest(index)));
return mqIndexConstructorOutbox.sendAsync(new CreateIndexRequest(index));
}

View File

@ -54,8 +54,7 @@ public class CrawlActor extends RecordActorPrototype {
storageService.relateFileStorages(storage.id(), dataArea.id());
// Send convert request
var request = new CrawlRequest(List.of(fid), dataArea.id());
long msgId = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
long msgId = mqCrawlerOutbox.sendAsync(new CrawlRequest(List.of(fid), dataArea.id()));
yield new Crawl(msgId);
}

View File

@ -46,8 +46,7 @@ public class RecrawlActor extends RecordActorPrototype {
refreshService.synchronizeDomainList();
var request = new CrawlRequest(null, fid);
long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
long id = mqCrawlerOutbox.sendAsync(new CrawlRequest(null, fid));
yield new Crawl(id);
}

View File

@ -21,7 +21,11 @@ public class SideloadService {
}
public Object sideloadEncyclopedia(Request request, Response response) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertEncyclopedia(request.queryParams("path")));
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertEncyclopedia(
request.queryParams("path"),
request.queryParams("baseUrl")
));
return "";
}

View File

@ -86,6 +86,9 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
private final ExecutorService es = Executors.newVirtualThreadPerTaskExecutor();
private static final Comparator<RpcDecoratedResultItem> comparator =
Comparator.comparing(RpcDecoratedResultItem::getRankingScore);
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) throws InterruptedException
{
List<Callable<List<RpcDecoratedResultItem>>> tasks = createTasks(indexRequest);
@ -119,8 +122,6 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
return tasks;
}
private static final Comparator<RpcDecoratedResultItem> comparator =
Comparator.comparing(RpcDecoratedResultItem::getRankingScore);
private boolean isBlacklisted(RpcDecoratedResultItem item) {
return blacklist.isBlacklisted(UrlIdCodec.getDomainId(item.getRawItem().getCombinedId()));