From 51e46ad2b0de938ef3212f8068bbc28e0733d040 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Thu, 21 Nov 2024 16:00:09 +0100 Subject: [PATCH] (refac) Move export tasks to a process and clean up process initialization for all ProcessMainClass descendents Since some of the export tasks have been memory hungry, sometimes killing the executor-services, they've been moved to a separate process that can be given a larger Xmx. While doing this, the ProcessMainClass was given utilities for the boilerplate surrounding receiving mq requests and responding to them, some effort was also put toward making the process boot process a bit more uniform. It's still a bit heterogeneous between different processes, but a bit less so for now. --- .../nodecfg/NodeConfigurationServiceTest.java | 5 +- code/common/process/build.gradle | 34 --- code/common/process/readme.md | 4 - .../process/resources/log4j2.properties | 9 - .../process}/ProcessConfiguration.java | 2 +- .../process}/ProcessConfigurationModule.java | 2 +- .../marginalia/process/ProcessMainClass.java | 102 +++++++++ .../process/control/FakeProcessHeartbeat.java | 0 .../control/ProcessAdHocTaskHeartbeat.java | 0 .../ProcessAdHocTaskHeartbeatImpl.java | 2 +- .../process/control/ProcessHeartbeat.java | 0 .../process/control/ProcessHeartbeatImpl.java | 9 +- .../process/control/ProcessTaskHeartbeat.java | 0 .../control/ProcessTaskHeartbeatImpl.java | 2 +- .../process/log/WorkLoadIterable.java | 0 .../nu/marginalia/process/log/WorkLog.java | 0 .../marginalia/process/log/WorkLogEntry.java | 0 .../nu/marginalia/service/ConfigLoader.java | 4 +- .../marginalia/service/ProcessMainClass.java | 20 -- .../marginalia/process/log/WorkLogTest.java | 0 code/execution/build.gradle | 4 +- code/execution/data-extractors/readme.md | 7 - .../nu/marginalia/actor/ExecutorActor.java | 1 + .../actor/ExecutorActorControlService.java | 2 + .../actor/proc/ExportTaskMonitorActor.java | 29 +++ .../actor/task/ExportAtagsActor.java | 60 +++-- .../actor/task/ExportFeedsActor.java | 50 +++-- .../actor/task/ExportSampleDataActor.java | 53 +++-- .../actor/task/ExportTermFreqActor.java | 55 +++-- .../TriggerAdjacencyCalculationActor.java | 60 ++--- .../marginalia/process/ProcessOutboxes.java | 11 + .../nu/marginalia/process/ProcessService.java | 19 +- code/index/build.gradle | 1 - code/index/index-forward/build.gradle | 2 +- code/index/index-reverse/build.gradle | 2 +- .../processes/converting-process/build.gradle | 1 - .../ft-anchor-keywords/build.gradle | 2 +- .../atags/source/AnchorTagsSourceFactory.java | 2 +- .../marginalia/converting/ConverterMain.java | 206 ++++++++---------- .../ConvertingIntegrationTestModule.java | 2 +- .../sideload/reddit/RedditSideloaderTest.java | 2 +- code/processes/crawling-process/build.gradle | 1 - .../java/nu/marginalia/crawl/CrawlerMain.java | 105 +++------ .../crawl/warc/WarcArchiverFactory.java | 2 +- .../crawling-process/model/build.gradle | 1 - .../export-task-process}/build.gradle | 39 +++- .../adjacencies/AdjacenciesData.java | 0 .../adjacencies/AdjacenciesLoader.java | 0 .../marginalia/adjacencies/DomainAliases.java | 0 .../adjacencies/SparseBitVector.java | 0 .../WebsiteAdjacenciesCalculator.java | 127 +++++++++++ .../nu/marginalia/extractor/AtagExporter.java | 0 .../nu/marginalia/extractor/ExporterIf.java | 0 .../nu/marginalia/extractor/FeedExporter.java | 0 .../extractor/SampleDataExporter.java | 0 .../extractor/TermFrequencyExporter.java | 0 .../nu/marginalia/task/ExportTasksMain.java | 82 +++++++ .../adjacencies/AdjacenciesLoaderTest.java | 0 .../adjacencies/SparseBitVectorTest.java | 0 .../index-constructor-process/build.gradle | 1 - .../index/IndexConstructorMain.java | 81 +------ .../live-crawling-process/build.gradle | 1 - .../livecrawler/LiveCrawlerMain.java | 78 +------ .../livecrawler/LiveCrawlerModule.java | 2 +- code/processes/loading-process/build.gradle | 1 - .../nu/marginalia/loading/LoaderMain.java | 97 ++------- .../loading/domains/DomainLoaderService.java | 2 +- .../marginalia/mqapi/ProcessInboxNames.java | 2 + .../mqapi/tasks/ExportTaskRequest.java | 57 +++++ .../build.gradle | 49 ----- .../WebsiteAdjacenciesCalculator.java | 198 ----------------- .../website-adjacencies-calculator/readme.md | 8 - .../explorer-service/readme.md | 1 - .../executor-service/build.gradle | 3 - code/services-core/index-service/build.gradle | 1 - code/tools/experiment-runner/build.gradle | 1 - code/tools/integration-test/build.gradle | 1 - .../test/IntegrationTestModule.java | 2 +- settings.gradle | 5 +- 79 files changed, 802 insertions(+), 912 deletions(-) delete mode 100644 code/common/process/build.gradle delete mode 100644 code/common/process/readme.md delete mode 100644 code/common/process/resources/log4j2.properties rename code/common/{process/java/nu/marginalia => service/java/nu/marginalia/process}/ProcessConfiguration.java (78%) rename code/common/{process/java/nu/marginalia => service/java/nu/marginalia/process}/ProcessConfigurationModule.java (97%) create mode 100644 code/common/service/java/nu/marginalia/process/ProcessMainClass.java rename code/common/{process => service}/java/nu/marginalia/process/control/FakeProcessHeartbeat.java (100%) rename code/common/{process => service}/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java (100%) rename code/common/{process => service}/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java (99%) rename code/common/{process => service}/java/nu/marginalia/process/control/ProcessHeartbeat.java (100%) rename code/common/{process => service}/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java (96%) rename code/common/{process => service}/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java (100%) rename code/common/{process => service}/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java (99%) rename code/common/{process => service}/java/nu/marginalia/process/log/WorkLoadIterable.java (100%) rename code/common/{process => service}/java/nu/marginalia/process/log/WorkLog.java (100%) rename code/common/{process => service}/java/nu/marginalia/process/log/WorkLogEntry.java (100%) delete mode 100644 code/common/service/java/nu/marginalia/service/ProcessMainClass.java rename code/common/{process => service}/test/nu/marginalia/process/log/WorkLogTest.java (100%) delete mode 100644 code/execution/data-extractors/readme.md create mode 100644 code/execution/java/nu/marginalia/actor/proc/ExportTaskMonitorActor.java rename code/{execution/data-extractors => processes/export-task-process}/build.gradle (57%) rename code/processes/{website-adjacencies-calculator => export-task-process}/java/nu/marginalia/adjacencies/AdjacenciesData.java (100%) rename code/processes/{website-adjacencies-calculator => export-task-process}/java/nu/marginalia/adjacencies/AdjacenciesLoader.java (100%) rename code/processes/{website-adjacencies-calculator => export-task-process}/java/nu/marginalia/adjacencies/DomainAliases.java (100%) rename code/processes/{website-adjacencies-calculator => export-task-process}/java/nu/marginalia/adjacencies/SparseBitVector.java (100%) create mode 100644 code/processes/export-task-process/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java rename code/{execution/data-extractors => processes/export-task-process}/java/nu/marginalia/extractor/AtagExporter.java (100%) rename code/{execution/data-extractors => processes/export-task-process}/java/nu/marginalia/extractor/ExporterIf.java (100%) rename code/{execution/data-extractors => processes/export-task-process}/java/nu/marginalia/extractor/FeedExporter.java (100%) rename code/{execution/data-extractors => processes/export-task-process}/java/nu/marginalia/extractor/SampleDataExporter.java (100%) rename code/{execution/data-extractors => processes/export-task-process}/java/nu/marginalia/extractor/TermFrequencyExporter.java (100%) create mode 100644 code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java rename code/processes/{website-adjacencies-calculator => export-task-process}/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java (100%) rename code/processes/{website-adjacencies-calculator => export-task-process}/test/nu/marginalia/adjacencies/SparseBitVectorTest.java (100%) create mode 100644 code/processes/process-mq-api/java/nu/marginalia/mqapi/tasks/ExportTaskRequest.java delete mode 100644 code/processes/website-adjacencies-calculator/build.gradle delete mode 100644 code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java delete mode 100644 code/processes/website-adjacencies-calculator/readme.md diff --git a/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java b/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java index 63a18c93..a7bab69c 100644 --- a/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java +++ b/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java @@ -2,6 +2,7 @@ package nu.marginalia.nodecfg; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.nodecfg.model.NodeProfile; import nu.marginalia.test.TestMigrationLoader; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; @@ -46,8 +47,8 @@ public class NodeConfigurationServiceTest { @Test public void test() throws SQLException { - var a = nodeConfigurationService.create(1, "Test", false, false); - var b = nodeConfigurationService.create(2, "Foo", true, false); + var a = nodeConfigurationService.create(1, "Test", false, false, NodeProfile.MIXED); + var b = nodeConfigurationService.create(2, "Foo", true, false, NodeProfile.MIXED); assertEquals(1, a.node()); assertEquals("Test", a.description()); diff --git a/code/common/process/build.gradle b/code/common/process/build.gradle deleted file mode 100644 index 51289ed4..00000000 --- a/code/common/process/build.gradle +++ /dev/null @@ -1,34 +0,0 @@ -plugins { - id 'java' - - id 'jvm-test-suite' -} - -java { - toolchain { - languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion)) - } -} - -apply from: "$rootProject.projectDir/srcsets.gradle" - -dependencies { - implementation libs.notnull - - implementation libs.bundles.slf4j - testImplementation libs.bundles.slf4j.test - - implementation libs.guava - implementation libs.guava - implementation dependencies.create(libs.guice.get()) { - exclude group: 'com.google.guava' - } - implementation libs.bundles.mariadb - implementation libs.commons.lang3 - - implementation libs.snakeyaml - - testImplementation libs.bundles.slf4j.test - testImplementation libs.bundles.junit - testImplementation libs.mockito -} diff --git a/code/common/process/readme.md b/code/common/process/readme.md deleted file mode 100644 index 989a6193..00000000 --- a/code/common/process/readme.md +++ /dev/null @@ -1,4 +0,0 @@ -# Process - -Basic functionality for a Process. Processes must include this dependency to ensure -their loggers are configured properly! \ No newline at end of file diff --git a/code/common/process/resources/log4j2.properties b/code/common/process/resources/log4j2.properties deleted file mode 100644 index 18eaf147..00000000 --- a/code/common/process/resources/log4j2.properties +++ /dev/null @@ -1,9 +0,0 @@ -log4j2.isThreadContextMapInheritable=true -status = info -appender.console.type = Console -appender.console.name = LogToConsole -appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %c{1}- %msg{nolookups}%n -appender.console.filter.http.type = MarkerFilter -rootLogger.level = info -rootLogger.appenderRef.console.ref = LogToConsole diff --git a/code/common/process/java/nu/marginalia/ProcessConfiguration.java b/code/common/service/java/nu/marginalia/process/ProcessConfiguration.java similarity index 78% rename from code/common/process/java/nu/marginalia/ProcessConfiguration.java rename to code/common/service/java/nu/marginalia/process/ProcessConfiguration.java index 35e1433f..45c3cf22 100644 --- a/code/common/process/java/nu/marginalia/ProcessConfiguration.java +++ b/code/common/service/java/nu/marginalia/process/ProcessConfiguration.java @@ -1,4 +1,4 @@ -package nu.marginalia; +package nu.marginalia.process; import java.util.UUID; diff --git a/code/common/process/java/nu/marginalia/ProcessConfigurationModule.java b/code/common/service/java/nu/marginalia/process/ProcessConfigurationModule.java similarity index 97% rename from code/common/process/java/nu/marginalia/ProcessConfigurationModule.java rename to code/common/service/java/nu/marginalia/process/ProcessConfigurationModule.java index 61214074..12784230 100644 --- a/code/common/process/java/nu/marginalia/ProcessConfigurationModule.java +++ b/code/common/service/java/nu/marginalia/process/ProcessConfigurationModule.java @@ -1,4 +1,4 @@ -package nu.marginalia; +package nu.marginalia.process; import com.google.inject.AbstractModule; import com.google.inject.name.Names; diff --git a/code/common/service/java/nu/marginalia/process/ProcessMainClass.java b/code/common/service/java/nu/marginalia/process/ProcessMainClass.java new file mode 100644 index 00000000..17758007 --- /dev/null +++ b/code/common/service/java/nu/marginalia/process/ProcessMainClass.java @@ -0,0 +1,102 @@ +package nu.marginalia.process; + +import com.google.gson.Gson; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.inbox.MqInboxResponse; +import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.service.ConfigLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public abstract class ProcessMainClass { + private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class); + + private final MessageQueueFactory messageQueueFactory; + private final int node; + private final String inboxName; + + static { + // Load global config ASAP + ConfigLoader.loadConfig( + ConfigLoader.getConfigPath("system") + ); + } + + private final Gson gson; + + public ProcessMainClass(MessageQueueFactory messageQueueFactory, + ProcessConfiguration config, + Gson gson, + String inboxName + ) { + this.gson = gson; + new org.mariadb.jdbc.Driver(); + this.messageQueueFactory = messageQueueFactory; + this.node = config.node(); + this.inboxName = inboxName; + } + + + protected Instructions fetchInstructions(Class requestType) throws Exception { + + var inbox = messageQueueFactory.createSingleShotInbox(inboxName, node, UUID.randomUUID()); + + logger.info("Waiting for instructions"); + + var msgOpt = getMessage(inbox, requestType.getSimpleName()); + var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); + + // for live crawl, request is empty for now + T request = gson.fromJson(msg.payload(), requestType); + + return new Instructions<>(msg, inbox, request); + } + + + private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws InterruptedException, SQLException { + var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); + if (opt.isPresent()) { + if (!opt.get().function().equals(expectedFunction)) { + throw new RuntimeException("Unexpected function: " + opt.get().function()); + } + return opt; + } + else { + var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); + stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); + return stolenMessage; + } + } + + + protected static class Instructions { + private final MqMessage message; + private final MqSingleShotInbox inbox; + private final T value; + Instructions(MqMessage message, MqSingleShotInbox inbox, T value) + { + this.message = message; + this.inbox = inbox; + this.value = value; + } + + public T value() { + return value; + } + + public void ok() { + inbox.sendResponse(message, MqInboxResponse.ok()); + } + public void err() { + inbox.sendResponse(message, MqInboxResponse.err()); + } + + } + +} diff --git a/code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/FakeProcessHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/FakeProcessHeartbeat.java diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java b/code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java similarity index 99% rename from code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java rename to code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java index 1faf72f0..1069446c 100644 --- a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java +++ b/code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java @@ -2,7 +2,7 @@ package nu.marginalia.process.control; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/ProcessHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/ProcessHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/ProcessHeartbeat.java diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java b/code/common/service/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java similarity index 96% rename from code/common/process/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java rename to code/common/service/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java index 6f85d890..3e4bd948 100644 --- a/code/common/process/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java +++ b/code/common/service/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java @@ -4,17 +4,18 @@ package nu.marginalia.process.control; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.sql.SQLException; import java.util.concurrent.TimeUnit; /** This service sends a heartbeat to the database every 5 seconds. */ @Singleton -public class ProcessHeartbeatImpl implements ProcessHeartbeat { +public class ProcessHeartbeatImpl implements ProcessHeartbeat, Closeable { private final Logger logger = LoggerFactory.getLogger(ProcessHeartbeatImpl.class); private final String processName; private final String processBase; @@ -169,5 +170,9 @@ public class ProcessHeartbeatImpl implements ProcessHeartbeat { } } } + + public void close() { + shutDown(); + } } diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java b/code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java similarity index 99% rename from code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java rename to code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java index f8d68b7e..2ea3da52 100644 --- a/code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java +++ b/code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java @@ -2,7 +2,7 @@ package nu.marginalia.process.control; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/common/process/java/nu/marginalia/process/log/WorkLoadIterable.java b/code/common/service/java/nu/marginalia/process/log/WorkLoadIterable.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/log/WorkLoadIterable.java rename to code/common/service/java/nu/marginalia/process/log/WorkLoadIterable.java diff --git a/code/common/process/java/nu/marginalia/process/log/WorkLog.java b/code/common/service/java/nu/marginalia/process/log/WorkLog.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/log/WorkLog.java rename to code/common/service/java/nu/marginalia/process/log/WorkLog.java diff --git a/code/common/process/java/nu/marginalia/process/log/WorkLogEntry.java b/code/common/service/java/nu/marginalia/process/log/WorkLogEntry.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/log/WorkLogEntry.java rename to code/common/service/java/nu/marginalia/process/log/WorkLogEntry.java diff --git a/code/common/service/java/nu/marginalia/service/ConfigLoader.java b/code/common/service/java/nu/marginalia/service/ConfigLoader.java index d5c9f627..b19fca65 100644 --- a/code/common/service/java/nu/marginalia/service/ConfigLoader.java +++ b/code/common/service/java/nu/marginalia/service/ConfigLoader.java @@ -9,11 +9,11 @@ import java.util.Properties; public class ConfigLoader { - static Path getConfigPath(String configName) { + public static Path getConfigPath(String configName) { return WmsaHome.getHomePath().resolve("conf/properties/" + configName + ".properties"); } - static void loadConfig(Path configPath) { + public static void loadConfig(Path configPath) { if (!Files.exists(configPath)) { System.err.println("No config file found at " + configPath); return; diff --git a/code/common/service/java/nu/marginalia/service/ProcessMainClass.java b/code/common/service/java/nu/marginalia/service/ProcessMainClass.java deleted file mode 100644 index 6f66b57d..00000000 --- a/code/common/service/java/nu/marginalia/service/ProcessMainClass.java +++ /dev/null @@ -1,20 +0,0 @@ -package nu.marginalia.service; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class ProcessMainClass { - private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class); - - static { - // Load global config ASAP - ConfigLoader.loadConfig( - ConfigLoader.getConfigPath("system") - ); - } - - public ProcessMainClass() { - new org.mariadb.jdbc.Driver(); - } - -} diff --git a/code/common/process/test/nu/marginalia/process/log/WorkLogTest.java b/code/common/service/test/nu/marginalia/process/log/WorkLogTest.java similarity index 100% rename from code/common/process/test/nu/marginalia/process/log/WorkLogTest.java rename to code/common/service/test/nu/marginalia/process/log/WorkLogTest.java diff --git a/code/execution/build.gradle b/code/execution/build.gradle index 7dca6b96..641642db 100644 --- a/code/execution/build.gradle +++ b/code/execution/build.gradle @@ -15,7 +15,7 @@ dependencies { // These look weird but they're needed to be able to spawn the processes // from the executor service - implementation project(':code:processes:website-adjacencies-calculator') + implementation project(':code:processes:export-task-process') implementation project(':code:processes:crawling-process') implementation project(':code:processes:live-crawling-process') implementation project(':code:processes:loading-process') @@ -24,7 +24,6 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:model') - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:linkdb') @@ -43,7 +42,6 @@ dependencies { implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:ft-link-parser') - implementation project(':code:execution:data-extractors') implementation project(':code:index:index-journal') implementation project(':code:index:api') implementation project(':code:processes:process-mq-api') diff --git a/code/execution/data-extractors/readme.md b/code/execution/data-extractors/readme.md deleted file mode 100644 index ea318e9f..00000000 --- a/code/execution/data-extractors/readme.md +++ /dev/null @@ -1,7 +0,0 @@ -Contains converter-*like* extraction jobs that operate on crawled data to produce export files. - -## Important classes - -* [AtagExporter](java/nu/marginalia/extractor/AtagExporter.java) - extracts anchor texts from the crawled data. -* [FeedExporter](java/nu/marginalia/extractor/FeedExporter.java) - tries to find RSS/Atom feeds within the crawled data. -* [TermFrequencyExporter](java/nu/marginalia/extractor/TermFrequencyExporter.java) - exports the 'TF' part of TF-IDF. \ No newline at end of file diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActor.java b/code/execution/java/nu/marginalia/actor/ExecutorActor.java index bcb9d904..c4bd43fd 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActor.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActor.java @@ -10,6 +10,7 @@ public enum ExecutorActor { RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_CONVERTER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), + PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), EXPORT_SEGMENTATION_MODEL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java index e93b93b5..7cb3e91c 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java @@ -59,6 +59,7 @@ public class ExecutorActorControlService { ExportSampleDataActor exportSampleDataActor, ExportTermFreqActor exportTermFrequenciesActor, ExportSegmentationModelActor exportSegmentationModelActor, + ExportTaskMonitorActor exportTasksMonitorActor, DownloadSampleActor downloadSampleActor, ScrapeFeedsActor scrapeFeedsActor, ExecutorActorStateMachines stateMachines, @@ -83,6 +84,7 @@ public class ExecutorActorControlService { register(ExecutorActor.PROC_LOADER_SPAWNER, loaderMonitor); register(ExecutorActor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor); register(ExecutorActor.PROC_LIVE_CRAWL_SPAWNER, liveCrawlerMonitorActor); + register(ExecutorActor.PROC_EXPORT_TASKS_SPAWNER, exportTasksMonitorActor); register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM); register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor); diff --git a/code/execution/java/nu/marginalia/actor/proc/ExportTaskMonitorActor.java b/code/execution/java/nu/marginalia/actor/proc/ExportTaskMonitorActor.java new file mode 100644 index 00000000..b07f80e8 --- /dev/null +++ b/code/execution/java/nu/marginalia/actor/proc/ExportTaskMonitorActor.java @@ -0,0 +1,29 @@ +package nu.marginalia.actor.proc; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqapi.ProcessInboxNames; +import nu.marginalia.process.ProcessService; +import nu.marginalia.service.module.ServiceConfiguration; + +@Singleton +public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor { + + @Inject + public ExportTaskMonitorActor(Gson gson, + ServiceConfiguration configuration, + MqPersistence persistence, + ProcessService processService) { + super(gson, + configuration, + persistence, + processService, + ProcessInboxNames.EXPORT_TASK_INBOX, + ProcessService.ProcessId.EXPORT_TASKS); + } + + +} diff --git a/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java b/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java index 5323302b..e8b4f341 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java @@ -3,50 +3,68 @@ package nu.marginalia.actor.task; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.extractor.AtagExporter; -import nu.marginalia.extractor.ExporterIf; -import nu.marginalia.storage.model.*; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.storage.model.FileStorageState; +import nu.marginalia.storage.model.FileStorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.LocalDateTime; @Singleton public class ExportAtagsActor extends RecordActorPrototype { private final FileStorageService storageService; - private final ExporterIf atagExporter; + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; + private final Logger logger = LoggerFactory.getLogger(getClass()); public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId) { + this(crawlId, destId, -1); + } + } + @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { case Export(FileStorageId crawlId) -> { - var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atag-export", "Anchor Tags " + LocalDateTime.now()); + var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atags-export", "Atags " + LocalDateTime.now()); if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId) -> { + case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - atagExporter.export(crawlId, destId); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - storageService.setFileStorageState(destId, FileStorageState.DELETE); - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId)); + yield new Run(crawlId, destId, newMsgId); } + case Run(_, FileStorageId destId, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } - @Override public String describe() { return "Export anchor tags from crawl data"; @@ -55,11 +73,13 @@ public class ExportAtagsActor extends RecordActorPrototype { @Inject public ExportAtagsActor(Gson gson, FileStorageService storageService, - AtagExporter atagExporter) + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); this.storageService = storageService; - this.atagExporter = atagExporter; + this.processWatcher = processWatcher; } } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java b/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java index faaaf528..7a0d612c 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java @@ -5,8 +5,11 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; -import nu.marginalia.extractor.ExporterIf; -import nu.marginalia.extractor.FeedExporter; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageState; @@ -19,11 +22,17 @@ import java.time.LocalDateTime; @Singleton public class ExportFeedsActor extends RecordActorPrototype { private final FileStorageService storageService; + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); - private final ExporterIf feedExporter; public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId) { + this(crawlId, destId, -1); + } + } + @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { @@ -33,20 +42,25 @@ public class ExportFeedsActor extends RecordActorPrototype { if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId) -> { + case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - feedExporter.export(crawlId, destId); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - storageService.setFileStorageState(destId, FileStorageState.DELETE); - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.feeds(crawlId, destId)); + yield new Run(crawlId, destId, newMsgId); } + case Run(_, FileStorageId destId, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } @@ -60,11 +74,13 @@ public class ExportFeedsActor extends RecordActorPrototype { @Inject public ExportFeedsActor(Gson gson, FileStorageService storageService, - FeedExporter feedExporter) + ActorProcessWatcher processWatcher, + ProcessOutboxes outboxes) { super(gson); this.storageService = storageService; - this.feedExporter = feedExporter; + this.processWatcher = processWatcher; + this.exportTasksOutbox = outboxes.getExportTasksOutbox(); } } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java b/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java index e47a803e..fa63a523 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java @@ -5,7 +5,11 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; -import nu.marginalia.extractor.SampleDataExporter; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageState; @@ -18,11 +22,17 @@ import java.time.LocalDateTime; @Singleton public class ExportSampleDataActor extends RecordActorPrototype { private final FileStorageService storageService; + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); - private final SampleDataExporter dataExporter; public record Export(FileStorageId crawlId, int size, String name) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId, int size, String name) { + this(crawlId, destId, size, name, -1); + } + } + @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { @@ -35,28 +45,29 @@ public class ExportSampleDataActor extends RecordActorPrototype { if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id(), size, name); } - case Run(FileStorageId crawlId, FileStorageId destId, int size, String name) -> { + case Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - dataExporter.export(crawlId, destId, size, name); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - storageService.setFileStorageState(destId, FileStorageState.DELETE); - - logger.error("Failed to export data", ex); - - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, size, name)); + yield new Run(crawlId, destId, size, name, newMsgId); } + case Run(_, FileStorageId destId, _, _, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } - @Override public String describe() { return "Export RSS/Atom feeds from crawl data"; @@ -65,11 +76,13 @@ public class ExportSampleDataActor extends RecordActorPrototype { @Inject public ExportSampleDataActor(Gson gson, FileStorageService storageService, - SampleDataExporter dataExporter) + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); this.storageService = storageService; - this.dataExporter = dataExporter; + this.processWatcher = processWatcher; + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); } } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java b/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java index a9191060..d4c50686 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java @@ -5,45 +5,62 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; -import nu.marginalia.extractor.ExporterIf; -import nu.marginalia.extractor.TermFrequencyExporter; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageState; import nu.marginalia.storage.model.FileStorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.LocalDateTime; @Singleton public class ExportTermFreqActor extends RecordActorPrototype { private final FileStorageService storageService; - private final ExporterIf exporter; + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; + private final Logger logger = LoggerFactory.getLogger(getClass()); + public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId) { + this(crawlId, destId, -1); + } + } @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { case Export(FileStorageId crawlId) -> { - var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq-export", "Term Frequencies " + LocalDateTime.now()); + var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq", "Term Frequencies " + LocalDateTime.now()); if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId) -> { + case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - exporter.export(crawlId, destId); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - storageService.setFileStorageState(destId, FileStorageState.DELETE); - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.termFreq(crawlId, destId)); + yield new Run(crawlId, destId, newMsgId); } + case Run(_, FileStorageId destId, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } @@ -57,11 +74,13 @@ public class ExportTermFreqActor extends RecordActorPrototype { @Inject public ExportTermFreqActor(Gson gson, FileStorageService storageService, - TermFrequencyExporter exporter) + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); this.storageService = storageService; - this.exporter = exporter; + this.processWatcher = processWatcher; + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); } } diff --git a/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java b/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java index f81aeb79..e4a46d84 100644 --- a/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java +++ b/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java @@ -5,53 +5,59 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - @Singleton public class TriggerAdjacencyCalculationActor extends RecordActorPrototype { + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); - private final ProcessService processService; - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - public record Run() implements ActorStep {} + public record Run(long msgId) implements ActorStep { + public Run() { + this(-1); + } + } @Override public ActorStep transition(ActorStep self) throws Exception { - return switch (self) { - case Run() -> { - AtomicBoolean hasError = new AtomicBoolean(false); - var future = executor.submit(() -> { - try { - processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load"); - } - catch (Exception ex) { - logger.warn("Error triggering adjacency calculation", ex); - hasError.set(true); - } - }); - future.get(); - - if (hasError.get()) { - yield new Error("Error triggering adjacency calculation"); - } - yield new End(); + return switch(self) { + case Run(long msgId) when msgId < 0 -> { + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.adjacencies()); + yield new Run(newMsgId); } + case Run(long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + yield new Error("Exporter failed"); + } + else { + yield new End(); + } + } + default -> new Error(); }; } + @Inject public TriggerAdjacencyCalculationActor(Gson gson, - ProcessService processService) { + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); - this.processService = processService; + + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); + this.processWatcher = processWatcher; + } @Override diff --git a/code/execution/java/nu/marginalia/process/ProcessOutboxes.java b/code/execution/java/nu/marginalia/process/ProcessOutboxes.java index 8600b86c..4f2e79d5 100644 --- a/code/execution/java/nu/marginalia/process/ProcessOutboxes.java +++ b/code/execution/java/nu/marginalia/process/ProcessOutboxes.java @@ -14,6 +14,7 @@ public class ProcessOutboxes { private final MqOutbox crawlerOutbox; private final MqOutbox indexConstructorOutbox; private final MqOutbox liveCrawlerOutbox; + private final MqOutbox exportTasksOutbox; @Inject public ProcessOutboxes(BaseServiceParams params, MqPersistence persistence) { @@ -53,6 +54,14 @@ public class ProcessOutboxes { params.configuration.node(), params.configuration.instanceUuid() ); + + exportTasksOutbox = new MqOutbox(persistence, + ProcessInboxNames.EXPORT_TASK_INBOX, + params.configuration.node(), + params.configuration.serviceName(), + params.configuration.node(), + params.configuration.instanceUuid() + ); } @@ -71,4 +80,6 @@ public class ProcessOutboxes { public MqOutbox getIndexConstructorOutbox() { return indexConstructorOutbox; } public MqOutbox getLiveCrawlerOutbox() { return liveCrawlerOutbox; } + + public MqOutbox getExportTasksOutbox() { return exportTasksOutbox; } } diff --git a/code/execution/java/nu/marginalia/process/ProcessService.java b/code/execution/java/nu/marginalia/process/ProcessService.java index 89707f94..f38651e1 100644 --- a/code/execution/java/nu/marginalia/process/ProcessService.java +++ b/code/execution/java/nu/marginalia/process/ProcessService.java @@ -3,15 +3,14 @@ package nu.marginalia.process; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.WmsaHome; -import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator; import nu.marginalia.converting.ConverterMain; import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.index.IndexConstructorMain; import nu.marginalia.livecrawler.LiveCrawlerMain; import nu.marginalia.loading.LoaderMain; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.BaseServiceParams; +import nu.marginalia.task.ExportTasksMain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -38,13 +37,13 @@ public class ProcessService { private final int node; - public static ProcessService.ProcessId translateExternalIdBase(String id) { + public static ProcessId translateExternalIdBase(String id) { return switch (id) { - case "converter" -> ProcessService.ProcessId.CONVERTER; - case "crawler" -> ProcessService.ProcessId.CRAWLER; - case "loader" -> ProcessService.ProcessId.LOADER; - case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR; - case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR; + case "converter" -> ProcessId.CONVERTER; + case "crawler" -> ProcessId.CRAWLER; + case "loader" -> ProcessId.LOADER; + case "export-tasks" -> ProcessId.EXPORT_TASKS; + case "index-constructor" -> ProcessId.INDEX_CONSTRUCTOR; default -> null; }; } @@ -55,7 +54,7 @@ public class ProcessService { CONVERTER(ConverterMain.class), LOADER(LoaderMain.class), INDEX_CONSTRUCTOR(IndexConstructorMain.class), - ADJACENCIES_CALCULATOR(WebsiteAdjacenciesCalculator.class) + EXPORT_TASKS(ExportTasksMain.class), ; public final String mainClass; @@ -70,7 +69,7 @@ public class ProcessService { case CONVERTER -> "CONVERTER_PROCESS_OPTS"; case LOADER -> "LOADER_PROCESS_OPTS"; case INDEX_CONSTRUCTOR -> "INDEX_CONSTRUCTION_PROCESS_OPTS"; - case ADJACENCIES_CALCULATOR -> "ADJACENCIES_CALCULATOR_PROCESS_OPTS"; + case EXPORT_TASKS -> "EXPORT_TASKS_PROCESS_OPTS"; }; String value = System.getenv(variable); diff --git a/code/index/build.gradle b/code/index/build.gradle index ad1d1000..126de167 100644 --- a/code/index/build.gradle +++ b/code/index/build.gradle @@ -67,7 +67,6 @@ dependencies { testImplementation libs.bundles.junit testImplementation libs.mockito testImplementation libs.commons.lang3 - testImplementation project(':code:common:process') testImplementation project(':code:libraries:array') testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') diff --git a/code/index/index-forward/build.gradle b/code/index/index-forward/build.gradle index 946ef74b..53decc21 100644 --- a/code/index/index-forward/build.gradle +++ b/code/index/index-forward/build.gradle @@ -20,7 +20,7 @@ dependencies { implementation project(':code:index:query') implementation project(':code:index:index-journal') implementation project(':code:common:model') - implementation project(':code:common:process') + implementation project(':code:common:service') implementation project(':code:processes:converting-process:model') implementation libs.bundles.slf4j diff --git a/code/index/index-reverse/build.gradle b/code/index/index-reverse/build.gradle index bd0831ba..7e7504a4 100644 --- a/code/index/index-reverse/build.gradle +++ b/code/index/index-reverse/build.gradle @@ -21,8 +21,8 @@ dependencies { implementation project(':code:index:query') implementation project(':code:index:index-journal') implementation project(':code:common:model') + implementation project(':code:common:service') implementation project(':code:processes:converting-process:model') - implementation project(':code:common:process') implementation project(':third-party:parquet-floor') implementation project(':third-party:commons-codec') diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 48c7a878..75987482 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -21,7 +21,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':third-party:porterstemmer') implementation project(':third-party:count-min-sketch') diff --git a/code/processes/converting-process/ft-anchor-keywords/build.gradle b/code/processes/converting-process/ft-anchor-keywords/build.gradle index 7572cce0..dbf6b2cf 100644 --- a/code/processes/converting-process/ft-anchor-keywords/build.gradle +++ b/code/processes/converting-process/ft-anchor-keywords/build.gradle @@ -14,9 +14,9 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:config') + implementation project(':code:common:service') implementation project(':code:common:model') implementation project(':code:common:db') - implementation project(':code:common:process') implementation project(':code:processes:converting-process:ft-keyword-extraction') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:term-frequency-dict') diff --git a/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java b/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java index aaed5ace..b0da1148 100644 --- a/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java +++ b/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java @@ -2,10 +2,10 @@ package nu.marginalia.atags.source; import com.google.inject.Inject; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.WmsaHome; import nu.marginalia.atags.model.DomainLinks; import nu.marginalia.model.EdgeDomain; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java index 97935d79..b3e3521f 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java @@ -4,8 +4,6 @@ import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.converting.model.CrawlPlan; import nu.marginalia.converting.model.WorkDir; import nu.marginalia.converting.processor.DomainProcessor; @@ -17,14 +15,14 @@ import nu.marginalia.converting.writer.ConverterWriter; import nu.marginalia.io.CrawledDomainReader; import nu.marginalia.io.SerializableCrawlDataStream; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.mqapi.converting.ConvertRequest; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.log.WorkLog; import nu.marginalia.process.log.WorkLogEntry; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; import nu.marginalia.util.SimpleBlockingThreadPool; @@ -34,13 +32,13 @@ import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -50,12 +48,9 @@ import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX; public class ConverterMain extends ProcessMainClass { private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class); private final DomainProcessor processor; - private final Gson gson; private final ProcessHeartbeat heartbeat; - private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; private final SideloadSourceFactory sideloadSourceFactory; - private final int node; public static void main(String... args) throws Exception { @@ -70,8 +65,9 @@ public class ConverterMain extends ProcessMainClass { logger.info("Starting pipe"); - converter - .fetchInstructions() + Instructions instructions = converter.fetchInstructions(ConvertRequest.class); + + converter.createAction(instructions) .execute(converter); logger.info("Finished"); @@ -83,6 +79,65 @@ public class ConverterMain extends ProcessMainClass { System.exit(0); } + private Action createAction(Instructions instructions) throws SQLException, IOException { + var request = instructions.value(); + final Path inputPath = request.getInputPath(); + + return switch (request.action) { + case ConvertCrawlData -> { + var crawlData = fileStorageService.getStorage(request.crawlStorage); + var processData = fileStorageService.getStorage(request.processedDataStorage); + + var plan = new CrawlPlan(null, + new WorkDir(crawlData.asPath().toString(), "crawler.log"), + new WorkDir(processData.asPath().toString(), "processor.log") + ); + + yield new ConvertCrawlDataAction(plan, instructions); + } + case SideloadEncyclopedia -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl), + processData.asPath(), + instructions); + } + case SideloadDirtree -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadDirtree(inputPath), + processData.asPath(), + instructions); + } + case SideloadWarc -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadWarc(inputPath), + processData.asPath(), + instructions); + } + case SideloadReddit -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadReddit(inputPath), + processData.asPath(), + instructions); + } + case SideloadStackexchange -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadStackexchange(inputPath), + processData.asPath(), + instructions); + } + }; + } + @Inject public ConverterMain( DomainProcessor processor, @@ -94,13 +149,12 @@ public class ConverterMain extends ProcessMainClass { ProcessConfiguration processConfiguration ) { + super(messageQueueFactory, processConfiguration, gson, CONVERTER_INBOX); + this.processor = processor; - this.gson = gson; this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; this.sideloadSourceFactory = sideloadSourceFactory; - this.node = processConfiguration.node(); heartbeat.start(); } @@ -220,45 +274,44 @@ public class ConverterMain extends ProcessMainClass { } } - private abstract static class ConvertRequest { - private final MqMessage message; - private final MqSingleShotInbox inbox; + private abstract static class Action { + final Instructions instructions; - private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) { - this.message = message; - this.inbox = inbox; + public Action(Instructions instructions) { + this.instructions = instructions; } public abstract void execute(ConverterMain converterMain) throws Exception; public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); + instructions.ok(); } public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); + instructions.err(); } } - private static class SideloadAction extends ConvertRequest { + private static class SideloadAction extends Action { private final Collection sideloadSources; private final Path workDir; SideloadAction(SideloadSource sideloadSource, Path workDir, - MqMessage message, MqSingleShotInbox inbox) { - super(message, inbox); + Instructions instructions) { + super(instructions); this.sideloadSources = List.of(sideloadSource); this.workDir = workDir; } SideloadAction(Collection sideloadSources, Path workDir, - MqMessage message, MqSingleShotInbox inbox) { - super(message, inbox); + Instructions instructions) { + super(instructions); this.sideloadSources = sideloadSources; this.workDir = workDir; } + @Override public void execute(ConverterMain converterMain) throws Exception { try { @@ -272,11 +325,12 @@ public class ConverterMain extends ProcessMainClass { } } - private static class ConvertCrawlDataAction extends ConvertRequest { + private static class ConvertCrawlDataAction extends Action { private final CrawlPlan plan; - private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) { - super(message, inbox); + private ConvertCrawlDataAction(CrawlPlan plan, + Instructions instructions) { + super(instructions); this.plan = plan; } @@ -294,94 +348,4 @@ public class ConverterMain extends ProcessMainClass { } } - - private ConvertRequest fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, node, UUID.randomUUID()); - - var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - try { - var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class); - - // will be null on ConvertCrawlData - final Path inputPath = request.getInputPath(); - - return switch (request.action) { - case ConvertCrawlData -> { - var crawlData = fileStorageService.getStorage(request.crawlStorage); - var processData = fileStorageService.getStorage(request.processedDataStorage); - - var plan = new CrawlPlan(null, - new WorkDir(crawlData.asPath().toString(), "crawler.log"), - new WorkDir(processData.asPath().toString(), "processor.log") - ); - - yield new ConvertCrawlDataAction(plan, msg, inbox); - } - case SideloadEncyclopedia -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl), - processData.asPath(), - msg, inbox); - } - case SideloadDirtree -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadDirtree(inputPath), - processData.asPath(), - msg, inbox); - } - case SideloadWarc -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadWarc(inputPath), - processData.asPath(), - msg, inbox); - } - case SideloadReddit -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadReddit(inputPath), - processData.asPath(), - msg, inbox); - } - case SideloadStackexchange -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadStackexchange(inputPath), - processData.asPath(), - msg, inbox); - } - }; - } - catch (Exception ex) { - inbox.sendResponse(msg, MqInboxResponse.err(ex.getClass().getSimpleName() + ": " + ex.getMessage())); - - throw ex; - } - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } - } diff --git a/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java b/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java index 83f28882..d14750ef 100644 --- a/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java +++ b/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java @@ -3,9 +3,9 @@ package nu.marginalia.converting; import com.google.inject.AbstractModule; import com.google.inject.name.Names; import nu.marginalia.LanguageModels; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.WmsaHome; import nu.marginalia.converting.processor.ConverterDomainTypes; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.service.module.ServiceConfiguration; import org.mockito.Mockito; diff --git a/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java b/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java index 9c40bcab..4f964db3 100644 --- a/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java +++ b/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java @@ -2,10 +2,10 @@ package nu.marginalia.converting.sideload.reddit; import com.google.inject.AbstractModule; import com.google.inject.Guice; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.converting.ConverterModule; import nu.marginalia.converting.processor.ConverterDomainTypes; import nu.marginalia.converting.sideload.SideloadSourceFactory; +import nu.marginalia.process.ProcessConfiguration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; diff --git a/code/processes/crawling-process/build.gradle b/code/processes/crawling-process/build.gradle index 2d34904f..e955f86c 100644 --- a/code/processes/crawling-process/build.gradle +++ b/code/processes/crawling-process/build.gradle @@ -21,7 +21,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:model') diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java index ce002903..e7fbe4f9 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java @@ -5,8 +5,6 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.UserAgent; import nu.marginalia.WmsaHome; import nu.marginalia.atags.model.DomainLinks; @@ -25,15 +23,15 @@ import nu.marginalia.io.CrawledDomainReader; import nu.marginalia.io.CrawlerOutputFile; import nu.marginalia.model.EdgeDomain; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.log.WorkLog; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.util.SimpleBlockingThreadPool; import okhttp3.ConnectionPool; import okhttp3.Dispatcher; @@ -46,8 +44,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.security.Security; -import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -59,14 +59,12 @@ public class CrawlerMain extends ProcessMainClass { private final UserAgent userAgent; private final ProcessHeartbeatImpl heartbeat; - private final MessageQueueFactory messageQueueFactory; private final DomainProber domainProber; private final FileStorageService fileStorageService; private final AnchorTagsSourceFactory anchorTagsSourceFactory; private final WarcArchiverFactory warcArchiverFactory; private final HikariDataSource dataSource; private final DomainBlacklist blacklist; - private final Gson gson; private final int node; private final SimpleBlockingThreadPool pool; @@ -96,16 +94,17 @@ public class CrawlerMain extends ProcessMainClass { HikariDataSource dataSource, DomainBlacklist blacklist, Gson gson) throws InterruptedException { + + super(messageQueueFactory, processConfiguration, gson, CRAWLER_INBOX); + this.userAgent = userAgent; this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.domainProber = domainProber; this.fileStorageService = fileStorageService; this.anchorTagsSourceFactory = anchorTagsSourceFactory; this.warcArchiverFactory = warcArchiverFactory; this.dataSource = dataSource; this.blacklist = blacklist; - this.gson = gson; this.node = processConfiguration.node(); pool = new SimpleBlockingThreadPool("CrawlerPool", @@ -144,13 +143,14 @@ public class CrawlerMain extends ProcessMainClass { ); var crawler = injector.getInstance(CrawlerMain.class); - var instructions = crawler.fetchInstructions(); + var instructions = crawler.fetchInstructions(nu.marginalia.mqapi.crawling.CrawlRequest.class); try { - if (instructions.targetDomainName != null) { - crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir); + var req = instructions.value(); + if (req.targetDomainName != null) { + crawler.runForSingleDomain(req.targetDomainName, req.crawlStorage); } else { - crawler.runForDatabaseDomains(instructions.outputDir); + crawler.runForDatabaseDomains(req.crawlStorage); } instructions.ok(); } catch (Exception ex) { @@ -166,6 +166,10 @@ public class CrawlerMain extends ProcessMainClass { System.exit(0); } + public void runForDatabaseDomains(FileStorageId fileStorageId) throws Exception { + runForDatabaseDomains(fileStorageService.getStorage(fileStorageId).asPath()); + } + public void runForDatabaseDomains(Path outputDir) throws Exception { heartbeat.start(); @@ -285,6 +289,11 @@ public class CrawlerMain extends ProcessMainClass { } } + + public void runForSingleDomain(String targetDomainName, FileStorageId fileStorageId) throws Exception { + runForSingleDomain(targetDomainName, fileStorageService.getStorage(fileStorageId).asPath()); + } + public void runForSingleDomain(String targetDomainName, Path outputDir) throws Exception { heartbeat.start(); @@ -410,70 +419,6 @@ public class CrawlerMain extends ProcessMainClass { } - - - private static class CrawlRequest { - private final Path outputDir; - private final MqMessage message; - private final MqSingleShotInbox inbox; - - private final String targetDomainName; - - CrawlRequest(String targetDomainName, - Path outputDir, - MqMessage message, - MqSingleShotInbox inbox) - { - this.message = message; - this.inbox = inbox; - this.outputDir = outputDir; - this.targetDomainName = targetDomainName; - } - - - public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); - } - public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); - } - - } - - private CrawlRequest fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID()); - - logger.info("Waiting for instructions"); - - var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class); - var crawlStorage = fileStorageService.getStorage(request.crawlStorage); - - return new CrawlRequest( - request.targetDomainName, - crawlStorage.asPath(), - msg, - inbox); - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } - public record CrawlSpecRecord(@NotNull String domain, int crawlDepth, @NotNull List urls) { public CrawlSpecRecord(String domain, int crawlDepth) { diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java b/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java index c1a53718..33c08165 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java @@ -1,8 +1,8 @@ package nu.marginalia.crawl.warc; import com.google.inject.Inject; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.process.ProcessConfiguration; import org.apache.commons.io.IOUtils; import java.io.IOException; diff --git a/code/processes/crawling-process/model/build.gradle b/code/processes/crawling-process/model/build.gradle index 50103c41..bcac359e 100644 --- a/code/processes/crawling-process/model/build.gradle +++ b/code/processes/crawling-process/model/build.gradle @@ -20,7 +20,6 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:db') implementation project(':code:common:config') - implementation project(':code:common:process') implementation project(':code:index:api') implementation project(':code:processes:crawling-process:ft-content-type') implementation project(':code:libraries:language-processing') diff --git a/code/execution/data-extractors/build.gradle b/code/processes/export-task-process/build.gradle similarity index 57% rename from code/execution/data-extractors/build.gradle rename to code/processes/export-task-process/build.gradle index 2a0c08c6..11df9ee1 100644 --- a/code/execution/data-extractors/build.gradle +++ b/code/processes/export-task-process/build.gradle @@ -1,24 +1,33 @@ plugins { id 'java' - - id "de.undercouch.download" version "5.1.0" - + id 'application' id 'jvm-test-suite' } - java { toolchain { languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion)) } } +application { + mainClass = 'nu.marginalia.task.ExportTaskMain' + applicationName = 'export-task-process' +} + +tasks.distZip.enabled = false + apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:config') - implementation project(':code:common:process') implementation project(':code:common:model') + implementation project(':code:common:db') + implementation project(':code:common:service') + implementation project(':code:common:config') + implementation project(':code:libraries:message-queue') + + implementation project(':code:functions:link-graph:api') + implementation project(':code:processes:process-mq-api') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:term-frequency-dict') implementation project(':code:libraries:blocking-thread-pool') @@ -28,20 +37,32 @@ dependencies { implementation project(':code:processes:converting-process') implementation project(':third-party:commons-codec') - implementation libs.bundles.slf4j + implementation libs.guava implementation dependencies.create(libs.guice.get()) { exclude group: 'com.google.guava' } + implementation libs.roaringbitmap implementation libs.trove + implementation libs.fastutil + implementation libs.bundles.mariadb + implementation libs.gson implementation libs.commons.lang3 + implementation libs.commons.io implementation libs.commons.compress - implementation libs.notnull + implementation libs.commons.codec implementation libs.jsoup + + testImplementation libs.bundles.slf4j.test testImplementation libs.bundles.junit testImplementation libs.mockito -} + testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') + testImplementation libs.commons.codec + testImplementation 'org.testcontainers:mariadb:1.17.4' + testImplementation 'org.testcontainers:junit-jupiter:1.17.4' + testImplementation project(':code:libraries:test-helpers') +} diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesData.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesData.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesData.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesData.java diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesLoader.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesLoader.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesLoader.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesLoader.java diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/DomainAliases.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/DomainAliases.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/DomainAliases.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/DomainAliases.java diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/SparseBitVector.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/SparseBitVector.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/SparseBitVector.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/SparseBitVector.java diff --git a/code/processes/export-task-process/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java new file mode 100644 index 00000000..c6ce4e91 --- /dev/null +++ b/code/processes/export-task-process/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java @@ -0,0 +1,127 @@ +package nu.marginalia.adjacencies; + +import com.google.inject.Inject; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.api.linkgraph.AggregateLinkGraphClient; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.control.ProcessHeartbeatImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import static nu.marginalia.adjacencies.SparseBitVector.andCardinality; +import static nu.marginalia.adjacencies.SparseBitVector.weightedProduct; + +public class WebsiteAdjacenciesCalculator { + private final AggregateLinkGraphClient domainLinksClient; + private final ProcessConfiguration configuration; + private final HikariDataSource dataSource; + public AdjacenciesData adjacenciesData; + public DomainAliases domainAliases; + private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class); + + float[] weights; + + @Inject + public WebsiteAdjacenciesCalculator(AggregateLinkGraphClient domainLinksClient, + ProcessConfiguration configuration, + HikariDataSource dataSource) { + this.domainLinksClient = domainLinksClient; + this.configuration = configuration; + this.dataSource = dataSource; + } + + public void export() throws Exception { + try (var processHeartbeat = new ProcessHeartbeatImpl(configuration, dataSource)) { + domainAliases = new DomainAliases(dataSource); + adjacenciesData = new AdjacenciesData(domainLinksClient, domainAliases); + weights = adjacenciesData.getWeights(); + + AdjacenciesLoader loader = new AdjacenciesLoader(dataSource); + var executor = Executors.newFixedThreadPool(16); + + int total = adjacenciesData.getIdsList().size(); + AtomicInteger progress = new AtomicInteger(0); + IntStream.of(adjacenciesData.getIdsList().toArray()).parallel() + .filter(domainAliases::isNotAliased) + .forEach(id -> { + findAdjacent(id, loader::load); + processHeartbeat.setProgress(progress.incrementAndGet() / (double) total); + }); + + executor.shutdown(); + System.out.println("Waiting for wrap-up"); + loader.stop(); + } + } + + public void findAdjacent(int domainId, Consumer andThen) { + findAdjacentDtoS(domainId, andThen); + } + + double cosineSimilarity(SparseBitVector a, SparseBitVector b) { + double andCardinality = andCardinality(a, b); + andCardinality /= Math.sqrt(a.getCardinality()); + andCardinality /= Math.sqrt(b.getCardinality()); + return andCardinality; + } + + double expensiveCosineSimilarity(SparseBitVector a, SparseBitVector b) { + return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights)); + } + + public record DomainSimilarities(int domainId, List similarities) {} + + public record DomainSimilarity(int domainId, double value) {} + + private void findAdjacentDtoS(int domainId, Consumer andThen) { + var vector = adjacenciesData.getVector(domainId); + + if (vector == null || !vector.cardinalityExceeds(10)) { + return; + } + + List similarities = new ArrayList<>(1000); + + var items = adjacenciesData.getCandidates(vector); + + + int cardMin = Math.max(2, (int) (0.01 * vector.getCardinality())); + + items.forEach(id -> { + var otherVec = adjacenciesData.getVector(id); + + if (null == otherVec || otherVec == vector) + return true; + + if (otherVec.getCardinality() < cardMin) + return true; + + double similarity = cosineSimilarity(vector, otherVec); + if (similarity > 0.1) { + var recalculated = expensiveCosineSimilarity(vector, otherVec); + if (recalculated > 0.1) { + similarities.add(new DomainSimilarity(id, recalculated)); + } + } + + return true; + }); + + if (similarities.size() > 128) { + similarities.sort(Comparator.comparing(DomainSimilarity::value)); + similarities.subList(0, similarities.size() - 128).clear(); + } + + + andThen.accept(new DomainSimilarities(domainId, similarities)); + } + +} diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/AtagExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/AtagExporter.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/ExporterIf.java b/code/processes/export-task-process/java/nu/marginalia/extractor/ExporterIf.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/ExporterIf.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/ExporterIf.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/FeedExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/FeedExporter.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/SampleDataExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/SampleDataExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/SampleDataExporter.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/SampleDataExporter.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java diff --git a/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java b/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java new file mode 100644 index 00000000..c5d7ed12 --- /dev/null +++ b/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java @@ -0,0 +1,82 @@ +package nu.marginalia.task; + +import com.google.gson.Gson; +import com.google.inject.Guice; +import com.google.inject.Inject; +import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator; +import nu.marginalia.extractor.AtagExporter; +import nu.marginalia.extractor.FeedExporter; +import nu.marginalia.extractor.SampleDataExporter; +import nu.marginalia.extractor.TermFrequencyExporter; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.mqapi.ProcessInboxNames; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; +import nu.marginalia.service.module.DatabaseModule; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExportTasksMain extends ProcessMainClass { + + private static final Logger logger = LoggerFactory.getLogger(ExportTasksMain.class); + + private final AtagExporter atagExporter; + private final FeedExporter feedExporter; + private final SampleDataExporter sampleDataExporter; + private final TermFrequencyExporter termFrequencyExporter; + private final WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator; + + public static void main(String[] args) throws Exception { + + var injector = Guice.createInjector( + new ServiceDiscoveryModule(), + new ProcessConfigurationModule("export-tasks"), + new DatabaseModule(false) + ); + + var exportTasks = injector.getInstance(ExportTasksMain.class); + + Instructions instructions = exportTasks.fetchInstructions(ExportTaskRequest.class); + try { + exportTasks.run(instructions.value()); + instructions.ok(); + } + catch (Exception e) { + logger.error("Error running export task", e); + instructions.err(); + } + + } + + @Inject + public ExportTasksMain(MessageQueueFactory messageQueueFactory, + ProcessConfiguration config, + AtagExporter atagExporter, + FeedExporter feedExporter, + SampleDataExporter sampleDataExporter, + TermFrequencyExporter termFrequencyExporter, + Gson gson, WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator) + { + super(messageQueueFactory, config, gson, ProcessInboxNames.EXPORT_TASK_INBOX); + this.atagExporter = atagExporter; + this.feedExporter = feedExporter; + this.sampleDataExporter = sampleDataExporter; + this.termFrequencyExporter = termFrequencyExporter; + this.websiteAdjacenciesCalculator = websiteAdjacenciesCalculator; + } + + private void run(ExportTaskRequest request) throws Exception { + switch (request.task) { + case ATAGS: atagExporter.export(request.crawlId, request.destId); break; + case FEEDS: feedExporter.export(request.crawlId, request.destId); break; + case TERM_FREQ: termFrequencyExporter.export(request.crawlId, request.destId); break; + case SAMPLE_DATA: sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name); break; + case ADJACENCIES: websiteAdjacenciesCalculator.export(); break; + } + } + + +} diff --git a/code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java b/code/processes/export-task-process/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java similarity index 100% rename from code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java rename to code/processes/export-task-process/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java diff --git a/code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/SparseBitVectorTest.java b/code/processes/export-task-process/test/nu/marginalia/adjacencies/SparseBitVectorTest.java similarity index 100% rename from code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/SparseBitVectorTest.java rename to code/processes/export-task-process/test/nu/marginalia/adjacencies/SparseBitVectorTest.java diff --git a/code/processes/index-constructor-process/build.gradle b/code/processes/index-constructor-process/build.gradle index 6de7e773..04416bc6 100644 --- a/code/processes/index-constructor-process/build.gradle +++ b/code/processes/index-constructor-process/build.gradle @@ -22,7 +22,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:processes:process-mq-api') - implementation project(':code:common:process') implementation project(':code:common:service') implementation project(':code:common:db') implementation project(':code:common:config') diff --git a/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java b/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java index ef93b554..470b1694 100644 --- a/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java +++ b/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java @@ -1,11 +1,8 @@ package nu.marginalia.index; -import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import nu.marginalia.IndexLocations; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.index.construction.full.FullIndexConstructor; import nu.marginalia.index.construction.prio.PrioIndexConstructor; import nu.marginalia.index.domainrankings.DomainRankings; @@ -15,13 +12,11 @@ import nu.marginalia.index.journal.IndexJournal; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.mqapi.index.CreateIndexRequest; -import nu.marginalia.mqapi.index.IndexName; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; @@ -31,8 +26,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.TimeUnit; import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX; @@ -40,15 +33,12 @@ import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX; public class IndexConstructorMain extends ProcessMainClass { private final FileStorageService fileStorageService; private final ProcessHeartbeatImpl heartbeat; - private final MessageQueueFactory messageQueueFactory; private final DomainRankings domainRankings; - private final int node; private static final Logger logger = LoggerFactory.getLogger(IndexConstructorMain.class); - private final Gson gson = GsonFactory.get(); - public static void main(String[] args) throws Exception { - CreateIndexInstructions instructions = null; + public static void main(String[] args) throws Exception { + Instructions instructions = null; try { new org.mariadb.jdbc.Driver(); @@ -58,9 +48,8 @@ public class IndexConstructorMain extends ProcessMainClass { new DatabaseModule(false)) .getInstance(IndexConstructorMain.class); - instructions = main.fetchInstructions(); - - main.run(instructions); + instructions = main.fetchInstructions(CreateIndexRequest.class); + main.run(instructions.value()); instructions.ok(); } catch (Exception ex) { @@ -85,17 +74,17 @@ public class IndexConstructorMain extends ProcessMainClass { ProcessConfiguration processConfiguration, DomainRankings domainRankings) { + super(messageQueueFactory, processConfiguration, GsonFactory.get(), INDEX_CONSTRUCTOR_INBOX); + this.fileStorageService = fileStorageService; this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.domainRankings = domainRankings; - this.node = processConfiguration.node(); } - private void run(CreateIndexInstructions instructions) throws SQLException, IOException { + private void run(CreateIndexRequest instructions) throws SQLException, IOException { heartbeat.start(); - switch (instructions.name) { + switch (instructions.indexName()) { case FORWARD -> createForwardIndex(); case REVERSE_FULL -> createFullReverseIndex(); case REVERSE_PRIO -> createPrioReverseIndex(); @@ -171,52 +160,4 @@ public class IndexConstructorMain extends ProcessMainClass { docId); } - private static class CreateIndexInstructions { - - public final IndexName name; - private final MqSingleShotInbox inbox; - private final MqMessage message; - - private CreateIndexInstructions(IndexName name, MqSingleShotInbox inbox, MqMessage message) { - this.name = name; - this.inbox = inbox; - this.message = message; - } - - public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); - } - public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); - } - } - - private CreateIndexInstructions fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(INDEX_CONSTRUCTOR_INBOX, node, UUID.randomUUID()); - - logger.info("Waiting for instructions"); - var msgOpt = getMessage(inbox, CreateIndexRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - var payload = gson.fromJson(msg.payload(), CreateIndexRequest.class); - var name = payload.indexName(); - - return new CreateIndexInstructions(name, inbox, msg); - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } } diff --git a/code/processes/live-crawling-process/build.gradle b/code/processes/live-crawling-process/build.gradle index b60fd863..5dad1177 100644 --- a/code/processes/live-crawling-process/build.gradle +++ b/code/processes/live-crawling-process/build.gradle @@ -21,7 +21,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:model') diff --git a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java index 0c390641..edc90909 100644 --- a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java +++ b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java @@ -4,8 +4,6 @@ import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.WmsaHome; import nu.marginalia.api.feeds.FeedsClient; import nu.marginalia.converting.ConverterModule; @@ -21,12 +19,11 @@ import nu.marginalia.loading.domains.DbDomainIdRegistry; import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.model.EdgeDomain; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.mqapi.crawling.LiveCrawlRequest; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeat; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.ServiceDiscoveryModule; import nu.marginalia.storage.FileStorageService; @@ -38,11 +35,11 @@ import org.slf4j.LoggerFactory; import java.nio.file.Files; import java.nio.file.Path; import java.security.Security; -import java.sql.SQLException; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static nu.marginalia.mqapi.ProcessInboxNames.LIVE_CRAWLER_INBOX; @@ -52,16 +49,13 @@ public class LiveCrawlerMain extends ProcessMainClass { LoggerFactory.getLogger(LiveCrawlerMain.class); private final FeedsClient feedsClient; - private final Gson gson; private final ProcessHeartbeat heartbeat; private final DbDomainQueries domainQueries; private final DomainBlacklist domainBlacklist; - private final MessageQueueFactory messageQueueFactory; private final DomainProcessor domainProcessor; private final FileStorageService fileStorageService; private final KeywordLoaderService keywordLoaderService; private final DocumentLoaderService documentLoaderService; - private final int node; @Inject public LiveCrawlerMain(FeedsClient feedsClient, @@ -77,13 +71,12 @@ public class LiveCrawlerMain extends ProcessMainClass { DocumentLoaderService documentLoaderService) throws Exception { + super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX); + this.feedsClient = feedsClient; - this.gson = gson; this.heartbeat = heartbeat; this.domainQueries = domainQueries; this.domainBlacklist = domainBlacklist; - this.messageQueueFactory = messageQueueFactory; - this.node = config.node(); this.domainProcessor = domainProcessor; this.fileStorageService = fileStorageService; this.keywordLoaderService = keywordLoaderService; @@ -117,7 +110,7 @@ public class LiveCrawlerMain extends ProcessMainClass { ); var crawler = injector.getInstance(LiveCrawlerMain.class); - LiveCrawlInstructions instructions = crawler.fetchInstructions(); + Instructions instructions = crawler.fetchInstructions(LiveCrawlRequest.class); try{ crawler.run(); @@ -225,57 +218,4 @@ public class LiveCrawlerMain extends ProcessMainClass { } } - private LiveCrawlInstructions fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(LIVE_CRAWLER_INBOX, node, UUID.randomUUID()); - - logger.info("Waiting for instructions"); - - var msgOpt = getMessage(inbox, LiveCrawlRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - // for live crawl, request is empty for now - LiveCrawlRequest request = gson.fromJson(msg.payload(), LiveCrawlRequest.class); - - return new LiveCrawlInstructions(msg, inbox); - } - - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } - - - private static class LiveCrawlInstructions { - private final MqMessage message; - private final MqSingleShotInbox inbox; - - LiveCrawlInstructions(MqMessage message, - MqSingleShotInbox inbox) - { - this.message = message; - this.inbox = inbox; - } - - - public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); - } - public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); - } - - } - } diff --git a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java index 6088fb5e..5dc779b0 100644 --- a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java +++ b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java @@ -6,10 +6,10 @@ import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.name.Names; import nu.marginalia.IndexLocations; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.UserAgent; import nu.marginalia.WmsaHome; import nu.marginalia.linkdb.docs.DocumentDbWriter; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.storage.FileStorageService; diff --git a/code/processes/loading-process/build.gradle b/code/processes/loading-process/build.gradle index 0d9d51c1..4a97812e 100644 --- a/code/processes/loading-process/build.gradle +++ b/code/processes/loading-process/build.gradle @@ -20,7 +20,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':code:processes:process-mq-api') implementation project(':code:index:api') implementation project(':code:common:model') diff --git a/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java index 5b59d972..cc639711 100644 --- a/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java @@ -4,8 +4,6 @@ import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.linkdb.docs.DocumentDbWriter; import nu.marginalia.loading.documents.DocumentLoaderService; import nu.marginalia.loading.documents.KeywordLoaderService; @@ -13,26 +11,21 @@ import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainLoaderService; import nu.marginalia.loading.links.DomainLinksLoaderService; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.MqMessageState; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.mqapi.loading.LoadRequest; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.file.Path; -import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX; @@ -40,15 +33,12 @@ public class LoaderMain extends ProcessMainClass { private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class); private final ProcessHeartbeatImpl heartbeat; - private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; private final DocumentDbWriter documentDbWriter; private final DomainLoaderService domainService; private final DomainLinksLoaderService linksService; private final KeywordLoaderService keywordLoaderService; private final DocumentLoaderService documentLoaderService; - private final int node; - private final Gson gson; public static void main(String... args) { try { @@ -62,7 +52,7 @@ public class LoaderMain extends ProcessMainClass { var instance = injector.getInstance(LoaderMain.class); - var instructions = instance.fetchInstructions(); + var instructions = instance.fetchInstructions(LoadRequest.class); logger.info("Instructions received"); instance.run(instructions); } @@ -83,22 +73,27 @@ public class LoaderMain extends ProcessMainClass { ProcessConfiguration processConfiguration, Gson gson ) { - this.node = processConfiguration.node(); + + super(messageQueueFactory, processConfiguration, gson, LOADER_INBOX); + this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; this.documentDbWriter = documentDbWriter; this.domainService = domainService; this.linksService = linksService; this.keywordLoaderService = keywordLoaderService; this.documentLoaderService = documentLoaderService; - this.gson = gson; heartbeat.start(); } - void run(LoadRequest instructions) throws Throwable { - LoaderInputData inputData = instructions.inputData; + void run(Instructions instructions) throws Throwable { + + List inputSources = new ArrayList<>(); + for (var storageId : instructions.value().inputProcessDataStorageIds) { + inputSources.add(fileStorageService.getStorage(storageId).asPath()); + } + var inputData = new LoaderInputData(inputSources); DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(heartbeat, inputData); @@ -134,67 +129,5 @@ public class LoaderMain extends ProcessMainClass { System.exit(0); } - private static class LoadRequest { - private final LoaderInputData inputData; - private final MqMessage message; - private final MqSingleShotInbox inbox; - - LoadRequest(LoaderInputData inputData, MqMessage message, MqSingleShotInbox inbox) { - this.inputData = inputData; - this.message = message; - this.inbox = inbox; - } - - public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); - } - public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); - } - } - - private LoadRequest fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, node, UUID.randomUUID()); - - var msgOpt = getMessage(inbox, nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName()); - if (msgOpt.isEmpty()) - throw new RuntimeException("No instruction received in inbox"); - var msg = msgOpt.get(); - - if (!nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName().equals(msg.function())) { - throw new RuntimeException("Unexpected message in inbox: " + msg); - } - - try { - var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.loading.LoadRequest.class); - - List inputSources = new ArrayList<>(); - for (var storageId : request.inputProcessDataStorageIds) { - inputSources.add(fileStorageService.getStorage(storageId).asPath()); - } - - return new LoadRequest(new LoaderInputData(inputSources), msg, inbox); - } - catch (Exception ex) { - inbox.sendResponse(msg, new MqInboxResponse("FAILED", MqMessageState.ERR)); - throw ex; - } - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } } diff --git a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java index ae3229ad..383fac01 100644 --- a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java +++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java @@ -3,11 +3,11 @@ package nu.marginalia.loading.domains; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.loading.LoaderInputData; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.processed.SlopDomainLinkRecord; import nu.marginalia.model.processed.SlopDomainRecord; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.slop.SlopTable; diff --git a/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java b/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java index ab8e3b37..e5582aed 100644 --- a/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java +++ b/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java @@ -6,5 +6,7 @@ public class ProcessInboxNames { public static final String CRAWLER_INBOX = "crawler"; public static final String LIVE_CRAWLER_INBOX = "live-crawler"; + public static final String EXPORT_TASK_INBOX = "export-task"; + public static final String INDEX_CONSTRUCTOR_INBOX = "index_constructor"; } diff --git a/code/processes/process-mq-api/java/nu/marginalia/mqapi/tasks/ExportTaskRequest.java b/code/processes/process-mq-api/java/nu/marginalia/mqapi/tasks/ExportTaskRequest.java new file mode 100644 index 00000000..036f3a10 --- /dev/null +++ b/code/processes/process-mq-api/java/nu/marginalia/mqapi/tasks/ExportTaskRequest.java @@ -0,0 +1,57 @@ +package nu.marginalia.mqapi.tasks; + +import nu.marginalia.storage.model.FileStorageId; + +public class ExportTaskRequest { + public enum Task { + ATAGS, + FEEDS, + TERM_FREQ, + SAMPLE_DATA, + ADJACENCIES, + } + + public Task task; + public FileStorageId crawlId; + public FileStorageId destId; + public int size; + public String name; + + public ExportTaskRequest(Task task) { + this.task = task; + } + + public static ExportTaskRequest atags(FileStorageId crawlId, FileStorageId destId) { + ExportTaskRequest request = new ExportTaskRequest(Task.ATAGS); + request.crawlId = crawlId; + request.destId = destId; + return request; + } + + public static ExportTaskRequest feeds(FileStorageId crawlId, FileStorageId destId) { + ExportTaskRequest request = new ExportTaskRequest(Task.FEEDS); + request.crawlId = crawlId; + request.destId = destId; + return request; + } + + public static ExportTaskRequest termFreq(FileStorageId crawlId, FileStorageId destId) { + ExportTaskRequest request = new ExportTaskRequest(Task.TERM_FREQ); + request.crawlId = crawlId; + request.destId = destId; + return request; + } + + public static ExportTaskRequest sampleData(FileStorageId crawlId, FileStorageId destId, int size, String name) { + ExportTaskRequest request = new ExportTaskRequest(Task.SAMPLE_DATA); + request.crawlId = crawlId; + request.destId = destId; + request.size = size; + request.name = name; + return request; + } + + public static ExportTaskRequest adjacencies() { + return new ExportTaskRequest(Task.ADJACENCIES); + } +} diff --git a/code/processes/website-adjacencies-calculator/build.gradle b/code/processes/website-adjacencies-calculator/build.gradle deleted file mode 100644 index 37787b6a..00000000 --- a/code/processes/website-adjacencies-calculator/build.gradle +++ /dev/null @@ -1,49 +0,0 @@ -plugins { - id 'java' - - id 'application' - id 'jvm-test-suite' -} -java { - toolchain { - languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion)) - } -} - -application { - mainClass = 'nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator' - applicationName = 'website-adjacencies-calculator' -} - -tasks.distZip.enabled = false - -apply from: "$rootProject.projectDir/srcsets.gradle" - -dependencies { - implementation project(':code:common:model') - implementation project(':code:common:db') - implementation project(':code:common:process') - implementation project(':code:common:service') - implementation project(':code:functions:link-graph:api') - - implementation libs.bundles.slf4j - - implementation libs.guava - implementation dependencies.create(libs.guice.get()) { - exclude group: 'com.google.guava' - } - implementation libs.roaringbitmap - implementation libs.trove - implementation libs.fastutil - implementation libs.bundles.mariadb - - testImplementation libs.bundles.slf4j.test - testImplementation libs.bundles.junit - testImplementation libs.mockito - - testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') - testImplementation libs.commons.codec - testImplementation 'org.testcontainers:mariadb:1.17.4' - testImplementation 'org.testcontainers:junit-jupiter:1.17.4' - testImplementation project(':code:libraries:test-helpers') -} diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java b/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java deleted file mode 100644 index 1389cd63..00000000 --- a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java +++ /dev/null @@ -1,198 +0,0 @@ -package nu.marginalia.adjacencies; - -import com.google.inject.Guice; -import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.api.linkgraph.AggregateLinkGraphClient; -import nu.marginalia.db.DbDomainQueries; -import nu.marginalia.model.EdgeDomain; -import nu.marginalia.process.control.ProcessHeartbeat; -import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.service.ProcessMainClass; -import nu.marginalia.service.module.DatabaseModule; -import nu.marginalia.service.module.ServiceDiscoveryModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.IntStream; - -import static nu.marginalia.adjacencies.SparseBitVector.andCardinality; -import static nu.marginalia.adjacencies.SparseBitVector.weightedProduct; - -public class WebsiteAdjacenciesCalculator extends ProcessMainClass { - private final HikariDataSource dataSource; - public AdjacenciesData adjacenciesData; - public DomainAliases domainAliases; - private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class); - - float[] weights; - public WebsiteAdjacenciesCalculator(AggregateLinkGraphClient domainLinksClient, HikariDataSource dataSource) throws SQLException { - this.dataSource = dataSource; - - domainAliases = new DomainAliases(dataSource); - adjacenciesData = new AdjacenciesData(domainLinksClient, domainAliases); - weights = adjacenciesData.getWeights(); - } - - public void tryDomains(String... domainName) { - var dataStoreDao = new DbDomainQueries(dataSource); - - System.out.println(Arrays.toString(domainName)); - - int[] domainIds = Arrays.stream(domainName).map(EdgeDomain::new) - .mapToInt(dataStoreDao::getDomainId) - .map(domainAliases::deAlias) - .toArray(); - - for (int domainId : domainIds) { - findAdjacentDtoS(domainId, similarities -> { - for (var similarity : similarities.similarities()) { - System.out.println(dataStoreDao.getDomain(similarity.domainId).map(Object::toString).orElse("") + " " + prettyPercent(similarity.value)); - } - }); - } - } - - private String prettyPercent(double val) { - return String.format("%2.2f%%", 100. * val); - } - - public void loadAll(ProcessHeartbeat processHeartbeat) throws InterruptedException { - AdjacenciesLoader loader = new AdjacenciesLoader(dataSource); - var executor = Executors.newFixedThreadPool(16); - - int total = adjacenciesData.getIdsList().size(); - AtomicInteger progress = new AtomicInteger(0); - IntStream.of(adjacenciesData.getIdsList().toArray()).parallel() - .filter(domainAliases::isNotAliased) - .forEach(id -> { - findAdjacent(id, loader::load); - processHeartbeat.setProgress(progress.incrementAndGet() / (double) total); - }); - - executor.shutdown(); - System.out.println("Waiting for wrap-up"); - loader.stop(); - } - - public void findAdjacent(int domainId, Consumer andThen) { - findAdjacentDtoS(domainId, andThen); - } - - double cosineSimilarity(SparseBitVector a, SparseBitVector b) { - double andCardinality = andCardinality(a, b); - andCardinality /= Math.sqrt(a.getCardinality()); - andCardinality /= Math.sqrt(b.getCardinality()); - return andCardinality; - } - - double expensiveCosineSimilarity(SparseBitVector a, SparseBitVector b) { - return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights)); - } - - public record DomainSimilarities(int domainId, List similarities) {} - - public record DomainSimilarity(int domainId, double value) {} - - private void findAdjacentDtoS(int domainId, Consumer andThen) { - var vector = adjacenciesData.getVector(domainId); - - if (vector == null || !vector.cardinalityExceeds(10)) { - return; - } - - List similarities = new ArrayList<>(1000); - - var items = adjacenciesData.getCandidates(vector); - - - int cardMin = Math.max(2, (int) (0.01 * vector.getCardinality())); - - items.forEach(id -> { - var otherVec = adjacenciesData.getVector(id); - - if (null == otherVec || otherVec == vector) - return true; - - if (otherVec.getCardinality() < cardMin) - return true; - - double similarity = cosineSimilarity(vector, otherVec); - if (similarity > 0.1) { - var recalculated = expensiveCosineSimilarity(vector, otherVec); - if (recalculated > 0.1) { - similarities.add(new DomainSimilarity(id, recalculated)); - } - } - - return true; - }); - - if (similarities.size() > 128) { - similarities.sort(Comparator.comparing(DomainSimilarity::value)); - similarities.subList(0, similarities.size() - 128).clear(); - } - - - andThen.accept(new DomainSimilarities(domainId, similarities)); - } - - - public static void main(String[] args) throws SQLException, InterruptedException { - var injector = Guice.createInjector( - new DatabaseModule(false), - new ServiceDiscoveryModule()); - - - var dataSource = injector.getInstance(HikariDataSource.class); - var lc = injector.getInstance(AggregateLinkGraphClient.class); - - if (!lc.waitReady(Duration.ofSeconds(30))) { - throw new IllegalStateException("Failed to connect to domain-links"); - } - - var main = new WebsiteAdjacenciesCalculator(lc, dataSource); - - if (args.length == 1 && "load".equals(args[0])) { - var processHeartbeat = new ProcessHeartbeatImpl( - new ProcessConfiguration("website-adjacencies-calculator", 0, UUID.randomUUID()), - dataSource - ); - - try { - processHeartbeat.start(); - main.loadAll(processHeartbeat); - } - catch (Exception ex) { - logger.error("Failed to load", ex); - } - finally { - processHeartbeat.shutDown(); - } - return; - } - - for (;;) { - String domains = System.console().readLine("> "); - - if (domains.isBlank()) - break; - - var parts = domains.split("\\s+,\\s+"); - try { - main.tryDomains(parts); - } - catch (Exception ex) { - ex.printStackTrace(); - } - } - - } - -} diff --git a/code/processes/website-adjacencies-calculator/readme.md b/code/processes/website-adjacencies-calculator/readme.md deleted file mode 100644 index 6a69a10a..00000000 --- a/code/processes/website-adjacencies-calculator/readme.md +++ /dev/null @@ -1,8 +0,0 @@ -# Website Adjacencies Calculator - -This job updates the website similarity table based on the data in the domain and links-tables in the URL database. - -It performs a brute force cosine similarity calculation across the entire link graph. - -These adjacencies power the [explorer service](../../services-application/explorer-service) and -[random websites](../../features-search/random-websites)-functionality. \ No newline at end of file diff --git a/code/services-application/explorer-service/readme.md b/code/services-application/explorer-service/readme.md index 567886d6..4f3204b1 100644 --- a/code/services-application/explorer-service/readme.md +++ b/code/services-application/explorer-service/readme.md @@ -8,4 +8,3 @@ Externally the service is available at [https://explore2.marginalia.nu/](https:/ * [features-search/screenshots](../../features-search/screenshots) * [features-search/random-websites](../../features-search/random-websites) -* [processes/website-adjacencies-calculator](../../processes/website-adjacencies-calculator) diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index 2e7934bc..dc18f2e1 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -25,7 +25,6 @@ dependencies { // These look weird but they're needed to be able to spawn the processes // from the executor service - implementation project(':code:processes:website-adjacencies-calculator') implementation project(':code:processes:crawling-process') implementation project(':code:processes:loading-process') implementation project(':code:processes:converting-process') @@ -33,7 +32,6 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:model') - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:linkdb') @@ -48,7 +46,6 @@ dependencies { implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:ft-link-parser') - implementation project(':code:execution:data-extractors') implementation project(':code:index:index-journal') implementation project(':code:index:api') implementation project(':code:processes:process-mq-api') diff --git a/code/services-core/index-service/build.gradle b/code/services-core/index-service/build.gradle index df3773f9..af3a0fdd 100644 --- a/code/services-core/index-service/build.gradle +++ b/code/services-core/index-service/build.gradle @@ -37,7 +37,6 @@ dependencies { implementation project(':code:index:api') testImplementation project(path: ':code:services-core:control-service') - testImplementation project(':code:common:process') implementation libs.bundles.slf4j diff --git a/code/tools/experiment-runner/build.gradle b/code/tools/experiment-runner/build.gradle index d011a973..0e26439f 100644 --- a/code/tools/experiment-runner/build.gradle +++ b/code/tools/experiment-runner/build.gradle @@ -27,7 +27,6 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:model') implementation project(':code:common:config') - implementation project(':code:common:process') implementation project(':code:common:service') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:term-frequency-dict') diff --git a/code/tools/integration-test/build.gradle b/code/tools/integration-test/build.gradle index 81e3cde9..0e5d3cf0 100644 --- a/code/tools/integration-test/build.gradle +++ b/code/tools/integration-test/build.gradle @@ -34,7 +34,6 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:config') implementation project(':code:common:linkdb') - implementation project(':code:common:process') implementation project(':code:common:service') implementation project(':code:common:model') diff --git a/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java b/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java index 83f79fbf..2869a229 100644 --- a/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java +++ b/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java @@ -8,7 +8,6 @@ import com.google.inject.name.Names; import gnu.trove.list.array.TIntArrayList; import nu.marginalia.IndexLocations; import nu.marginalia.LanguageModels; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.WmsaHome; import nu.marginalia.db.DomainTypes; import nu.marginalia.index.domainrankings.DomainRankings; @@ -18,6 +17,7 @@ import nu.marginalia.index.searchset.SearchSetsService; import nu.marginalia.linkdb.docs.DocumentDbReader; import nu.marginalia.linkdb.docs.DocumentDbWriter; import nu.marginalia.linkgraph.io.DomainLinksWriter; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.control.FakeProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.service.ServiceId; diff --git a/settings.gradle b/settings.gradle index 6eda9ab0..1ed535be 100644 --- a/settings.gradle +++ b/settings.gradle @@ -59,7 +59,6 @@ include 'code:features-search:screenshots' include 'code:features-search:random-websites' include 'code:processes:converting-process:ft-anchor-keywords' -include 'code:execution:data-extractors' include 'code:processes:crawling-process:ft-crawl-blocklist' include 'code:processes:crawling-process:ft-link-parser' @@ -74,7 +73,6 @@ include 'code:common:service' include 'code:common:config' include 'code:common:model' include 'code:common:renderer' -include 'code:common:process' include 'code:processes:converting-process' include 'code:processes:converting-process:model' @@ -86,7 +84,8 @@ include 'code:processes:crawling-process:model' include 'code:processes:loading-process' include 'code:processes:index-constructor-process' include 'code:processes:test-data' -include 'code:processes:website-adjacencies-calculator' + +include 'code:processes:export-task-process' include 'code:tools:experiment-runner' include 'code:tools:screenshot-capture-tool'