diff --git a/code/api/process-mqapi/build.gradle b/code/api/process-mqapi/build.gradle new file mode 100644 index 00000000..0b360576 --- /dev/null +++ b/code/api/process-mqapi/build.gradle @@ -0,0 +1,30 @@ +plugins { + id 'java' + id "io.freefair.lombok" version "5.3.3.3" + + id 'jvm-test-suite' +} + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(17)) + } +} + +dependencies { + implementation project(':code:common:db') + + testImplementation libs.bundles.slf4j.test + testImplementation libs.bundles.junit + testImplementation libs.mockito +} + +test { + useJUnitPlatform() +} + +task fastTests(type: Test) { + useJUnitPlatform { + excludeTags "slow" + } +} \ No newline at end of file diff --git a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/ConverterInboxNames.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/ProcessInboxNames.java similarity index 50% rename from code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/ConverterInboxNames.java rename to code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/ProcessInboxNames.java index 5ce3ebff..9ca91fe6 100644 --- a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/ConverterInboxNames.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/ProcessInboxNames.java @@ -1,6 +1,7 @@ -package nu.marginalia.converting.mqapi; +package nu.marginalia.mqapi; -public class ConverterInboxNames { +public class ProcessInboxNames { public static final String CONVERTER_INBOX = "converter"; public static final String LOADER_INBOX = "loader"; + public static final String CRAWLER_INBOX = "crawler"; } diff --git a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/ConvertRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java similarity index 85% rename from code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/ConvertRequest.java rename to code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java index 881d75a2..64091146 100644 --- a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/ConvertRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java @@ -1,4 +1,4 @@ -package nu.marginalia.converting.mqapi; +package nu.marginalia.mqapi.converting; import lombok.AllArgsConstructor; import nu.marginalia.db.storage.model.FileStorageId; diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/mqapi/CrawlRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java similarity index 66% rename from code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/mqapi/CrawlRequest.java rename to code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java index 53f387d5..5aaecc5d 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/mqapi/CrawlRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java @@ -1,8 +1,10 @@ -package nu.marginalia.crawling.mqapi; +package nu.marginalia.mqapi.crawling; +import lombok.AllArgsConstructor; import nu.marginalia.db.storage.model.FileStorageId; /** A request to start a crawl */ +@AllArgsConstructor public class CrawlRequest { FileStorageId specStorage; FileStorageId crawlStorage; diff --git a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/LoadRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/loading/LoadRequest.java similarity index 81% rename from code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/LoadRequest.java rename to code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/loading/LoadRequest.java index 186f0f7e..eff92c9c 100644 --- a/code/process-models/converting-model/src/main/java/nu/marginalia/converting/mqapi/LoadRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/loading/LoadRequest.java @@ -1,4 +1,4 @@ -package nu.marginalia.converting.mqapi; +package nu.marginalia.mqapi.loading; import lombok.AllArgsConstructor; import nu.marginalia.db.storage.model.FileStorageId; @@ -6,5 +6,4 @@ import nu.marginalia.db.storage.model.FileStorageId; @AllArgsConstructor public class LoadRequest { public FileStorageId processedDataStorage; - } diff --git a/code/api/readme.md b/code/api/readme.md index 4b19381f..f98f326a 100644 --- a/code/api/readme.md +++ b/code/api/readme.md @@ -1,4 +1,10 @@ -# Core Service Clients +# Clients + +## Core Services + +* [assistant-api](assistant-api/) +* [search-api](search-api/) +* [index-api](index-api/) These are clients for the [core services](../services-core/), along with what models are necessary for speaking to them. They each implement the abstract client classes from @@ -8,3 +14,10 @@ All that is necessary is to `@Inject` them into the constructor and then requests can be sent. **Note:** If you are looking for the public API, it's handled by the api service in [services-satellite/api-service](../services-satellite/api-service). + +## MQ-API Process API + +[process-mqapi](process-mqapi/) defines requests and inboxes for the message queue based API used +for interacting with processes. + +See [common/message-queue](../common/message-queue) and [services-satellite/control-service](../services-satellite/control-service). \ No newline at end of file diff --git a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseId.java b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseId.java index e4dbaf68..1c7ededd 100644 --- a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseId.java +++ b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageBaseId.java @@ -1,3 +1,8 @@ package nu.marginalia.db.storage.model; -public record FileStorageBaseId(long id) {} +public record FileStorageBaseId(long id) { + + public String toString() { + return Long.toString(id); + } +} diff --git a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java index 43e5503f..3d6331e3 100644 --- a/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java +++ b/code/common/db/src/main/java/nu/marginalia/db/storage/model/FileStorageId.java @@ -4,4 +4,8 @@ public record FileStorageId(long id) { public static FileStorageId of(int storageId) { return new FileStorageId(storageId); } + + public String toString() { + return Long.toString(id); + } } diff --git a/code/common/message-queue/readme.md b/code/common/message-queue/readme.md index 20e59642..cbb5082c 100644 --- a/code/common/message-queue/readme.md +++ b/code/common/message-queue/readme.md @@ -5,4 +5,79 @@ as well as a finite state machine library backed by the message queue that enables long-running tasks that outlive the execution lifespan of the involved processes. -![Message States](msgstate.svg) \ No newline at end of file +![Message States](msgstate.svg) + +The message queue is interacted with via the Inbox and Outbox classes. + +There are three types of inboxes; + +Name|Description +---|--- +MqSingleShotInbox|A single message is received and then the inbox is closed. +MqAsynchronousInbox|Messages are received asynchronously and can be processed in parallel. +MqSynchronousInbox|Messages are received synchronously and will be processed in order; message processing can be aborted. + +A single outbox implementation exists, the `MqOutbox`, which implements multiple message sending strategies, +including blocking and asynchronous paradigms. Lower level access to the message queue itself is provided by the `MqPersistence` class. + +The inbox implementations as well as the outbox can be constructed via the `MessageQueueFactory` class. + +## Message Queue State Machine (MQSM) + +The MQSM is a finite state machine that is backed by the message queue. The machine itself +is defined through a class that extends the 'AbstractStateGraph'; with state transitions and +names defined as implementations. + +Example: + +```java +class ExampleStateMachine extends AbstractStateGraph { + + @GraphState(name = "INITIAL", next="GREET") + public void initial() { + return "World"; // passed to the next state + } + + @GraphState(name = "GREET", next="COUNT-TO-FIVE") + public void greet(String name) { + System.out.println("Hello " + name); + } + + @GraphState(name = "COUNT-TO-FIVE", next="END") + public void countToFive(Integer value) { + // value is passed from the previous state, since greet didn't pass a value, + // null will be the default. + + if (null == value) { + // jumps to the current state with a value of 0 + transition("COUNT-TO-FIVE", 0); + } + + + System.out.println(++value); + if (value < 5) { + // Loops the current state until value = 5 + transition("COUNT-TO-FIVE", value); + } + + if (value > 5) { + // demonstrates an error condition + error("Illegal value"); + } + + // Default transition is to END + } + + @GraphState(name="END") + public void end() { + System.out.println("Done"); + } +} +``` + +Each method should ideally be idempotent, or at least be able to handle being called multiple times. +It can not be assumed that the states are invoked within the same process, or even on the same machine, +on the same day, etc. + +The usual considerations for writing deterministic Java code are advisable unless unavoidable; +all state must be local, don't iterate over hash maps, etc. \ No newline at end of file diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java index 6a143157..cd7824a7 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateFactory.java @@ -28,6 +28,11 @@ public class StateFactory { @Override public StateTransition next(String message) { + + if (message.equals("")) { + return logic.apply(null); + } + return logic.apply(gson.fromJson(message, param)); } @@ -72,6 +77,11 @@ public class StateFactory { } public StateTransition transition(String state, Object message) { + + if (null == message) { + return StateTransition.to(state); + } + return StateTransition.to(state, gson.toJson(message)); } diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java index e3937894..36ed81cf 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java @@ -186,8 +186,16 @@ public class StateMachine { if (resumeState.resumeBehavior().equals(ResumeBehavior.ERROR)) { // The message is acknowledged, but the state does not support resuming smOutbox.notify(expectedMessage.id, "ERROR", "Illegal resumption from ACK'ed state " + message.function()); - } else { + } + else if (resumeState.resumeBehavior().equals(ResumeBehavior.RESTART)) { + this.state = resumeState; + // The message is already acknowledged, we flag it as dead and then send an identical message + smOutbox.flagAsDead(message.msgId()); + expectedMessage = ExpectedMessage.responseTo(message); + smOutbox.notify(message.msgId(), "INITIAL", ""); + } + else { this.state = resumeState; // The message is already acknowledged, we flag it as dead and then send an identical message diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java index 2e275cb5..33dacb5d 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/graph/ResumeBehavior.java @@ -4,5 +4,7 @@ public enum ResumeBehavior { /** Retry the state on resume */ RETRY, /** Jump to ERROR on resume if the message has been acknowledged */ - ERROR + ERROR, + /** Jump to INITIAL on resume */ + RESTART } diff --git a/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineNullTest.java b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineNullTest.java new file mode 100644 index 00000000..301b75a1 --- /dev/null +++ b/code/common/message-queue/src/test/java/nu/marginalia/mqsm/StateMachineNullTest.java @@ -0,0 +1,98 @@ +package nu.marginalia.mqsm; + +import com.google.gson.GsonBuilder; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.mq.MqTestUtil; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.parallel.Execution; +import org.testcontainers.containers.MariaDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; + +@Tag("slow") +@Testcontainers +@Execution(SAME_THREAD) +public class StateMachineNullTest { + @Container + static MariaDBContainer mariaDBContainer = new MariaDBContainer<>("mariadb") + .withDatabaseName("WMSA_prod") + .withUsername("wmsa") + .withPassword("wmsa") + .withInitScript("sql/current/12-message-queue.sql") + .withNetworkAliases("mariadb"); + + static HikariDataSource dataSource; + static MqPersistence persistence; + static MessageQueueFactory messageQueueFactory; + private String inboxId; + + @BeforeEach + public void setUp() { + inboxId = UUID.randomUUID().toString(); + } + @BeforeAll + public static void setUpAll() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(mariaDBContainer.getJdbcUrl()); + config.setUsername("wmsa"); + config.setPassword("wmsa"); + + dataSource = new HikariDataSource(config); + persistence = new MqPersistence(dataSource); + messageQueueFactory = new MessageQueueFactory(persistence); + } + + @AfterAll + public static void tearDownAll() { + dataSource.close(); + } + + public static class TestGraph extends AbstractStateGraph { + public TestGraph(StateFactory stateFactory) { + super(stateFactory); + } + + @GraphState(name = "INITIAL", next = "GREET") + public void initial() {} + + @GraphState(name = "GREET", next = "END") + public void greet(String message) { + if (null == message) { + System.out.println("Hello, null!"); + return; + } + Assertions.fail("Should not be called"); + } + + } + + @Test + public void testStateGraphNullSerialization() throws Exception { + var stateFactory = new StateFactory(new GsonBuilder().create()); + var graph = new TestGraph(stateFactory); + + + var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph); + sm.registerStates(graph); + + sm.init(); + + sm.join(2, TimeUnit.SECONDS); + sm.stop(); + + MqTestUtil.getMessages(dataSource, inboxId).forEach(System.out::println); + + } + +} diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 7a9121d8..b85a829b 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -26,7 +26,9 @@ dependencies { implementation project(':third-party:porterstemmer') implementation project(':third-party:count-min-sketch') + implementation project(':code:api:index-api') + implementation project(':code:api:process-mqapi') implementation project(':code:common:model') implementation project(':code:common:db') diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index a42f5b67..36e5b558 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -29,7 +29,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static nu.marginalia.converting.mqapi.ConverterInboxNames.CONVERTER_INBOX; +import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX; public class ConverterMain { @@ -176,10 +176,10 @@ public class ConverterMain { var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID()); - var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.ConvertRequest.class.getSimpleName()); + var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName()); var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - var request = gson.fromJson(msg.payload(), nu.marginalia.converting.mqapi.ConvertRequest.class); + var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class); var crawlData = fileStorageService.getStorage(request.crawlStorage); var processData = fileStorageService.getStorage(request.processedDataStorage); diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterModule.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterModule.java index 121159ed..90d4e3ad 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterModule.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterModule.java @@ -2,26 +2,13 @@ package nu.marginalia.converting; import com.google.gson.Gson; import com.google.inject.AbstractModule; -import com.google.inject.Provides; -import com.google.inject.Singleton; import com.google.inject.name.Names; -import lombok.SneakyThrows; import nu.marginalia.LanguageModels; import nu.marginalia.ProcessConfiguration; import nu.marginalia.WmsaHome; -import nu.marginalia.converting.mqapi.ConvertRequest; -import nu.marginalia.db.storage.FileStorageService; -import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.persistence.MqPersistence; -import plan.CrawlPlan; import nu.marginalia.model.gson.GsonFactory; -import plan.CrawlPlanLoader; -import java.io.IOException; -import java.nio.file.Path; -import java.sql.SQLException; import java.util.UUID; -import java.util.concurrent.TimeUnit; public class ConverterModule extends AbstractModule { diff --git a/code/processes/loading-process/build.gradle b/code/processes/loading-process/build.gradle index caba9812..d204247d 100644 --- a/code/processes/loading-process/build.gradle +++ b/code/processes/loading-process/build.gradle @@ -19,7 +19,7 @@ tasks.distZip.enabled = false dependencies { implementation project(':code:common:process') - + implementation project(':code:api:process-mqapi') implementation project(':code:api:index-api') implementation project(':code:common:model') implementation project(':code:common:db') diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index 08649808..f6ccc79d 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -7,7 +7,6 @@ import com.google.inject.Injector; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; import nu.marginalia.db.storage.FileStorageService; -import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.inbox.MqInboxResponse; @@ -30,7 +29,7 @@ import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static nu.marginalia.converting.mqapi.ConverterInboxNames.LOADER_INBOX; +import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX; public class LoaderMain { private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class); @@ -215,16 +214,16 @@ public class LoaderMain { var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, UUID.randomUUID()); - var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.LoadRequest.class.getSimpleName()); + var msgOpt = getMessage(inbox, nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName()); if (msgOpt.isEmpty()) throw new RuntimeException("No instruction received in inbox"); var msg = msgOpt.get(); - if (!nu.marginalia.converting.mqapi.LoadRequest.class.getSimpleName().equals(msg.function())) { + if (!nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName().equals(msg.function())) { throw new RuntimeException("Unexpected message in inbox: " + msg); } - var request = gson.fromJson(msg.payload(), nu.marginalia.converting.mqapi.LoadRequest.class); + var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.loading.LoadRequest.class); var processData = fileStorageService.getStorage(request.processedDataStorage); diff --git a/code/services-satellite/control-service/build.gradle b/code/services-satellite/control-service/build.gradle index 72c0552e..90b832da 100644 --- a/code/services-satellite/control-service/build.gradle +++ b/code/services-satellite/control-service/build.gradle @@ -22,6 +22,8 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/docker-service.gradle" dependencies { + implementation libs.bundles.gson + implementation project(':code:common:db') implementation project(':code:common:model') implementation project(':code:common:service') @@ -30,10 +32,9 @@ dependencies { implementation project(':code:common:message-queue') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') - implementation project(':code:process-models:converting-model') - implementation project(':code:process-models:crawling-model') implementation project(':code:api:search-api') implementation project(':code:api:index-api') + implementation project(':code:api:process-mqapi') implementation libs.lombok @@ -43,11 +44,11 @@ dependencies { implementation libs.prometheus implementation libs.notnull implementation libs.guice + implementation libs.trove implementation libs.spark implementation libs.fastutil implementation libs.commons.io - implementation libs.bundles.gson implementation libs.bundles.mariadb testImplementation libs.bundles.slf4j.test diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index 34e600f5..a3db382b 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -77,38 +77,31 @@ public class ControlService extends Service { (req, rsp) -> Map.of("storage", controlFileStorageService.getStorageList()), (map) -> storageRenderer.render((Map) map)); + final HtmlRedirect redirectToServices = new HtmlRedirect("/services"); + final HtmlRedirect redirectToProcesses = new HtmlRedirect("/processes"); + final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage"); + Spark.post("/public/fsms/:fsm/start", (req, rsp) -> { controlFSMs.start(ControlProcess.valueOf(req.params("fsm").toUpperCase())); - return """ - - - """; - }); + return ""; + }, redirectToProcesses); + Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> { controlFSMs.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase())); - return """ - - - """; - }); + return ""; + }, redirectToProcesses); // TODO: This should be a POST Spark.get("/public/repartition", (req, rsp) -> { controlFSMs.start(ControlProcess.REPARTITION_REINDEX); - return """ - - - """; - }); + return ""; + } , redirectToProcesses); - // TODO: This should be a POST - Spark.get("/public/reconvert/:fid", (req, rsp) -> { + Spark.post("/public/storage/:fid/process", (req, rsp) -> { controlFSMs.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(Integer.parseInt(req.params("fid")))); - return """ - - - """; - }); + return ""; + }, redirectToProcesses); + Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage); Spark.get("/public/:resource", this::serveStatic); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/HtmlRedirect.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/HtmlRedirect.java new file mode 100644 index 00000000..fd49bd6d --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/HtmlRedirect.java @@ -0,0 +1,19 @@ +package nu.marginalia.control; + +import spark.ResponseTransformer; + +public class HtmlRedirect implements ResponseTransformer { + private final String html; + + public HtmlRedirect(String destination) { + this.html = """ + + + """.formatted(destination); + } + + @Override + public String render(Object any) throws Exception { + return html; + } +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ConverterMonitorFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ConverterMonitorFSM.java index d5dd3908..674d064a 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ConverterMonitorFSM.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/ConverterMonitorFSM.java @@ -3,7 +3,7 @@ package nu.marginalia.control.fsm.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.control.svc.ProcessService; -import nu.marginalia.converting.mqapi.ConverterInboxNames; +import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqsm.StateFactory; @@ -15,7 +15,7 @@ public class ConverterMonitorFSM extends AbstractProcessSpawnerFSM { public ConverterMonitorFSM(StateFactory stateFactory, MqPersistence persistence, ProcessService processService) { - super(stateFactory, persistence, processService, ConverterInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER); + super(stateFactory, persistence, processService, ProcessInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER); } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/LoaderMonitorFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/LoaderMonitorFSM.java index ff81433e..69015f65 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/LoaderMonitorFSM.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/monitor/LoaderMonitorFSM.java @@ -3,7 +3,7 @@ package nu.marginalia.control.fsm.monitor; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.control.svc.ProcessService; -import nu.marginalia.converting.mqapi.ConverterInboxNames; +import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqsm.StateFactory; @@ -17,7 +17,7 @@ public class LoaderMonitorFSM extends AbstractProcessSpawnerFSM { ProcessService processService) { super(stateFactory, persistence, processService, - ConverterInboxNames.LOADER_INBOX, + ProcessInboxNames.LOADER_INBOX, ProcessService.ProcessId.LOADER); } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java index 19881851..781882a5 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/fsm/task/ReconvertAndLoadFSM.java @@ -8,8 +8,8 @@ import lombok.NoArgsConstructor; import lombok.With; import nu.marginalia.control.svc.ProcessOutboxFactory; import nu.marginalia.control.svc.ProcessService; -import nu.marginalia.converting.mqapi.ConvertRequest; -import nu.marginalia.converting.mqapi.LoadRequest; +import nu.marginalia.mqapi.converting.ConvertRequest; +import nu.marginalia.mqapi.loading.LoadRequest; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.FileStorageBaseType; import nu.marginalia.db.storage.model.FileStorageId; diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageBaseWithStorage.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageBaseWithStorage.java index 94a39e2b..7411e3c7 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageBaseWithStorage.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageBaseWithStorage.java @@ -1,9 +1,10 @@ package nu.marginalia.control.model; -import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageBase; import java.util.List; -public record FileStorageBaseWithStorage(FileStorageBase base, List storage) { +public record FileStorageBaseWithStorage(FileStorageBase base, + List storage) +{ } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java new file mode 100644 index 00000000..927262d2 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/FileStorageWithActions.java @@ -0,0 +1,16 @@ +package nu.marginalia.control.model; + +import nu.marginalia.db.storage.model.FileStorage; +import nu.marginalia.db.storage.model.FileStorageType; + +public record FileStorageWithActions(FileStorage storage) { + public boolean isLoadable() { + return storage.type() == FileStorageType.PROCESSED_DATA; + } + public boolean isConvertible() { + return storage.type() == FileStorageType.CRAWL_DATA; + } + public boolean isDeletable() { + return storage.type() == FileStorageType.PROCESSED_DATA; + } +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java index 04dc34ae..982c42e0 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlFileStorageService.java @@ -5,6 +5,7 @@ import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; import nu.marginalia.control.model.FileStorageBaseWithStorage; +import nu.marginalia.control.model.FileStorageWithActions; import nu.marginalia.control.model.ProcessHeartbeat; import nu.marginalia.control.model.ServiceHeartbeat; import nu.marginalia.db.storage.FileStorageService; @@ -12,6 +13,8 @@ import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageBase; import nu.marginalia.db.storage.model.FileStorageBaseId; import nu.marginalia.db.storage.model.FileStorageId; +import spark.Request; +import spark.Response; import java.sql.SQLException; import java.util.ArrayList; @@ -30,10 +33,24 @@ public class ControlFileStorageService { this.fileStorageService = fileStorageService; } + public Object flagFileForDeletionRequest(Request request, Response response) throws SQLException { + FileStorageId fid = new FileStorageId(Long.parseLong(request.params(":fid"))); + flagFileForDeletion(fid); + return ""; + } + + public void flagFileForDeletion(FileStorageId id) throws SQLException { + try (var conn = dataSource.getConnection(); + var flagStmt = conn.prepareStatement("UPDATE FILE_STORAGE SET DO_PURGE = TRUE WHERE ID = ?")) { + flagStmt.setLong(1, id.id()); + flagStmt.executeUpdate(); + } + } + @SneakyThrows public List getStorageList() { Map fileStorageBaseByBaseId = new HashMap<>(); - Map> fileStoragByBaseId = new HashMap<>(); + Map> fileStoragByBaseId = new HashMap<>(); List storageIds = new ArrayList<>(); @@ -48,12 +65,15 @@ public class ControlFileStorageService { for (var id : storageIds) { var storage = fileStorageService.getStorage(id); fileStorageBaseByBaseId.computeIfAbsent(storage.base().id(), k -> storage.base()); - fileStoragByBaseId.computeIfAbsent(storage.base().id(), k -> new ArrayList<>()).add(storage); + fileStoragByBaseId.computeIfAbsent(storage.base().id(), k -> new ArrayList<>()).add(new FileStorageWithActions(storage)); } List result = new ArrayList<>(); for (var baseId : fileStorageBaseByBaseId.keySet()) { - result.add(new FileStorageBaseWithStorage(fileStorageBaseByBaseId.get(baseId), fileStoragByBaseId.get(baseId))); + result.add(new FileStorageBaseWithStorage(fileStorageBaseByBaseId.get(baseId), + fileStoragByBaseId.get(baseId) + + )); } return result; diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java index e1b5a3b1..4c296069 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessOutboxFactory.java @@ -2,7 +2,7 @@ package nu.marginalia.control.svc; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.converting.mqapi.ConverterInboxNames; +import nu.marginalia.mqapi.ProcessInboxNames; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.service.server.BaseServiceParams; @@ -19,9 +19,9 @@ public class ProcessOutboxFactory { } public MqOutbox createConverterOutbox() { - return new MqOutbox(persistence, ConverterInboxNames.CONVERTER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid()); + return new MqOutbox(persistence, ProcessInboxNames.CONVERTER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid()); } public MqOutbox createLoaderOutbox() { - return new MqOutbox(persistence, ConverterInboxNames.LOADER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid()); + return new MqOutbox(persistence, ProcessInboxNames.LOADER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid()); } } diff --git a/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb b/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb index 72f55e29..68410646 100644 --- a/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb +++ b/code/services-satellite/control-service/src/main/resources/templates/control/storage.hdb @@ -33,10 +33,26 @@ {{#each storage}} - - {{type}} - {{path}} - {{description}} + + {{#if isLoadable}} +
+ +
+ {{/if}} + {{#if isConvertible}} +
+ +
+ {{/if}} + {{#if isDeletable}} +
+ +
+ {{/if}} + + {{storage.type}} + {{storage.path}} + {{storage.description}} {{/each}} {{/each}} diff --git a/settings.gradle b/settings.gradle index 41e0cb53..7e6d02a0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -44,6 +44,7 @@ include 'code:features-index:domain-ranking' include 'code:api:search-api' include 'code:api:index-api' include 'code:api:assistant-api' +include 'code:api:process-mqapi' include 'code:common:service-discovery' include 'code:common:service-client'