diff --git a/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java b/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java index 63a18c93..a7bab69c 100644 --- a/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java +++ b/code/common/config/test/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java @@ -2,6 +2,7 @@ package nu.marginalia.nodecfg; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.nodecfg.model.NodeProfile; import nu.marginalia.test.TestMigrationLoader; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; @@ -46,8 +47,8 @@ public class NodeConfigurationServiceTest { @Test public void test() throws SQLException { - var a = nodeConfigurationService.create(1, "Test", false, false); - var b = nodeConfigurationService.create(2, "Foo", true, false); + var a = nodeConfigurationService.create(1, "Test", false, false, NodeProfile.MIXED); + var b = nodeConfigurationService.create(2, "Foo", true, false, NodeProfile.MIXED); assertEquals(1, a.node()); assertEquals("Test", a.description()); diff --git a/code/common/process/build.gradle b/code/common/process/build.gradle deleted file mode 100644 index 51289ed4..00000000 --- a/code/common/process/build.gradle +++ /dev/null @@ -1,34 +0,0 @@ -plugins { - id 'java' - - id 'jvm-test-suite' -} - -java { - toolchain { - languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion)) - } -} - -apply from: "$rootProject.projectDir/srcsets.gradle" - -dependencies { - implementation libs.notnull - - implementation libs.bundles.slf4j - testImplementation libs.bundles.slf4j.test - - implementation libs.guava - implementation libs.guava - implementation dependencies.create(libs.guice.get()) { - exclude group: 'com.google.guava' - } - implementation libs.bundles.mariadb - implementation libs.commons.lang3 - - implementation libs.snakeyaml - - testImplementation 
libs.bundles.slf4j.test - testImplementation libs.bundles.junit - testImplementation libs.mockito -} diff --git a/code/common/process/readme.md b/code/common/process/readme.md deleted file mode 100644 index 989a6193..00000000 --- a/code/common/process/readme.md +++ /dev/null @@ -1,4 +0,0 @@ -# Process - -Basic functionality for a Process. Processes must include this dependency to ensure -their loggers are configured properly! \ No newline at end of file diff --git a/code/common/process/resources/log4j2.properties b/code/common/process/resources/log4j2.properties deleted file mode 100644 index 18eaf147..00000000 --- a/code/common/process/resources/log4j2.properties +++ /dev/null @@ -1,9 +0,0 @@ -log4j2.isThreadContextMapInheritable=true -status = info -appender.console.type = Console -appender.console.name = LogToConsole -appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %c{1}- %msg{nolookups}%n -appender.console.filter.http.type = MarkerFilter -rootLogger.level = info -rootLogger.appenderRef.console.ref = LogToConsole diff --git a/code/common/process/java/nu/marginalia/ProcessConfiguration.java b/code/common/service/java/nu/marginalia/process/ProcessConfiguration.java similarity index 78% rename from code/common/process/java/nu/marginalia/ProcessConfiguration.java rename to code/common/service/java/nu/marginalia/process/ProcessConfiguration.java index 35e1433f..45c3cf22 100644 --- a/code/common/process/java/nu/marginalia/ProcessConfiguration.java +++ b/code/common/service/java/nu/marginalia/process/ProcessConfiguration.java @@ -1,4 +1,4 @@ -package nu.marginalia; +package nu.marginalia.process; import java.util.UUID; diff --git a/code/common/process/java/nu/marginalia/ProcessConfigurationModule.java b/code/common/service/java/nu/marginalia/process/ProcessConfigurationModule.java similarity index 97% rename from code/common/process/java/nu/marginalia/ProcessConfigurationModule.java 
rename to code/common/service/java/nu/marginalia/process/ProcessConfigurationModule.java index 61214074..12784230 100644 --- a/code/common/process/java/nu/marginalia/ProcessConfigurationModule.java +++ b/code/common/service/java/nu/marginalia/process/ProcessConfigurationModule.java @@ -1,4 +1,4 @@ -package nu.marginalia; +package nu.marginalia.process; import com.google.inject.AbstractModule; import com.google.inject.name.Names; diff --git a/code/common/service/java/nu/marginalia/process/ProcessMainClass.java b/code/common/service/java/nu/marginalia/process/ProcessMainClass.java new file mode 100644 index 00000000..17758007 --- /dev/null +++ b/code/common/service/java/nu/marginalia/process/ProcessMainClass.java @@ -0,0 +1,102 @@ +package nu.marginalia.process; + +import com.google.gson.Gson; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.inbox.MqInboxResponse; +import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.service.ConfigLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public abstract class ProcessMainClass { + private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class); + + private final MessageQueueFactory messageQueueFactory; + private final int node; + private final String inboxName; + + static { + // Load global config ASAP + ConfigLoader.loadConfig( + ConfigLoader.getConfigPath("system") + ); + } + + private final Gson gson; + + public ProcessMainClass(MessageQueueFactory messageQueueFactory, + ProcessConfiguration config, + Gson gson, + String inboxName + ) { + this.gson = gson; + new org.mariadb.jdbc.Driver(); + this.messageQueueFactory = messageQueueFactory; + this.node = config.node(); + this.inboxName = inboxName; + } + + + protected Instructions fetchInstructions(Class requestType) throws Exception { + + 
var inbox = messageQueueFactory.createSingleShotInbox(inboxName, node, UUID.randomUUID()); + + logger.info("Waiting for instructions"); + + var msgOpt = getMessage(inbox, requestType.getSimpleName()); + var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); + + // for live crawl, request is empty for now + T request = gson.fromJson(msg.payload(), requestType); + + return new Instructions<>(msg, inbox, request); + } + + + private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws InterruptedException, SQLException { + var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); + if (opt.isPresent()) { + if (!opt.get().function().equals(expectedFunction)) { + throw new RuntimeException("Unexpected function: " + opt.get().function()); + } + return opt; + } + else { + var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); + stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); + return stolenMessage; + } + } + + + protected static class Instructions { + private final MqMessage message; + private final MqSingleShotInbox inbox; + private final T value; + Instructions(MqMessage message, MqSingleShotInbox inbox, T value) + { + this.message = message; + this.inbox = inbox; + this.value = value; + } + + public T value() { + return value; + } + + public void ok() { + inbox.sendResponse(message, MqInboxResponse.ok()); + } + public void err() { + inbox.sendResponse(message, MqInboxResponse.err()); + } + + } + +} diff --git a/code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/FakeProcessHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/FakeProcessHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/FakeProcessHeartbeat.java diff --git 
a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeat.java diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java b/code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java similarity index 99% rename from code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java rename to code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java index 1faf72f0..1069446c 100644 --- a/code/common/process/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java +++ b/code/common/service/java/nu/marginalia/process/control/ProcessAdHocTaskHeartbeatImpl.java @@ -2,7 +2,7 @@ package nu.marginalia.process.control; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/ProcessHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/ProcessHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/ProcessHeartbeat.java diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java b/code/common/service/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java similarity index 96% rename from code/common/process/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java rename to 
code/common/service/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java index 6f85d890..3e4bd948 100644 --- a/code/common/process/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java +++ b/code/common/service/java/nu/marginalia/process/control/ProcessHeartbeatImpl.java @@ -4,17 +4,18 @@ package nu.marginalia.process.control; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.sql.SQLException; import java.util.concurrent.TimeUnit; /** This service sends a heartbeat to the database every 5 seconds. */ @Singleton -public class ProcessHeartbeatImpl implements ProcessHeartbeat { +public class ProcessHeartbeatImpl implements ProcessHeartbeat, Closeable { private final Logger logger = LoggerFactory.getLogger(ProcessHeartbeatImpl.class); private final String processName; private final String processBase; @@ -169,5 +170,9 @@ public class ProcessHeartbeatImpl implements ProcessHeartbeat { } } } + + public void close() { + shutDown(); + } } diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java b/code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java rename to code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java diff --git a/code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java b/code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java similarity index 99% rename from code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java rename to code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java 
index f8d68b7e..2ea3da52 100644 --- a/code/common/process/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java +++ b/code/common/service/java/nu/marginalia/process/control/ProcessTaskHeartbeatImpl.java @@ -2,7 +2,7 @@ package nu.marginalia.process.control; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/common/process/java/nu/marginalia/process/log/WorkLoadIterable.java b/code/common/service/java/nu/marginalia/process/log/WorkLoadIterable.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/log/WorkLoadIterable.java rename to code/common/service/java/nu/marginalia/process/log/WorkLoadIterable.java diff --git a/code/common/process/java/nu/marginalia/process/log/WorkLog.java b/code/common/service/java/nu/marginalia/process/log/WorkLog.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/log/WorkLog.java rename to code/common/service/java/nu/marginalia/process/log/WorkLog.java diff --git a/code/common/process/java/nu/marginalia/process/log/WorkLogEntry.java b/code/common/service/java/nu/marginalia/process/log/WorkLogEntry.java similarity index 100% rename from code/common/process/java/nu/marginalia/process/log/WorkLogEntry.java rename to code/common/service/java/nu/marginalia/process/log/WorkLogEntry.java diff --git a/code/common/service/java/nu/marginalia/service/ConfigLoader.java b/code/common/service/java/nu/marginalia/service/ConfigLoader.java index d5c9f627..b19fca65 100644 --- a/code/common/service/java/nu/marginalia/service/ConfigLoader.java +++ b/code/common/service/java/nu/marginalia/service/ConfigLoader.java @@ -9,11 +9,11 @@ import java.util.Properties; public class ConfigLoader { - static Path getConfigPath(String configName) { + public static Path getConfigPath(String configName) { return 
WmsaHome.getHomePath().resolve("conf/properties/" + configName + ".properties"); } - static void loadConfig(Path configPath) { + public static void loadConfig(Path configPath) { if (!Files.exists(configPath)) { System.err.println("No config file found at " + configPath); return; diff --git a/code/common/service/java/nu/marginalia/service/ProcessMainClass.java b/code/common/service/java/nu/marginalia/service/ProcessMainClass.java deleted file mode 100644 index 6f66b57d..00000000 --- a/code/common/service/java/nu/marginalia/service/ProcessMainClass.java +++ /dev/null @@ -1,20 +0,0 @@ -package nu.marginalia.service; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class ProcessMainClass { - private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class); - - static { - // Load global config ASAP - ConfigLoader.loadConfig( - ConfigLoader.getConfigPath("system") - ); - } - - public ProcessMainClass() { - new org.mariadb.jdbc.Driver(); - } - -} diff --git a/code/common/process/test/nu/marginalia/process/log/WorkLogTest.java b/code/common/service/test/nu/marginalia/process/log/WorkLogTest.java similarity index 100% rename from code/common/process/test/nu/marginalia/process/log/WorkLogTest.java rename to code/common/service/test/nu/marginalia/process/log/WorkLogTest.java diff --git a/code/execution/build.gradle b/code/execution/build.gradle index 7dca6b96..641642db 100644 --- a/code/execution/build.gradle +++ b/code/execution/build.gradle @@ -15,7 +15,7 @@ dependencies { // These look weird but they're needed to be able to spawn the processes // from the executor service - implementation project(':code:processes:website-adjacencies-calculator') + implementation project(':code:processes:export-task-process') implementation project(':code:processes:crawling-process') implementation project(':code:processes:live-crawling-process') implementation project(':code:processes:loading-process') @@ -24,7 +24,6 @@ dependencies { 
implementation project(':code:common:config') implementation project(':code:common:model') - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:linkdb') @@ -43,7 +42,6 @@ dependencies { implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:ft-link-parser') - implementation project(':code:execution:data-extractors') implementation project(':code:index:index-journal') implementation project(':code:index:api') implementation project(':code:processes:process-mq-api') diff --git a/code/execution/data-extractors/readme.md b/code/execution/data-extractors/readme.md deleted file mode 100644 index ea318e9f..00000000 --- a/code/execution/data-extractors/readme.md +++ /dev/null @@ -1,7 +0,0 @@ -Contains converter-*like* extraction jobs that operate on crawled data to produce export files. - -## Important classes - -* [AtagExporter](java/nu/marginalia/extractor/AtagExporter.java) - extracts anchor texts from the crawled data. -* [FeedExporter](java/nu/marginalia/extractor/FeedExporter.java) - tries to find RSS/Atom feeds within the crawled data. -* [TermFrequencyExporter](java/nu/marginalia/extractor/TermFrequencyExporter.java) - exports the 'TF' part of TF-IDF. 
\ No newline at end of file diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActor.java b/code/execution/java/nu/marginalia/actor/ExecutorActor.java index bcb9d904..c4bd43fd 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActor.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActor.java @@ -10,6 +10,7 @@ public enum ExecutorActor { RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_CONVERTER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), + PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), EXPORT_SEGMENTATION_MODEL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java index e93b93b5..7cb3e91c 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java @@ -59,6 +59,7 @@ public class ExecutorActorControlService { ExportSampleDataActor exportSampleDataActor, ExportTermFreqActor exportTermFrequenciesActor, ExportSegmentationModelActor exportSegmentationModelActor, + ExportTaskMonitorActor exportTasksMonitorActor, DownloadSampleActor downloadSampleActor, ScrapeFeedsActor scrapeFeedsActor, ExecutorActorStateMachines stateMachines, @@ -83,6 +84,7 @@ public class ExecutorActorControlService { register(ExecutorActor.PROC_LOADER_SPAWNER, loaderMonitor); register(ExecutorActor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor); register(ExecutorActor.PROC_LIVE_CRAWL_SPAWNER, liveCrawlerMonitorActor); + register(ExecutorActor.PROC_EXPORT_TASKS_SPAWNER, exportTasksMonitorActor); register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM); 
register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor); diff --git a/code/execution/java/nu/marginalia/actor/proc/ExportTaskMonitorActor.java b/code/execution/java/nu/marginalia/actor/proc/ExportTaskMonitorActor.java new file mode 100644 index 00000000..b07f80e8 --- /dev/null +++ b/code/execution/java/nu/marginalia/actor/proc/ExportTaskMonitorActor.java @@ -0,0 +1,29 @@ +package nu.marginalia.actor.proc; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqapi.ProcessInboxNames; +import nu.marginalia.process.ProcessService; +import nu.marginalia.service.module.ServiceConfiguration; + +@Singleton +public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor { + + @Inject + public ExportTaskMonitorActor(Gson gson, + ServiceConfiguration configuration, + MqPersistence persistence, + ProcessService processService) { + super(gson, + configuration, + persistence, + processService, + ProcessInboxNames.EXPORT_TASK_INBOX, + ProcessService.ProcessId.EXPORT_TASKS); + } + + +} diff --git a/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java b/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java index 5323302b..e8b4f341 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportAtagsActor.java @@ -3,50 +3,68 @@ package nu.marginalia.actor.task; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.extractor.AtagExporter; -import nu.marginalia.extractor.ExporterIf; -import nu.marginalia.storage.model.*; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import 
nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.storage.model.FileStorageState; +import nu.marginalia.storage.model.FileStorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.LocalDateTime; @Singleton public class ExportAtagsActor extends RecordActorPrototype { private final FileStorageService storageService; - private final ExporterIf atagExporter; + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; + private final Logger logger = LoggerFactory.getLogger(getClass()); public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId) { + this(crawlId, destId, -1); + } + } + @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { case Export(FileStorageId crawlId) -> { - var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atag-export", "Anchor Tags " + LocalDateTime.now()); + var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atags-export", "Atags " + LocalDateTime.now()); if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId) -> { + case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - atagExporter.export(crawlId, destId); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - 
storageService.setFileStorageState(destId, FileStorageState.DELETE); - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId)); + yield new Run(crawlId, destId, newMsgId); } + case Run(_, FileStorageId destId, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } - @Override public String describe() { return "Export anchor tags from crawl data"; @@ -55,11 +73,13 @@ public class ExportAtagsActor extends RecordActorPrototype { @Inject public ExportAtagsActor(Gson gson, FileStorageService storageService, - AtagExporter atagExporter) + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); this.storageService = storageService; - this.atagExporter = atagExporter; + this.processWatcher = processWatcher; } } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java b/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java index faaaf528..7a0d612c 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportFeedsActor.java @@ -5,8 +5,11 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; -import nu.marginalia.extractor.ExporterIf; -import nu.marginalia.extractor.FeedExporter; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import 
nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageState; @@ -19,11 +22,17 @@ import java.time.LocalDateTime; @Singleton public class ExportFeedsActor extends RecordActorPrototype { private final FileStorageService storageService; + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); - private final ExporterIf feedExporter; public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId) { + this(crawlId, destId, -1); + } + } + @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { @@ -33,20 +42,25 @@ public class ExportFeedsActor extends RecordActorPrototype { if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId) -> { + case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - feedExporter.export(crawlId, destId); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - storageService.setFileStorageState(destId, FileStorageState.DELETE); - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.feeds(crawlId, destId)); + yield new Run(crawlId, destId, newMsgId); } + case Run(_, FileStorageId destId, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, 
ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } @@ -60,11 +74,13 @@ public class ExportFeedsActor extends RecordActorPrototype { @Inject public ExportFeedsActor(Gson gson, FileStorageService storageService, - FeedExporter feedExporter) + ActorProcessWatcher processWatcher, + ProcessOutboxes outboxes) { super(gson); this.storageService = storageService; - this.feedExporter = feedExporter; + this.processWatcher = processWatcher; + this.exportTasksOutbox = outboxes.getExportTasksOutbox(); } } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java b/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java index e47a803e..fa63a523 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportSampleDataActor.java @@ -5,7 +5,11 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; -import nu.marginalia.extractor.SampleDataExporter; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageState; @@ -18,11 +22,17 @@ import java.time.LocalDateTime; @Singleton public class ExportSampleDataActor extends RecordActorPrototype { private final FileStorageService storageService; + private final ActorProcessWatcher processWatcher; + private final MqOutbox 
exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); - private final SampleDataExporter dataExporter; public record Export(FileStorageId crawlId, int size, String name) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId, int size, String name) { + this(crawlId, destId, size, name, -1); + } + } + @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { @@ -35,28 +45,29 @@ public class ExportSampleDataActor extends RecordActorPrototype { if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id(), size, name); } - case Run(FileStorageId crawlId, FileStorageId destId, int size, String name) -> { + case Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - dataExporter.export(crawlId, destId, size, name); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - storageService.setFileStorageState(destId, FileStorageState.DELETE); - - logger.error("Failed to export data", ex); - - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, size, name)); + yield new Run(crawlId, destId, size, name, newMsgId); } + case Run(_, FileStorageId destId, _, _, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + 
storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } - @Override public String describe() { return "Export RSS/Atom feeds from crawl data"; @@ -65,11 +76,13 @@ public class ExportSampleDataActor extends RecordActorPrototype { @Inject public ExportSampleDataActor(Gson gson, FileStorageService storageService, - SampleDataExporter dataExporter) + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); this.storageService = storageService; - this.dataExporter = dataExporter; + this.processWatcher = processWatcher; + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); } } diff --git a/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java b/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java index a9191060..d4c50686 100644 --- a/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java +++ b/code/execution/java/nu/marginalia/actor/task/ExportTermFreqActor.java @@ -5,45 +5,62 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; -import nu.marginalia.extractor.ExporterIf; -import nu.marginalia.extractor.TermFrequencyExporter; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageState; import nu.marginalia.storage.model.FileStorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.LocalDateTime; @Singleton public class ExportTermFreqActor extends RecordActorPrototype { private final FileStorageService storageService; - private final ExporterIf 
exporter; + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; + private final Logger logger = LoggerFactory.getLogger(getClass()); + public record Export(FileStorageId crawlId) implements ActorStep {} - public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {} + public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep { + public Run(FileStorageId crawlId, FileStorageId destId) { + this(crawlId, destId, -1); + } + } @Override public ActorStep transition(ActorStep self) throws Exception { return switch(self) { case Export(FileStorageId crawlId) -> { - var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq-export", "Term Frequencies " + LocalDateTime.now()); + var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq", "Term Frequencies " + LocalDateTime.now()); if (storage == null) yield new Error("Bad storage id"); yield new Run(crawlId, storage.id()); } - case Run(FileStorageId crawlId, FileStorageId destId) -> { + case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> { storageService.setFileStorageState(destId, FileStorageState.NEW); - try { - exporter.export(crawlId, destId); - storageService.setFileStorageState(destId, FileStorageState.UNSET); - } - catch (Exception ex) { - storageService.setFileStorageState(destId, FileStorageState.DELETE); - yield new Error("Failed to export data"); - } - - yield new End(); + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.termFreq(crawlId, destId)); + yield new Run(crawlId, destId, newMsgId); } + case Run(_, FileStorageId destId, long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + storageService.flagFileForDeletion(destId); + yield new Error("Exporter failed"); + } + else { + 
storageService.setFileStorageState(destId, FileStorageState.UNSET); + yield new End(); + } + } + default -> new Error(); }; } @@ -57,11 +74,13 @@ public class ExportTermFreqActor extends RecordActorPrototype { @Inject public ExportTermFreqActor(Gson gson, FileStorageService storageService, - TermFrequencyExporter exporter) + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); this.storageService = storageService; - this.exporter = exporter; + this.processWatcher = processWatcher; + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); } } diff --git a/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java b/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java index f81aeb79..e4a46d84 100644 --- a/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java +++ b/code/execution/java/nu/marginalia/actor/task/TriggerAdjacencyCalculationActor.java @@ -5,53 +5,59 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - @Singleton public class TriggerAdjacencyCalculationActor extends RecordActorPrototype { + private final ActorProcessWatcher processWatcher; + private final MqOutbox exportTasksOutbox; private final Logger logger = LoggerFactory.getLogger(getClass()); - private final ProcessService processService; - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - public record Run() 
implements ActorStep {} + public record Run(long msgId) implements ActorStep { + public Run() { + this(-1); + } + } @Override public ActorStep transition(ActorStep self) throws Exception { - return switch (self) { - case Run() -> { - AtomicBoolean hasError = new AtomicBoolean(false); - var future = executor.submit(() -> { - try { - processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load"); - } - catch (Exception ex) { - logger.warn("Error triggering adjacency calculation", ex); - hasError.set(true); - } - }); - future.get(); - - if (hasError.get()) { - yield new Error("Error triggering adjacency calculation"); - } - yield new End(); + return switch(self) { + case Run(long msgId) when msgId < 0 -> { + long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.adjacencies()); + yield new Run(newMsgId); } + case Run(long msgId) -> { + var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId); + + if (rsp.state() != MqMessageState.OK) { + yield new Error("Exporter failed"); + } + else { + yield new End(); + } + } + default -> new Error(); }; } + @Inject public TriggerAdjacencyCalculationActor(Gson gson, - ProcessService processService) { + ProcessOutboxes processOutboxes, + ActorProcessWatcher processWatcher) { super(gson); - this.processService = processService; + + this.exportTasksOutbox = processOutboxes.getExportTasksOutbox(); + this.processWatcher = processWatcher; + } @Override diff --git a/code/execution/java/nu/marginalia/process/ProcessOutboxes.java b/code/execution/java/nu/marginalia/process/ProcessOutboxes.java index 8600b86c..4f2e79d5 100644 --- a/code/execution/java/nu/marginalia/process/ProcessOutboxes.java +++ b/code/execution/java/nu/marginalia/process/ProcessOutboxes.java @@ -14,6 +14,7 @@ public class ProcessOutboxes { private final MqOutbox crawlerOutbox; private final MqOutbox indexConstructorOutbox; private final MqOutbox liveCrawlerOutbox; + private final MqOutbox 
exportTasksOutbox; @Inject public ProcessOutboxes(BaseServiceParams params, MqPersistence persistence) { @@ -53,6 +54,14 @@ public class ProcessOutboxes { params.configuration.node(), params.configuration.instanceUuid() ); + + exportTasksOutbox = new MqOutbox(persistence, + ProcessInboxNames.EXPORT_TASK_INBOX, + params.configuration.node(), + params.configuration.serviceName(), + params.configuration.node(), + params.configuration.instanceUuid() + ); } @@ -71,4 +80,6 @@ public class ProcessOutboxes { public MqOutbox getIndexConstructorOutbox() { return indexConstructorOutbox; } public MqOutbox getLiveCrawlerOutbox() { return liveCrawlerOutbox; } + + public MqOutbox getExportTasksOutbox() { return exportTasksOutbox; } } diff --git a/code/execution/java/nu/marginalia/process/ProcessService.java b/code/execution/java/nu/marginalia/process/ProcessService.java index 89707f94..f38651e1 100644 --- a/code/execution/java/nu/marginalia/process/ProcessService.java +++ b/code/execution/java/nu/marginalia/process/ProcessService.java @@ -3,15 +3,14 @@ package nu.marginalia.process; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.WmsaHome; -import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator; import nu.marginalia.converting.ConverterMain; import nu.marginalia.crawl.CrawlerMain; import nu.marginalia.index.IndexConstructorMain; import nu.marginalia.livecrawler.LiveCrawlerMain; import nu.marginalia.loading.LoaderMain; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.BaseServiceParams; +import nu.marginalia.task.ExportTasksMain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -38,13 +37,13 @@ public class ProcessService { private final int node; - public static ProcessService.ProcessId translateExternalIdBase(String id) { + public static ProcessId translateExternalIdBase(String id) { return switch (id) { - 
case "converter" -> ProcessService.ProcessId.CONVERTER; - case "crawler" -> ProcessService.ProcessId.CRAWLER; - case "loader" -> ProcessService.ProcessId.LOADER; - case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR; - case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR; + case "converter" -> ProcessId.CONVERTER; + case "crawler" -> ProcessId.CRAWLER; + case "loader" -> ProcessId.LOADER; + case "export-tasks" -> ProcessId.EXPORT_TASKS; + case "index-constructor" -> ProcessId.INDEX_CONSTRUCTOR; default -> null; }; } @@ -55,7 +54,7 @@ public class ProcessService { CONVERTER(ConverterMain.class), LOADER(LoaderMain.class), INDEX_CONSTRUCTOR(IndexConstructorMain.class), - ADJACENCIES_CALCULATOR(WebsiteAdjacenciesCalculator.class) + EXPORT_TASKS(ExportTasksMain.class), ; public final String mainClass; @@ -70,7 +69,7 @@ public class ProcessService { case CONVERTER -> "CONVERTER_PROCESS_OPTS"; case LOADER -> "LOADER_PROCESS_OPTS"; case INDEX_CONSTRUCTOR -> "INDEX_CONSTRUCTION_PROCESS_OPTS"; - case ADJACENCIES_CALCULATOR -> "ADJACENCIES_CALCULATOR_PROCESS_OPTS"; + case EXPORT_TASKS -> "EXPORT_TASKS_PROCESS_OPTS"; }; String value = System.getenv(variable); diff --git a/code/index/build.gradle b/code/index/build.gradle index ad1d1000..126de167 100644 --- a/code/index/build.gradle +++ b/code/index/build.gradle @@ -67,7 +67,6 @@ dependencies { testImplementation libs.bundles.junit testImplementation libs.mockito testImplementation libs.commons.lang3 - testImplementation project(':code:common:process') testImplementation project(':code:libraries:array') testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') diff --git a/code/index/index-forward/build.gradle b/code/index/index-forward/build.gradle index 946ef74b..53decc21 100644 --- a/code/index/index-forward/build.gradle +++ b/code/index/index-forward/build.gradle @@ -20,7 +20,7 @@ dependencies { implementation project(':code:index:query') 
implementation project(':code:index:index-journal') implementation project(':code:common:model') - implementation project(':code:common:process') + implementation project(':code:common:service') implementation project(':code:processes:converting-process:model') implementation libs.bundles.slf4j diff --git a/code/index/index-reverse/build.gradle b/code/index/index-reverse/build.gradle index bd0831ba..7e7504a4 100644 --- a/code/index/index-reverse/build.gradle +++ b/code/index/index-reverse/build.gradle @@ -21,8 +21,8 @@ dependencies { implementation project(':code:index:query') implementation project(':code:index:index-journal') implementation project(':code:common:model') + implementation project(':code:common:service') implementation project(':code:processes:converting-process:model') - implementation project(':code:common:process') implementation project(':third-party:parquet-floor') implementation project(':third-party:commons-codec') diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 48c7a878..75987482 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -21,7 +21,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':third-party:porterstemmer') implementation project(':third-party:count-min-sketch') diff --git a/code/processes/converting-process/ft-anchor-keywords/build.gradle b/code/processes/converting-process/ft-anchor-keywords/build.gradle index 7572cce0..dbf6b2cf 100644 --- a/code/processes/converting-process/ft-anchor-keywords/build.gradle +++ b/code/processes/converting-process/ft-anchor-keywords/build.gradle @@ -14,9 +14,9 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:config') + implementation project(':code:common:service') implementation 
project(':code:common:model') implementation project(':code:common:db') - implementation project(':code:common:process') implementation project(':code:processes:converting-process:ft-keyword-extraction') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:term-frequency-dict') diff --git a/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java b/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java index aaed5ace..b0da1148 100644 --- a/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java +++ b/code/processes/converting-process/ft-anchor-keywords/java/nu/marginalia/atags/source/AnchorTagsSourceFactory.java @@ -2,10 +2,10 @@ package nu.marginalia.atags.source; import com.google.inject.Inject; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.WmsaHome; import nu.marginalia.atags.model.DomainLinks; import nu.marginalia.model.EdgeDomain; +import nu.marginalia.process.ProcessConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java index 97935d79..b3e3521f 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/ConverterMain.java @@ -4,8 +4,6 @@ import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.converting.model.CrawlPlan; import nu.marginalia.converting.model.WorkDir; import 
nu.marginalia.converting.processor.DomainProcessor; @@ -17,14 +15,14 @@ import nu.marginalia.converting.writer.ConverterWriter; import nu.marginalia.io.CrawledDomainReader; import nu.marginalia.io.SerializableCrawlDataStream; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.mqapi.converting.ConvertRequest; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.log.WorkLog; import nu.marginalia.process.log.WorkLogEntry; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; import nu.marginalia.util.SimpleBlockingThreadPool; @@ -34,13 +32,13 @@ import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -50,12 +48,9 @@ import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX; public class ConverterMain extends ProcessMainClass { private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class); private final DomainProcessor processor; - private final Gson gson; private final ProcessHeartbeat heartbeat; - private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; private final SideloadSourceFactory sideloadSourceFactory; - private 
final int node; public static void main(String... args) throws Exception { @@ -70,8 +65,9 @@ public class ConverterMain extends ProcessMainClass { logger.info("Starting pipe"); - converter - .fetchInstructions() + Instructions instructions = converter.fetchInstructions(ConvertRequest.class); + + converter.createAction(instructions) .execute(converter); logger.info("Finished"); @@ -83,6 +79,65 @@ public class ConverterMain extends ProcessMainClass { System.exit(0); } + private Action createAction(Instructions instructions) throws SQLException, IOException { + var request = instructions.value(); + final Path inputPath = request.getInputPath(); + + return switch (request.action) { + case ConvertCrawlData -> { + var crawlData = fileStorageService.getStorage(request.crawlStorage); + var processData = fileStorageService.getStorage(request.processedDataStorage); + + var plan = new CrawlPlan(null, + new WorkDir(crawlData.asPath().toString(), "crawler.log"), + new WorkDir(processData.asPath().toString(), "processor.log") + ); + + yield new ConvertCrawlDataAction(plan, instructions); + } + case SideloadEncyclopedia -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl), + processData.asPath(), + instructions); + } + case SideloadDirtree -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadDirtree(inputPath), + processData.asPath(), + instructions); + } + case SideloadWarc -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadWarc(inputPath), + processData.asPath(), + instructions); + } + case SideloadReddit -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + 
sideloadSourceFactory.sideloadReddit(inputPath), + processData.asPath(), + instructions); + } + case SideloadStackexchange -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadStackexchange(inputPath), + processData.asPath(), + instructions); + } + }; + } + @Inject public ConverterMain( DomainProcessor processor, @@ -94,13 +149,12 @@ public class ConverterMain extends ProcessMainClass { ProcessConfiguration processConfiguration ) { + super(messageQueueFactory, processConfiguration, gson, CONVERTER_INBOX); + this.processor = processor; - this.gson = gson; this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; this.sideloadSourceFactory = sideloadSourceFactory; - this.node = processConfiguration.node(); heartbeat.start(); } @@ -220,45 +274,44 @@ public class ConverterMain extends ProcessMainClass { } } - private abstract static class ConvertRequest { - private final MqMessage message; - private final MqSingleShotInbox inbox; + private abstract static class Action { + final Instructions instructions; - private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) { - this.message = message; - this.inbox = inbox; + public Action(Instructions instructions) { + this.instructions = instructions; } public abstract void execute(ConverterMain converterMain) throws Exception; public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); + instructions.ok(); } public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); + instructions.err(); } } - private static class SideloadAction extends ConvertRequest { + private static class SideloadAction extends Action { private final Collection sideloadSources; private final Path workDir; SideloadAction(SideloadSource sideloadSource, Path workDir, - MqMessage message, MqSingleShotInbox inbox) { - super(message, inbox); + Instructions instructions) 
{ + super(instructions); this.sideloadSources = List.of(sideloadSource); this.workDir = workDir; } SideloadAction(Collection sideloadSources, Path workDir, - MqMessage message, MqSingleShotInbox inbox) { - super(message, inbox); + Instructions instructions) { + super(instructions); this.sideloadSources = sideloadSources; this.workDir = workDir; } + @Override public void execute(ConverterMain converterMain) throws Exception { try { @@ -272,11 +325,12 @@ public class ConverterMain extends ProcessMainClass { } } - private static class ConvertCrawlDataAction extends ConvertRequest { + private static class ConvertCrawlDataAction extends Action { private final CrawlPlan plan; - private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) { - super(message, inbox); + private ConvertCrawlDataAction(CrawlPlan plan, + Instructions instructions) { + super(instructions); this.plan = plan; } @@ -294,94 +348,4 @@ public class ConverterMain extends ProcessMainClass { } } - - private ConvertRequest fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, node, UUID.randomUUID()); - - var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - try { - var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class); - - // will be null on ConvertCrawlData - final Path inputPath = request.getInputPath(); - - return switch (request.action) { - case ConvertCrawlData -> { - var crawlData = fileStorageService.getStorage(request.crawlStorage); - var processData = fileStorageService.getStorage(request.processedDataStorage); - - var plan = new CrawlPlan(null, - new WorkDir(crawlData.asPath().toString(), "crawler.log"), - new WorkDir(processData.asPath().toString(), "processor.log") - ); - - yield new ConvertCrawlDataAction(plan, msg, inbox); - } - 
case SideloadEncyclopedia -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl), - processData.asPath(), - msg, inbox); - } - case SideloadDirtree -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadDirtree(inputPath), - processData.asPath(), - msg, inbox); - } - case SideloadWarc -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadWarc(inputPath), - processData.asPath(), - msg, inbox); - } - case SideloadReddit -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadReddit(inputPath), - processData.asPath(), - msg, inbox); - } - case SideloadStackexchange -> { - var processData = fileStorageService.getStorage(request.processedDataStorage); - - yield new SideloadAction( - sideloadSourceFactory.sideloadStackexchange(inputPath), - processData.asPath(), - msg, inbox); - } - }; - } - catch (Exception ex) { - inbox.sendResponse(msg, MqInboxResponse.err(ex.getClass().getSimpleName() + ": " + ex.getMessage())); - - throw ex; - } - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } - } diff --git 
a/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java b/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java index 83f28882..d14750ef 100644 --- a/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java +++ b/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTestModule.java @@ -3,9 +3,9 @@ package nu.marginalia.converting; import com.google.inject.AbstractModule; import com.google.inject.name.Names; import nu.marginalia.LanguageModels; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.WmsaHome; import nu.marginalia.converting.processor.ConverterDomainTypes; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.service.module.ServiceConfiguration; import org.mockito.Mockito; diff --git a/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java b/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java index 9c40bcab..4f964db3 100644 --- a/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java +++ b/code/processes/converting-process/test/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java @@ -2,10 +2,10 @@ package nu.marginalia.converting.sideload.reddit; import com.google.inject.AbstractModule; import com.google.inject.Guice; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.converting.ConverterModule; import nu.marginalia.converting.processor.ConverterDomainTypes; import nu.marginalia.converting.sideload.SideloadSourceFactory; +import nu.marginalia.process.ProcessConfiguration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; diff --git a/code/processes/crawling-process/build.gradle 
b/code/processes/crawling-process/build.gradle index 2d34904f..e955f86c 100644 --- a/code/processes/crawling-process/build.gradle +++ b/code/processes/crawling-process/build.gradle @@ -21,7 +21,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:model') diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java index ce002903..e7fbe4f9 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java @@ -5,8 +5,6 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.UserAgent; import nu.marginalia.WmsaHome; import nu.marginalia.atags.model.DomainLinks; @@ -25,15 +23,15 @@ import nu.marginalia.io.CrawledDomainReader; import nu.marginalia.io.CrawlerOutputFile; import nu.marginalia.model.EdgeDomain; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.log.WorkLog; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; +import 
nu.marginalia.storage.model.FileStorageId; import nu.marginalia.util.SimpleBlockingThreadPool; import okhttp3.ConnectionPool; import okhttp3.Dispatcher; @@ -46,8 +44,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.security.Security; -import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -59,14 +59,12 @@ public class CrawlerMain extends ProcessMainClass { private final UserAgent userAgent; private final ProcessHeartbeatImpl heartbeat; - private final MessageQueueFactory messageQueueFactory; private final DomainProber domainProber; private final FileStorageService fileStorageService; private final AnchorTagsSourceFactory anchorTagsSourceFactory; private final WarcArchiverFactory warcArchiverFactory; private final HikariDataSource dataSource; private final DomainBlacklist blacklist; - private final Gson gson; private final int node; private final SimpleBlockingThreadPool pool; @@ -96,16 +94,17 @@ public class CrawlerMain extends ProcessMainClass { HikariDataSource dataSource, DomainBlacklist blacklist, Gson gson) throws InterruptedException { + + super(messageQueueFactory, processConfiguration, gson, CRAWLER_INBOX); + this.userAgent = userAgent; this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.domainProber = domainProber; this.fileStorageService = fileStorageService; this.anchorTagsSourceFactory = anchorTagsSourceFactory; this.warcArchiverFactory = warcArchiverFactory; this.dataSource = dataSource; this.blacklist = blacklist; - this.gson = gson; this.node = processConfiguration.node(); pool = new SimpleBlockingThreadPool("CrawlerPool", @@ -144,13 +143,14 @@ public class CrawlerMain extends ProcessMainClass { ); var crawler = 
injector.getInstance(CrawlerMain.class); - var instructions = crawler.fetchInstructions(); + var instructions = crawler.fetchInstructions(nu.marginalia.mqapi.crawling.CrawlRequest.class); try { - if (instructions.targetDomainName != null) { - crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir); + var req = instructions.value(); + if (req.targetDomainName != null) { + crawler.runForSingleDomain(req.targetDomainName, req.crawlStorage); } else { - crawler.runForDatabaseDomains(instructions.outputDir); + crawler.runForDatabaseDomains(req.crawlStorage); } instructions.ok(); } catch (Exception ex) { @@ -166,6 +166,10 @@ public class CrawlerMain extends ProcessMainClass { System.exit(0); } + public void runForDatabaseDomains(FileStorageId fileStorageId) throws Exception { + runForDatabaseDomains(fileStorageService.getStorage(fileStorageId).asPath()); + } + public void runForDatabaseDomains(Path outputDir) throws Exception { heartbeat.start(); @@ -285,6 +289,11 @@ public class CrawlerMain extends ProcessMainClass { } } + + public void runForSingleDomain(String targetDomainName, FileStorageId fileStorageId) throws Exception { + runForSingleDomain(targetDomainName, fileStorageService.getStorage(fileStorageId).asPath()); + } + public void runForSingleDomain(String targetDomainName, Path outputDir) throws Exception { heartbeat.start(); @@ -410,70 +419,6 @@ public class CrawlerMain extends ProcessMainClass { } - - - private static class CrawlRequest { - private final Path outputDir; - private final MqMessage message; - private final MqSingleShotInbox inbox; - - private final String targetDomainName; - - CrawlRequest(String targetDomainName, - Path outputDir, - MqMessage message, - MqSingleShotInbox inbox) - { - this.message = message; - this.inbox = inbox; - this.outputDir = outputDir; - this.targetDomainName = targetDomainName; - } - - - public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); - } - public void err() { - 
inbox.sendResponse(message, MqInboxResponse.err()); - } - - } - - private CrawlRequest fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID()); - - logger.info("Waiting for instructions"); - - var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class); - var crawlStorage = fileStorageService.getStorage(request.crawlStorage); - - return new CrawlRequest( - request.targetDomainName, - crawlStorage.asPath(), - msg, - inbox); - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } - public record CrawlSpecRecord(@NotNull String domain, int crawlDepth, @NotNull List urls) { public CrawlSpecRecord(String domain, int crawlDepth) { diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java b/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java index c1a53718..33c08165 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java @@ -1,8 +1,8 @@ package nu.marginalia.crawl.warc; import com.google.inject.Inject; -import nu.marginalia.ProcessConfiguration; import 
nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.process.ProcessConfiguration; import org.apache.commons.io.IOUtils; import java.io.IOException; diff --git a/code/processes/crawling-process/model/build.gradle b/code/processes/crawling-process/model/build.gradle index 50103c41..bcac359e 100644 --- a/code/processes/crawling-process/model/build.gradle +++ b/code/processes/crawling-process/model/build.gradle @@ -20,7 +20,6 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:db') implementation project(':code:common:config') - implementation project(':code:common:process') implementation project(':code:index:api') implementation project(':code:processes:crawling-process:ft-content-type') implementation project(':code:libraries:language-processing') diff --git a/code/execution/data-extractors/build.gradle b/code/processes/export-task-process/build.gradle similarity index 57% rename from code/execution/data-extractors/build.gradle rename to code/processes/export-task-process/build.gradle index 2a0c08c6..11df9ee1 100644 --- a/code/execution/data-extractors/build.gradle +++ b/code/processes/export-task-process/build.gradle @@ -1,24 +1,33 @@ plugins { id 'java' - - id "de.undercouch.download" version "5.1.0" - + id 'application' id 'jvm-test-suite' } - java { toolchain { languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion)) } } +application { + mainClass = 'nu.marginalia.task.ExportTasksMain' + applicationName = 'export-task-process' +} + +tasks.distZip.enabled = false + apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:config') - implementation project(':code:common:process') implementation project(':code:common:model') + implementation project(':code:common:db') + implementation project(':code:common:service') + implementation project(':code:common:config') + implementation project(':code:libraries:message-queue') + + implementation
project(':code:functions:link-graph:api') + implementation project(':code:processes:process-mq-api') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:term-frequency-dict') implementation project(':code:libraries:blocking-thread-pool') @@ -28,20 +37,32 @@ dependencies { implementation project(':code:processes:converting-process') implementation project(':third-party:commons-codec') - implementation libs.bundles.slf4j + implementation libs.guava implementation dependencies.create(libs.guice.get()) { exclude group: 'com.google.guava' } + implementation libs.roaringbitmap implementation libs.trove + implementation libs.fastutil + implementation libs.bundles.mariadb + implementation libs.gson implementation libs.commons.lang3 + implementation libs.commons.io implementation libs.commons.compress - implementation libs.notnull + implementation libs.commons.codec implementation libs.jsoup + + testImplementation libs.bundles.slf4j.test testImplementation libs.bundles.junit testImplementation libs.mockito -} + testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') + testImplementation libs.commons.codec + testImplementation 'org.testcontainers:mariadb:1.17.4' + testImplementation 'org.testcontainers:junit-jupiter:1.17.4' + testImplementation project(':code:libraries:test-helpers') +} diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesData.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesData.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesData.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesData.java diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesLoader.java 
b/code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesLoader.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/AdjacenciesLoader.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/AdjacenciesLoader.java diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/DomainAliases.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/DomainAliases.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/DomainAliases.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/DomainAliases.java diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/SparseBitVector.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/SparseBitVector.java similarity index 100% rename from code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/SparseBitVector.java rename to code/processes/export-task-process/java/nu/marginalia/adjacencies/SparseBitVector.java diff --git a/code/processes/export-task-process/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java b/code/processes/export-task-process/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java new file mode 100644 index 00000000..c6ce4e91 --- /dev/null +++ b/code/processes/export-task-process/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java @@ -0,0 +1,127 @@ +package nu.marginalia.adjacencies; + +import com.google.inject.Inject; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.api.linkgraph.AggregateLinkGraphClient; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.control.ProcessHeartbeatImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import 
java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import static nu.marginalia.adjacencies.SparseBitVector.andCardinality; +import static nu.marginalia.adjacencies.SparseBitVector.weightedProduct; + +public class WebsiteAdjacenciesCalculator { + private final AggregateLinkGraphClient domainLinksClient; + private final ProcessConfiguration configuration; + private final HikariDataSource dataSource; + public AdjacenciesData adjacenciesData; + public DomainAliases domainAliases; + private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class); + + float[] weights; + + @Inject + public WebsiteAdjacenciesCalculator(AggregateLinkGraphClient domainLinksClient, + ProcessConfiguration configuration, + HikariDataSource dataSource) { + this.domainLinksClient = domainLinksClient; + this.configuration = configuration; + this.dataSource = dataSource; + } + + public void export() throws Exception { + try (var processHeartbeat = new ProcessHeartbeatImpl(configuration, dataSource)) { + domainAliases = new DomainAliases(dataSource); + adjacenciesData = new AdjacenciesData(domainLinksClient, domainAliases); + weights = adjacenciesData.getWeights(); + + AdjacenciesLoader loader = new AdjacenciesLoader(dataSource); + var executor = Executors.newFixedThreadPool(16); + + int total = adjacenciesData.getIdsList().size(); + AtomicInteger progress = new AtomicInteger(0); + IntStream.of(adjacenciesData.getIdsList().toArray()).parallel() + .filter(domainAliases::isNotAliased) + .forEach(id -> { + findAdjacent(id, loader::load); + processHeartbeat.setProgress(progress.incrementAndGet() / (double) total); + }); + + executor.shutdown(); + logger.info("Waiting for wrap-up"); + loader.stop(); + } + } + + public void findAdjacent(int domainId, Consumer andThen) { + findAdjacentDtoS(domainId, andThen); + } + + double
cosineSimilarity(SparseBitVector a, SparseBitVector b) { + double andCardinality = andCardinality(a, b); + andCardinality /= Math.sqrt(a.getCardinality()); + andCardinality /= Math.sqrt(b.getCardinality()); + return andCardinality; + } + + double expensiveCosineSimilarity(SparseBitVector a, SparseBitVector b) { + return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights)); + } + + public record DomainSimilarities(int domainId, List similarities) {} + + public record DomainSimilarity(int domainId, double value) {} + + private void findAdjacentDtoS(int domainId, Consumer andThen) { + var vector = adjacenciesData.getVector(domainId); + + if (vector == null || !vector.cardinalityExceeds(10)) { + return; + } + + List similarities = new ArrayList<>(1000); + + var items = adjacenciesData.getCandidates(vector); + + + int cardMin = Math.max(2, (int) (0.01 * vector.getCardinality())); + + items.forEach(id -> { + var otherVec = adjacenciesData.getVector(id); + + if (null == otherVec || otherVec == vector) + return true; + + if (otherVec.getCardinality() < cardMin) + return true; + + double similarity = cosineSimilarity(vector, otherVec); + if (similarity > 0.1) { + var recalculated = expensiveCosineSimilarity(vector, otherVec); + if (recalculated > 0.1) { + similarities.add(new DomainSimilarity(id, recalculated)); + } + } + + return true; + }); + + if (similarities.size() > 128) { + similarities.sort(Comparator.comparing(DomainSimilarity::value)); + similarities.subList(0, similarities.size() - 128).clear(); + } + + + andThen.accept(new DomainSimilarities(domainId, similarities)); + } + +} diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/AtagExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/AtagExporter.java rename to 
code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/ExporterIf.java b/code/processes/export-task-process/java/nu/marginalia/extractor/ExporterIf.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/ExporterIf.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/ExporterIf.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/FeedExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/FeedExporter.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/SampleDataExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/SampleDataExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/SampleDataExporter.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/SampleDataExporter.java diff --git a/code/execution/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java similarity index 100% rename from code/execution/data-extractors/java/nu/marginalia/extractor/TermFrequencyExporter.java rename to code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java diff --git a/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java b/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java new file mode 100644 index 00000000..c5d7ed12 --- /dev/null +++ b/code/processes/export-task-process/java/nu/marginalia/task/ExportTasksMain.java @@ -0,0 +1,82 @@ +package 
nu.marginalia.task; + +import com.google.gson.Gson; +import com.google.inject.Guice; +import com.google.inject.Inject; +import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator; +import nu.marginalia.extractor.AtagExporter; +import nu.marginalia.extractor.FeedExporter; +import nu.marginalia.extractor.SampleDataExporter; +import nu.marginalia.extractor.TermFrequencyExporter; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.mqapi.ProcessInboxNames; +import nu.marginalia.mqapi.tasks.ExportTaskRequest; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; +import nu.marginalia.service.module.DatabaseModule; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExportTasksMain extends ProcessMainClass { + + private static final Logger logger = LoggerFactory.getLogger(ExportTasksMain.class); + + private final AtagExporter atagExporter; + private final FeedExporter feedExporter; + private final SampleDataExporter sampleDataExporter; + private final TermFrequencyExporter termFrequencyExporter; + private final WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator; + + public static void main(String[] args) throws Exception { + + var injector = Guice.createInjector( + new ServiceDiscoveryModule(), + new ProcessConfigurationModule("export-tasks"), + new DatabaseModule(false) + ); + + var exportTasks = injector.getInstance(ExportTasksMain.class); + + Instructions instructions = exportTasks.fetchInstructions(ExportTaskRequest.class); + try { + exportTasks.run(instructions.value()); + instructions.ok(); + } + catch (Exception e) { + logger.error("Error running export task", e); + instructions.err(); + } + + } + + @Inject + public ExportTasksMain(MessageQueueFactory messageQueueFactory, + ProcessConfiguration config, + AtagExporter atagExporter, + FeedExporter 
feedExporter, + SampleDataExporter sampleDataExporter, + TermFrequencyExporter termFrequencyExporter, + Gson gson, WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator) + { + super(messageQueueFactory, config, gson, ProcessInboxNames.EXPORT_TASK_INBOX); + this.atagExporter = atagExporter; + this.feedExporter = feedExporter; + this.sampleDataExporter = sampleDataExporter; + this.termFrequencyExporter = termFrequencyExporter; + this.websiteAdjacenciesCalculator = websiteAdjacenciesCalculator; + } + + private void run(ExportTaskRequest request) throws Exception { + switch (request.task) { + case ATAGS -> atagExporter.export(request.crawlId, request.destId); + case FEEDS -> feedExporter.export(request.crawlId, request.destId); + case TERM_FREQ -> termFrequencyExporter.export(request.crawlId, request.destId); + case SAMPLE_DATA -> sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name); + case ADJACENCIES -> websiteAdjacenciesCalculator.export(); + } + } + + +} diff --git a/code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java b/code/processes/export-task-process/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java similarity index 100% rename from code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java rename to code/processes/export-task-process/test/nu/marginalia/adjacencies/AdjacenciesLoaderTest.java diff --git a/code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/SparseBitVectorTest.java b/code/processes/export-task-process/test/nu/marginalia/adjacencies/SparseBitVectorTest.java similarity index 100% rename from code/processes/website-adjacencies-calculator/test/nu/marginalia/adjacencies/SparseBitVectorTest.java rename to code/processes/export-task-process/test/nu/marginalia/adjacencies/SparseBitVectorTest.java diff --git a/code/processes/index-constructor-process/build.gradle
b/code/processes/index-constructor-process/build.gradle index 6de7e773..04416bc6 100644 --- a/code/processes/index-constructor-process/build.gradle +++ b/code/processes/index-constructor-process/build.gradle @@ -22,7 +22,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:processes:process-mq-api') - implementation project(':code:common:process') implementation project(':code:common:service') implementation project(':code:common:db') implementation project(':code:common:config') diff --git a/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java b/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java index ef93b554..470b1694 100644 --- a/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java +++ b/code/processes/index-constructor-process/java/nu/marginalia/index/IndexConstructorMain.java @@ -1,11 +1,8 @@ package nu.marginalia.index; -import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import nu.marginalia.IndexLocations; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.index.construction.full.FullIndexConstructor; import nu.marginalia.index.construction.prio.PrioIndexConstructor; import nu.marginalia.index.domainrankings.DomainRankings; @@ -15,13 +12,11 @@ import nu.marginalia.index.journal.IndexJournal; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.mqapi.index.CreateIndexRequest; -import nu.marginalia.mqapi.index.IndexName; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import 
nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; @@ -31,8 +26,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.TimeUnit; import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX; @@ -40,15 +33,12 @@ import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX; public class IndexConstructorMain extends ProcessMainClass { private final FileStorageService fileStorageService; private final ProcessHeartbeatImpl heartbeat; - private final MessageQueueFactory messageQueueFactory; private final DomainRankings domainRankings; - private final int node; private static final Logger logger = LoggerFactory.getLogger(IndexConstructorMain.class); - private final Gson gson = GsonFactory.get(); - public static void main(String[] args) throws Exception { - CreateIndexInstructions instructions = null; + public static void main(String[] args) throws Exception { + Instructions instructions = null; try { new org.mariadb.jdbc.Driver(); @@ -58,9 +48,8 @@ public class IndexConstructorMain extends ProcessMainClass { new DatabaseModule(false)) .getInstance(IndexConstructorMain.class); - instructions = main.fetchInstructions(); - - main.run(instructions); + instructions = main.fetchInstructions(CreateIndexRequest.class); + main.run(instructions.value()); instructions.ok(); } catch (Exception ex) { @@ -85,17 +74,17 @@ public class IndexConstructorMain extends ProcessMainClass { ProcessConfiguration processConfiguration, DomainRankings domainRankings) { + super(messageQueueFactory, processConfiguration, GsonFactory.get(), INDEX_CONSTRUCTOR_INBOX); + this.fileStorageService = fileStorageService; 
this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.domainRankings = domainRankings; - this.node = processConfiguration.node(); } - private void run(CreateIndexInstructions instructions) throws SQLException, IOException { + private void run(CreateIndexRequest instructions) throws SQLException, IOException { heartbeat.start(); - switch (instructions.name) { + switch (instructions.indexName()) { case FORWARD -> createForwardIndex(); case REVERSE_FULL -> createFullReverseIndex(); case REVERSE_PRIO -> createPrioReverseIndex(); @@ -171,52 +160,4 @@ public class IndexConstructorMain extends ProcessMainClass { docId); } - private static class CreateIndexInstructions { - - public final IndexName name; - private final MqSingleShotInbox inbox; - private final MqMessage message; - - private CreateIndexInstructions(IndexName name, MqSingleShotInbox inbox, MqMessage message) { - this.name = name; - this.inbox = inbox; - this.message = message; - } - - public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); - } - public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); - } - } - - private CreateIndexInstructions fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(INDEX_CONSTRUCTOR_INBOX, node, UUID.randomUUID()); - - logger.info("Waiting for instructions"); - var msgOpt = getMessage(inbox, CreateIndexRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - var payload = gson.fromJson(msg.payload(), CreateIndexRequest.class); - var name = payload.indexName(); - - return new CreateIndexInstructions(name, inbox, msg); - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new 
RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } } diff --git a/code/processes/live-crawling-process/build.gradle b/code/processes/live-crawling-process/build.gradle index b60fd863..5dad1177 100644 --- a/code/processes/live-crawling-process/build.gradle +++ b/code/processes/live-crawling-process/build.gradle @@ -21,7 +21,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:model') diff --git a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java index 0c390641..edc90909 100644 --- a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java +++ b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java @@ -4,8 +4,6 @@ import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.WmsaHome; import nu.marginalia.api.feeds.FeedsClient; import nu.marginalia.converting.ConverterModule; @@ -21,12 +19,11 @@ import nu.marginalia.loading.domains.DbDomainIdRegistry; import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.model.EdgeDomain; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; import nu.marginalia.mqapi.crawling.LiveCrawlRequest; 
+import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeat; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.ServiceDiscoveryModule; import nu.marginalia.storage.FileStorageService; @@ -38,11 +35,11 @@ import org.slf4j.LoggerFactory; import java.nio.file.Files; import java.nio.file.Path; import java.security.Security; -import java.sql.SQLException; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static nu.marginalia.mqapi.ProcessInboxNames.LIVE_CRAWLER_INBOX; @@ -52,16 +49,13 @@ public class LiveCrawlerMain extends ProcessMainClass { LoggerFactory.getLogger(LiveCrawlerMain.class); private final FeedsClient feedsClient; - private final Gson gson; private final ProcessHeartbeat heartbeat; private final DbDomainQueries domainQueries; private final DomainBlacklist domainBlacklist; - private final MessageQueueFactory messageQueueFactory; private final DomainProcessor domainProcessor; private final FileStorageService fileStorageService; private final KeywordLoaderService keywordLoaderService; private final DocumentLoaderService documentLoaderService; - private final int node; @Inject public LiveCrawlerMain(FeedsClient feedsClient, @@ -77,13 +71,12 @@ public class LiveCrawlerMain extends ProcessMainClass { DocumentLoaderService documentLoaderService) throws Exception { + super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX); + this.feedsClient = feedsClient; - this.gson = gson; this.heartbeat = heartbeat; this.domainQueries = domainQueries; this.domainBlacklist = domainBlacklist; - this.messageQueueFactory = messageQueueFactory; - this.node = config.node(); 
this.domainProcessor = domainProcessor; this.fileStorageService = fileStorageService; this.keywordLoaderService = keywordLoaderService; @@ -117,7 +110,7 @@ public class LiveCrawlerMain extends ProcessMainClass { ); var crawler = injector.getInstance(LiveCrawlerMain.class); - LiveCrawlInstructions instructions = crawler.fetchInstructions(); + Instructions instructions = crawler.fetchInstructions(LiveCrawlRequest.class); try{ crawler.run(); @@ -225,57 +218,4 @@ public class LiveCrawlerMain extends ProcessMainClass { } } - private LiveCrawlInstructions fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(LIVE_CRAWLER_INBOX, node, UUID.randomUUID()); - - logger.info("Waiting for instructions"); - - var msgOpt = getMessage(inbox, LiveCrawlRequest.class.getSimpleName()); - var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); - - // for live crawl, request is empty for now - LiveCrawlRequest request = gson.fromJson(msg.payload(), LiveCrawlRequest.class); - - return new LiveCrawlInstructions(msg, inbox); - } - - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } - - - private static class LiveCrawlInstructions { - private final MqMessage message; - private final MqSingleShotInbox inbox; - - LiveCrawlInstructions(MqMessage message, - MqSingleShotInbox inbox) - { - this.message = message; - this.inbox = inbox; - } - - - public void ok() { - inbox.sendResponse(message, 
MqInboxResponse.ok()); - } - public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); - } - - } - } diff --git a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java index 6088fb5e..5dc779b0 100644 --- a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java +++ b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerModule.java @@ -6,10 +6,10 @@ import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.name.Names; import nu.marginalia.IndexLocations; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.UserAgent; import nu.marginalia.WmsaHome; import nu.marginalia.linkdb.docs.DocumentDbWriter; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.storage.FileStorageService; diff --git a/code/processes/loading-process/build.gradle b/code/processes/loading-process/build.gradle index 0d9d51c1..4a97812e 100644 --- a/code/processes/loading-process/build.gradle +++ b/code/processes/loading-process/build.gradle @@ -20,7 +20,6 @@ tasks.distZip.enabled = false apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:process') implementation project(':code:processes:process-mq-api') implementation project(':code:index:api') implementation project(':code:common:model') diff --git a/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java index 5b59d972..cc639711 100644 --- a/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java @@ -4,8 +4,6 @@ import 
com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.linkdb.docs.DocumentDbWriter; import nu.marginalia.loading.documents.DocumentLoaderService; import nu.marginalia.loading.documents.KeywordLoaderService; @@ -13,26 +11,21 @@ import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainLoaderService; import nu.marginalia.loading.links.DomainLinksLoaderService; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.MqMessage; -import nu.marginalia.mq.MqMessageState; -import nu.marginalia.mq.inbox.MqInboxResponse; -import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.mqapi.loading.LoadRequest; +import nu.marginalia.process.ProcessConfiguration; +import nu.marginalia.process.ProcessConfigurationModule; +import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.service.ProcessMainClass; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.file.Path; -import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX; @@ -40,15 +33,12 @@ public class LoaderMain extends ProcessMainClass { private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class); private final ProcessHeartbeatImpl heartbeat; - private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; private final DocumentDbWriter documentDbWriter; private final 
DomainLoaderService domainService; private final DomainLinksLoaderService linksService; private final KeywordLoaderService keywordLoaderService; private final DocumentLoaderService documentLoaderService; - private final int node; - private final Gson gson; public static void main(String... args) { try { @@ -62,7 +52,7 @@ public class LoaderMain extends ProcessMainClass { var instance = injector.getInstance(LoaderMain.class); - var instructions = instance.fetchInstructions(); + var instructions = instance.fetchInstructions(LoadRequest.class); logger.info("Instructions received"); instance.run(instructions); } @@ -83,22 +73,27 @@ public class LoaderMain extends ProcessMainClass { ProcessConfiguration processConfiguration, Gson gson ) { - this.node = processConfiguration.node(); + + super(messageQueueFactory, processConfiguration, gson, LOADER_INBOX); + this.heartbeat = heartbeat; - this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; this.documentDbWriter = documentDbWriter; this.domainService = domainService; this.linksService = linksService; this.keywordLoaderService = keywordLoaderService; this.documentLoaderService = documentLoaderService; - this.gson = gson; heartbeat.start(); } - void run(LoadRequest instructions) throws Throwable { - LoaderInputData inputData = instructions.inputData; + void run(Instructions instructions) throws Throwable { + + List inputSources = new ArrayList<>(); + for (var storageId : instructions.value().inputProcessDataStorageIds) { + inputSources.add(fileStorageService.getStorage(storageId).asPath()); + } + var inputData = new LoaderInputData(inputSources); DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(heartbeat, inputData); @@ -134,67 +129,5 @@ public class LoaderMain extends ProcessMainClass { System.exit(0); } - private static class LoadRequest { - private final LoaderInputData inputData; - private final MqMessage message; - private final MqSingleShotInbox inbox; - - 
LoadRequest(LoaderInputData inputData, MqMessage message, MqSingleShotInbox inbox) { - this.inputData = inputData; - this.message = message; - this.inbox = inbox; - } - - public void ok() { - inbox.sendResponse(message, MqInboxResponse.ok()); - } - public void err() { - inbox.sendResponse(message, MqInboxResponse.err()); - } - } - - private LoadRequest fetchInstructions() throws Exception { - - var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, node, UUID.randomUUID()); - - var msgOpt = getMessage(inbox, nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName()); - if (msgOpt.isEmpty()) - throw new RuntimeException("No instruction received in inbox"); - var msg = msgOpt.get(); - - if (!nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName().equals(msg.function())) { - throw new RuntimeException("Unexpected message in inbox: " + msg); - } - - try { - var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.loading.LoadRequest.class); - - List inputSources = new ArrayList<>(); - for (var storageId : request.inputProcessDataStorageIds) { - inputSources.add(fileStorageService.getStorage(storageId).asPath()); - } - - return new LoadRequest(new LoaderInputData(inputSources), msg, inbox); - } - catch (Exception ex) { - inbox.sendResponse(msg, new MqInboxResponse("FAILED", MqMessageState.ERR)); - throw ex; - } - } - - private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { - var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); - if (opt.isPresent()) { - if (!opt.get().function().equals(expectedFunction)) { - throw new RuntimeException("Unexpected function: " + opt.get().function()); - } - return opt; - } - else { - var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); - stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); - return stolenMessage; - } - } } diff --git 
a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java index ae3229ad..383fac01 100644 --- a/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java +++ b/code/processes/loading-process/java/nu/marginalia/loading/domains/DomainLoaderService.java @@ -3,11 +3,11 @@ package nu.marginalia.loading.domains; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.loading.LoaderInputData; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.processed.SlopDomainLinkRecord; import nu.marginalia.model.processed.SlopDomainRecord; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.slop.SlopTable; diff --git a/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java b/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java index ab8e3b37..e5582aed 100644 --- a/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java +++ b/code/processes/process-mq-api/java/nu/marginalia/mqapi/ProcessInboxNames.java @@ -6,5 +6,7 @@ public class ProcessInboxNames { public static final String CRAWLER_INBOX = "crawler"; public static final String LIVE_CRAWLER_INBOX = "live-crawler"; + public static final String EXPORT_TASK_INBOX = "export-task"; + public static final String INDEX_CONSTRUCTOR_INBOX = "index_constructor"; } diff --git a/code/processes/process-mq-api/java/nu/marginalia/mqapi/tasks/ExportTaskRequest.java b/code/processes/process-mq-api/java/nu/marginalia/mqapi/tasks/ExportTaskRequest.java new file mode 100644 index 00000000..036f3a10 --- /dev/null +++ 
b/code/processes/process-mq-api/java/nu/marginalia/mqapi/tasks/ExportTaskRequest.java @@ -0,0 +1,57 @@ +package nu.marginalia.mqapi.tasks; + +import nu.marginalia.storage.model.FileStorageId; + +public class ExportTaskRequest { + public enum Task { + ATAGS, + FEEDS, + TERM_FREQ, + SAMPLE_DATA, + ADJACENCIES, + } + + public Task task; + public FileStorageId crawlId; + public FileStorageId destId; + public int size; + public String name; + + public ExportTaskRequest(Task task) { + this.task = task; + } + + public static ExportTaskRequest atags(FileStorageId crawlId, FileStorageId destId) { + ExportTaskRequest request = new ExportTaskRequest(Task.ATAGS); + request.crawlId = crawlId; + request.destId = destId; + return request; + } + + public static ExportTaskRequest feeds(FileStorageId crawlId, FileStorageId destId) { + ExportTaskRequest request = new ExportTaskRequest(Task.FEEDS); + request.crawlId = crawlId; + request.destId = destId; + return request; + } + + public static ExportTaskRequest termFreq(FileStorageId crawlId, FileStorageId destId) { + ExportTaskRequest request = new ExportTaskRequest(Task.TERM_FREQ); + request.crawlId = crawlId; + request.destId = destId; + return request; + } + + public static ExportTaskRequest sampleData(FileStorageId crawlId, FileStorageId destId, int size, String name) { + ExportTaskRequest request = new ExportTaskRequest(Task.SAMPLE_DATA); + request.crawlId = crawlId; + request.destId = destId; + request.size = size; + request.name = name; + return request; + } + + public static ExportTaskRequest adjacencies() { + return new ExportTaskRequest(Task.ADJACENCIES); + } +} diff --git a/code/processes/website-adjacencies-calculator/build.gradle b/code/processes/website-adjacencies-calculator/build.gradle deleted file mode 100644 index 37787b6a..00000000 --- a/code/processes/website-adjacencies-calculator/build.gradle +++ /dev/null @@ -1,49 +0,0 @@ -plugins { - id 'java' - - id 'application' - id 'jvm-test-suite' -} -java { - toolchain { 
- languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion)) - } -} - -application { - mainClass = 'nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator' - applicationName = 'website-adjacencies-calculator' -} - -tasks.distZip.enabled = false - -apply from: "$rootProject.projectDir/srcsets.gradle" - -dependencies { - implementation project(':code:common:model') - implementation project(':code:common:db') - implementation project(':code:common:process') - implementation project(':code:common:service') - implementation project(':code:functions:link-graph:api') - - implementation libs.bundles.slf4j - - implementation libs.guava - implementation dependencies.create(libs.guice.get()) { - exclude group: 'com.google.guava' - } - implementation libs.roaringbitmap - implementation libs.trove - implementation libs.fastutil - implementation libs.bundles.mariadb - - testImplementation libs.bundles.slf4j.test - testImplementation libs.bundles.junit - testImplementation libs.mockito - - testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') - testImplementation libs.commons.codec - testImplementation 'org.testcontainers:mariadb:1.17.4' - testImplementation 'org.testcontainers:junit-jupiter:1.17.4' - testImplementation project(':code:libraries:test-helpers') -} diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java b/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java deleted file mode 100644 index 1389cd63..00000000 --- a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java +++ /dev/null @@ -1,198 +0,0 @@ -package nu.marginalia.adjacencies; - -import com.google.inject.Guice; -import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.api.linkgraph.AggregateLinkGraphClient; -import 
nu.marginalia.db.DbDomainQueries; -import nu.marginalia.model.EdgeDomain; -import nu.marginalia.process.control.ProcessHeartbeat; -import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.service.ProcessMainClass; -import nu.marginalia.service.module.DatabaseModule; -import nu.marginalia.service.module.ServiceDiscoveryModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.IntStream; - -import static nu.marginalia.adjacencies.SparseBitVector.andCardinality; -import static nu.marginalia.adjacencies.SparseBitVector.weightedProduct; - -public class WebsiteAdjacenciesCalculator extends ProcessMainClass { - private final HikariDataSource dataSource; - public AdjacenciesData adjacenciesData; - public DomainAliases domainAliases; - private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class); - - float[] weights; - public WebsiteAdjacenciesCalculator(AggregateLinkGraphClient domainLinksClient, HikariDataSource dataSource) throws SQLException { - this.dataSource = dataSource; - - domainAliases = new DomainAliases(dataSource); - adjacenciesData = new AdjacenciesData(domainLinksClient, domainAliases); - weights = adjacenciesData.getWeights(); - } - - public void tryDomains(String... 
domainName) { - var dataStoreDao = new DbDomainQueries(dataSource); - - System.out.println(Arrays.toString(domainName)); - - int[] domainIds = Arrays.stream(domainName).map(EdgeDomain::new) - .mapToInt(dataStoreDao::getDomainId) - .map(domainAliases::deAlias) - .toArray(); - - for (int domainId : domainIds) { - findAdjacentDtoS(domainId, similarities -> { - for (var similarity : similarities.similarities()) { - System.out.println(dataStoreDao.getDomain(similarity.domainId).map(Object::toString).orElse("") + " " + prettyPercent(similarity.value)); - } - }); - } - } - - private String prettyPercent(double val) { - return String.format("%2.2f%%", 100. * val); - } - - public void loadAll(ProcessHeartbeat processHeartbeat) throws InterruptedException { - AdjacenciesLoader loader = new AdjacenciesLoader(dataSource); - var executor = Executors.newFixedThreadPool(16); - - int total = adjacenciesData.getIdsList().size(); - AtomicInteger progress = new AtomicInteger(0); - IntStream.of(adjacenciesData.getIdsList().toArray()).parallel() - .filter(domainAliases::isNotAliased) - .forEach(id -> { - findAdjacent(id, loader::load); - processHeartbeat.setProgress(progress.incrementAndGet() / (double) total); - }); - - executor.shutdown(); - System.out.println("Waiting for wrap-up"); - loader.stop(); - } - - public void findAdjacent(int domainId, Consumer andThen) { - findAdjacentDtoS(domainId, andThen); - } - - double cosineSimilarity(SparseBitVector a, SparseBitVector b) { - double andCardinality = andCardinality(a, b); - andCardinality /= Math.sqrt(a.getCardinality()); - andCardinality /= Math.sqrt(b.getCardinality()); - return andCardinality; - } - - double expensiveCosineSimilarity(SparseBitVector a, SparseBitVector b) { - return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights)); - } - - public record DomainSimilarities(int domainId, List similarities) {} - - public record DomainSimilarity(int domainId, double value) {} - - private void 
findAdjacentDtoS(int domainId, Consumer andThen) { - var vector = adjacenciesData.getVector(domainId); - - if (vector == null || !vector.cardinalityExceeds(10)) { - return; - } - - List similarities = new ArrayList<>(1000); - - var items = adjacenciesData.getCandidates(vector); - - - int cardMin = Math.max(2, (int) (0.01 * vector.getCardinality())); - - items.forEach(id -> { - var otherVec = adjacenciesData.getVector(id); - - if (null == otherVec || otherVec == vector) - return true; - - if (otherVec.getCardinality() < cardMin) - return true; - - double similarity = cosineSimilarity(vector, otherVec); - if (similarity > 0.1) { - var recalculated = expensiveCosineSimilarity(vector, otherVec); - if (recalculated > 0.1) { - similarities.add(new DomainSimilarity(id, recalculated)); - } - } - - return true; - }); - - if (similarities.size() > 128) { - similarities.sort(Comparator.comparing(DomainSimilarity::value)); - similarities.subList(0, similarities.size() - 128).clear(); - } - - - andThen.accept(new DomainSimilarities(domainId, similarities)); - } - - - public static void main(String[] args) throws SQLException, InterruptedException { - var injector = Guice.createInjector( - new DatabaseModule(false), - new ServiceDiscoveryModule()); - - - var dataSource = injector.getInstance(HikariDataSource.class); - var lc = injector.getInstance(AggregateLinkGraphClient.class); - - if (!lc.waitReady(Duration.ofSeconds(30))) { - throw new IllegalStateException("Failed to connect to domain-links"); - } - - var main = new WebsiteAdjacenciesCalculator(lc, dataSource); - - if (args.length == 1 && "load".equals(args[0])) { - var processHeartbeat = new ProcessHeartbeatImpl( - new ProcessConfiguration("website-adjacencies-calculator", 0, UUID.randomUUID()), - dataSource - ); - - try { - processHeartbeat.start(); - main.loadAll(processHeartbeat); - } - catch (Exception ex) { - logger.error("Failed to load", ex); - } - finally { - processHeartbeat.shutDown(); - } - return; - } - - for 
(;;) { - String domains = System.console().readLine("> "); - - if (domains.isBlank()) - break; - - var parts = domains.split("\\s+,\\s+"); - try { - main.tryDomains(parts); - } - catch (Exception ex) { - ex.printStackTrace(); - } - } - - } - -} diff --git a/code/processes/website-adjacencies-calculator/readme.md b/code/processes/website-adjacencies-calculator/readme.md deleted file mode 100644 index 6a69a10a..00000000 --- a/code/processes/website-adjacencies-calculator/readme.md +++ /dev/null @@ -1,8 +0,0 @@ -# Website Adjacencies Calculator - -This job updates the website similarity table based on the data in the domain and links-tables in the URL database. - -It performs a brute force cosine similarity calculation across the entire link graph. - -These adjacencies power the [explorer service](../../services-application/explorer-service) and -[random websites](../../features-search/random-websites)-functionality. \ No newline at end of file diff --git a/code/services-application/explorer-service/readme.md b/code/services-application/explorer-service/readme.md index 567886d6..4f3204b1 100644 --- a/code/services-application/explorer-service/readme.md +++ b/code/services-application/explorer-service/readme.md @@ -8,4 +8,3 @@ Externally the service is available at [https://explore2.marginalia.nu/](https:/ * [features-search/screenshots](../../features-search/screenshots) * [features-search/random-websites](../../features-search/random-websites) -* [processes/website-adjacencies-calculator](../../processes/website-adjacencies-calculator) diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index 2e7934bc..dc18f2e1 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -25,7 +25,6 @@ dependencies { // These look weird but they're needed to be able to spawn the processes // from the executor service - implementation 
project(':code:processes:website-adjacencies-calculator') implementation project(':code:processes:crawling-process') implementation project(':code:processes:loading-process') implementation project(':code:processes:converting-process') @@ -33,7 +32,6 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:model') - implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:linkdb') @@ -48,7 +46,6 @@ dependencies { implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:model') implementation project(':code:processes:crawling-process:ft-link-parser') - implementation project(':code:execution:data-extractors') implementation project(':code:index:index-journal') implementation project(':code:index:api') implementation project(':code:processes:process-mq-api') diff --git a/code/services-core/index-service/build.gradle b/code/services-core/index-service/build.gradle index df3773f9..af3a0fdd 100644 --- a/code/services-core/index-service/build.gradle +++ b/code/services-core/index-service/build.gradle @@ -37,7 +37,6 @@ dependencies { implementation project(':code:index:api') testImplementation project(path: ':code:services-core:control-service') - testImplementation project(':code:common:process') implementation libs.bundles.slf4j diff --git a/code/tools/experiment-runner/build.gradle b/code/tools/experiment-runner/build.gradle index d011a973..0e26439f 100644 --- a/code/tools/experiment-runner/build.gradle +++ b/code/tools/experiment-runner/build.gradle @@ -27,7 +27,6 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:model') implementation project(':code:common:config') - implementation project(':code:common:process') implementation project(':code:common:service') implementation project(':code:libraries:language-processing') implementation 
project(':code:libraries:term-frequency-dict') diff --git a/code/tools/integration-test/build.gradle b/code/tools/integration-test/build.gradle index 81e3cde9..0e5d3cf0 100644 --- a/code/tools/integration-test/build.gradle +++ b/code/tools/integration-test/build.gradle @@ -34,7 +34,6 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:config') implementation project(':code:common:linkdb') - implementation project(':code:common:process') implementation project(':code:common:service') implementation project(':code:common:model') diff --git a/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java b/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java index 83f79fbf..2869a229 100644 --- a/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java +++ b/code/tools/integration-test/test/nu/marginalia/test/IntegrationTestModule.java @@ -8,7 +8,6 @@ import com.google.inject.name.Names; import gnu.trove.list.array.TIntArrayList; import nu.marginalia.IndexLocations; import nu.marginalia.LanguageModels; -import nu.marginalia.ProcessConfiguration; import nu.marginalia.WmsaHome; import nu.marginalia.db.DomainTypes; import nu.marginalia.index.domainrankings.DomainRankings; @@ -18,6 +17,7 @@ import nu.marginalia.index.searchset.SearchSetsService; import nu.marginalia.linkdb.docs.DocumentDbReader; import nu.marginalia.linkdb.docs.DocumentDbWriter; import nu.marginalia.linkgraph.io.DomainLinksWriter; +import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.control.FakeProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.service.ServiceId; diff --git a/settings.gradle b/settings.gradle index 6eda9ab0..1ed535be 100644 --- a/settings.gradle +++ b/settings.gradle @@ -59,7 +59,6 @@ include 'code:features-search:screenshots' include 'code:features-search:random-websites' include 
'code:processes:converting-process:ft-anchor-keywords' -include 'code:execution:data-extractors' include 'code:processes:crawling-process:ft-crawl-blocklist' include 'code:processes:crawling-process:ft-link-parser' @@ -74,7 +73,6 @@ include 'code:common:service' include 'code:common:config' include 'code:common:model' include 'code:common:renderer' -include 'code:common:process' include 'code:processes:converting-process' include 'code:processes:converting-process:model' @@ -86,7 +84,8 @@ include 'code:processes:crawling-process:model' include 'code:processes:loading-process' include 'code:processes:index-constructor-process' include 'code:processes:test-data' -include 'code:processes:website-adjacencies-calculator' + +include 'code:processes:export-task-process' include 'code:tools:experiment-runner' include 'code:tools:screenshot-capture-tool'