(refac) Move export tasks to a process and clean up process initialization for all ProcessMainClass descendents

Since some of the export tasks have been memory hungry, sometimes killing the executor-services, they've been moved to a separate process that can be given a larger Xmx.

While doing this, the ProcessMainClass was given utilities for the boilerplate surrounding receiving mq requests and responding to them, some effort was also put toward making the process boot process a bit more uniform.  It's still a bit heterogeneous between different processes, but a bit less so for now.
This commit is contained in:
Viktor Lofgren 2024-11-21 16:00:09 +01:00
parent 47dfbacb00
commit 51e46ad2b0
79 changed files with 802 additions and 912 deletions

View File

@ -2,6 +2,7 @@ package nu.marginalia.nodecfg;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.test.TestMigrationLoader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
@ -46,8 +47,8 @@ public class NodeConfigurationServiceTest {
@Test
public void test() throws SQLException {
var a = nodeConfigurationService.create(1, "Test", false, false);
var b = nodeConfigurationService.create(2, "Foo", true, false);
var a = nodeConfigurationService.create(1, "Test", false, false, NodeProfile.MIXED);
var b = nodeConfigurationService.create(2, "Foo", true, false, NodeProfile.MIXED);
assertEquals(1, a.node());
assertEquals("Test", a.description());

View File

@ -1,34 +0,0 @@
plugins {
id 'java'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation libs.notnull
implementation libs.bundles.slf4j
testImplementation libs.bundles.slf4j.test
implementation libs.guava
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.bundles.mariadb
implementation libs.commons.lang3
implementation libs.snakeyaml
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

View File

@ -1,4 +0,0 @@
# Process
Basic functionality for a Process. Processes must include this dependency to ensure
their loggers are configured properly!

View File

@ -1,9 +0,0 @@
log4j2.isThreadContextMapInheritable=true
status = info
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %c{1}- %msg{nolookups}%n
appender.console.filter.http.type = MarkerFilter
rootLogger.level = info
rootLogger.appenderRef.console.ref = LogToConsole

View File

@ -1,4 +1,4 @@
package nu.marginalia;
package nu.marginalia.process;
import java.util.UUID;

View File

@ -1,4 +1,4 @@
package nu.marginalia;
package nu.marginalia.process;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;

View File

@ -0,0 +1,102 @@
package nu.marginalia.process;
import com.google.gson.Gson;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.service.ConfigLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public abstract class ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class);
private final MessageQueueFactory messageQueueFactory;
private final int node;
private final String inboxName;
static {
// Load global config ASAP
ConfigLoader.loadConfig(
ConfigLoader.getConfigPath("system")
);
}
private final Gson gson;
public ProcessMainClass(MessageQueueFactory messageQueueFactory,
ProcessConfiguration config,
Gson gson,
String inboxName
) {
this.gson = gson;
new org.mariadb.jdbc.Driver();
this.messageQueueFactory = messageQueueFactory;
this.node = config.node();
this.inboxName = inboxName;
}
protected <T> Instructions<T> fetchInstructions(Class<T> requestType) throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(inboxName, node, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, requestType.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
// for live crawl, request is empty for now
T request = gson.fromJson(msg.payload(), requestType);
return new Instructions<>(msg, inbox, request);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws InterruptedException, SQLException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
protected static class Instructions<T> {
private final MqMessage message;
private final MqSingleShotInbox inbox;
private final T value;
Instructions(MqMessage message, MqSingleShotInbox inbox, T value)
{
this.message = message;
this.inbox = inbox;
this.value = value;
}
public T value() {
return value;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
}

View File

@ -2,7 +2,7 @@ package nu.marginalia.process.control;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -4,17 +4,18 @@ package nu.marginalia.process.control;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
/** This service sends a heartbeat to the database every 5 seconds.
*/
@Singleton
public class ProcessHeartbeatImpl implements ProcessHeartbeat {
public class ProcessHeartbeatImpl implements ProcessHeartbeat, Closeable {
private final Logger logger = LoggerFactory.getLogger(ProcessHeartbeatImpl.class);
private final String processName;
private final String processBase;
@ -169,5 +170,9 @@ public class ProcessHeartbeatImpl implements ProcessHeartbeat {
}
}
}
public void close() {
shutDown();
}
}

View File

@ -2,7 +2,7 @@ package nu.marginalia.process.control;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -9,11 +9,11 @@ import java.util.Properties;
public class ConfigLoader {
static Path getConfigPath(String configName) {
public static Path getConfigPath(String configName) {
return WmsaHome.getHomePath().resolve("conf/properties/" + configName + ".properties");
}
static void loadConfig(Path configPath) {
public static void loadConfig(Path configPath) {
if (!Files.exists(configPath)) {
System.err.println("No config file found at " + configPath);
return;

View File

@ -1,20 +0,0 @@
package nu.marginalia.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class);
static {
// Load global config ASAP
ConfigLoader.loadConfig(
ConfigLoader.getConfigPath("system")
);
}
public ProcessMainClass() {
new org.mariadb.jdbc.Driver();
}
}

View File

@ -15,7 +15,7 @@ dependencies {
// These look weird but they're needed to be able to spawn the processes
// from the executor service
implementation project(':code:processes:website-adjacencies-calculator')
implementation project(':code:processes:export-task-process')
implementation project(':code:processes:crawling-process')
implementation project(':code:processes:live-crawling-process')
implementation project(':code:processes:loading-process')
@ -24,7 +24,6 @@ dependencies {
implementation project(':code:common:config')
implementation project(':code:common:model')
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
@ -43,7 +42,6 @@ dependencies {
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:ft-link-parser')
implementation project(':code:execution:data-extractors')
implementation project(':code:index:index-journal')
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')

View File

@ -1,7 +0,0 @@
Contains converter-*like* extraction jobs that operate on crawled data to produce export files.
## Important classes
* [AtagExporter](java/nu/marginalia/extractor/AtagExporter.java) - extracts anchor texts from the crawled data.
* [FeedExporter](java/nu/marginalia/extractor/FeedExporter.java) - tries to find RSS/Atom feeds within the crawled data.
* [TermFrequencyExporter](java/nu/marginalia/extractor/TermFrequencyExporter.java) - exports the 'TF' part of TF-IDF.

View File

@ -10,6 +10,7 @@ public enum ExecutorActor {
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CONVERTER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_SEGMENTATION_MODEL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),

View File

@ -59,6 +59,7 @@ public class ExecutorActorControlService {
ExportSampleDataActor exportSampleDataActor,
ExportTermFreqActor exportTermFrequenciesActor,
ExportSegmentationModelActor exportSegmentationModelActor,
ExportTaskMonitorActor exportTasksMonitorActor,
DownloadSampleActor downloadSampleActor,
ScrapeFeedsActor scrapeFeedsActor,
ExecutorActorStateMachines stateMachines,
@ -83,6 +84,7 @@ public class ExecutorActorControlService {
register(ExecutorActor.PROC_LOADER_SPAWNER, loaderMonitor);
register(ExecutorActor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor);
register(ExecutorActor.PROC_LIVE_CRAWL_SPAWNER, liveCrawlerMonitorActor);
register(ExecutorActor.PROC_EXPORT_TASKS_SPAWNER, exportTasksMonitorActor);
register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM);
register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor);

View File

@ -0,0 +1,29 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
@Singleton
public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public ExportTaskMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) {
super(gson,
configuration,
persistence,
processService,
ProcessInboxNames.EXPORT_TASK_INBOX,
ProcessService.ProcessId.EXPORT_TASKS);
}
}

View File

@ -3,50 +3,68 @@ package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.extractor.AtagExporter;
import nu.marginalia.extractor.ExporterIf;
import nu.marginalia.storage.model.*;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
import nu.marginalia.storage.model.FileStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
@Singleton
public class ExportAtagsActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ExporterIf atagExporter;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
public record Export(FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Export(FileStorageId crawlId) -> {
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atag-export", "Anchor Tags " + LocalDateTime.now());
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atags-export", "Atags " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id());
}
case Run(FileStorageId crawlId, FileStorageId destId) -> {
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
atagExporter.export(crawlId, destId);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
yield new Error("Failed to export data");
}
yield new End();
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId));
yield new Run(crawlId, destId, newMsgId);
}
case Run(_, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
yield new End();
}
}
default -> new Error();
};
}
@Override
public String describe() {
return "Export anchor tags from crawl data";
@ -55,11 +73,13 @@ public class ExportAtagsActor extends RecordActorPrototype {
@Inject
public ExportAtagsActor(Gson gson,
FileStorageService storageService,
AtagExporter atagExporter)
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher)
{
super(gson);
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
this.storageService = storageService;
this.atagExporter = atagExporter;
this.processWatcher = processWatcher;
}
}

View File

@ -5,8 +5,11 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.extractor.ExporterIf;
import nu.marginalia.extractor.FeedExporter;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
@ -19,11 +22,17 @@ import java.time.LocalDateTime;
@Singleton
public class ExportFeedsActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ExporterIf feedExporter;
public record Export(FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
@ -33,20 +42,25 @@ public class ExportFeedsActor extends RecordActorPrototype {
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id());
}
case Run(FileStorageId crawlId, FileStorageId destId) -> {
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
feedExporter.export(crawlId, destId);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
yield new Error("Failed to export data");
}
yield new End();
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.feeds(crawlId, destId));
yield new Run(crawlId, destId, newMsgId);
}
case Run(_, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
yield new End();
}
}
default -> new Error();
};
}
@ -60,11 +74,13 @@ public class ExportFeedsActor extends RecordActorPrototype {
@Inject
public ExportFeedsActor(Gson gson,
FileStorageService storageService,
FeedExporter feedExporter)
ActorProcessWatcher processWatcher,
ProcessOutboxes outboxes)
{
super(gson);
this.storageService = storageService;
this.feedExporter = feedExporter;
this.processWatcher = processWatcher;
this.exportTasksOutbox = outboxes.getExportTasksOutbox();
}
}

View File

@ -5,7 +5,11 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.extractor.SampleDataExporter;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
@ -18,11 +22,17 @@ import java.time.LocalDateTime;
@Singleton
public class ExportSampleDataActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final SampleDataExporter dataExporter;
public record Export(FileStorageId crawlId, int size, String name) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId, int size, String name) {
this(crawlId, destId, size, name, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
@ -35,28 +45,29 @@ public class ExportSampleDataActor extends RecordActorPrototype {
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id(), size, name);
}
case Run(FileStorageId crawlId, FileStorageId destId, int size, String name) -> {
case Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
dataExporter.export(crawlId, destId, size, name);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
logger.error("Failed to export data", ex);
yield new Error("Failed to export data");
}
yield new End();
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, size, name));
yield new Run(crawlId, destId, size, name, newMsgId);
}
case Run(_, FileStorageId destId, _, _, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
yield new End();
}
}
default -> new Error();
};
}
@Override
public String describe() {
return "Export RSS/Atom feeds from crawl data";
@ -65,11 +76,13 @@ public class ExportSampleDataActor extends RecordActorPrototype {
@Inject
public ExportSampleDataActor(Gson gson,
FileStorageService storageService,
SampleDataExporter dataExporter)
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher)
{
super(gson);
this.storageService = storageService;
this.dataExporter = dataExporter;
this.processWatcher = processWatcher;
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
}
}

View File

@ -5,45 +5,62 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.extractor.ExporterIf;
import nu.marginalia.extractor.TermFrequencyExporter;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
import nu.marginalia.storage.model.FileStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
@Singleton
public class ExportTermFreqActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ExporterIf exporter;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
public record Export(FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Export(FileStorageId crawlId) -> {
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq-export", "Term Frequencies " + LocalDateTime.now());
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq", "Term Frequencies " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id());
}
case Run(FileStorageId crawlId, FileStorageId destId) -> {
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
exporter.export(crawlId, destId);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
yield new Error("Failed to export data");
}
yield new End();
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.termFreq(crawlId, destId));
yield new Run(crawlId, destId, newMsgId);
}
case Run(_, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
yield new End();
}
}
default -> new Error();
};
}
@ -57,11 +74,13 @@ public class ExportTermFreqActor extends RecordActorPrototype {
@Inject
public ExportTermFreqActor(Gson gson,
FileStorageService storageService,
TermFrequencyExporter exporter)
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher)
{
super(gson);
this.storageService = storageService;
this.exporter = exporter;
this.processWatcher = processWatcher;
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
}
}

View File

@ -5,53 +5,59 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class TriggerAdjacencyCalculationActor extends RecordActorPrototype {
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ProcessService processService;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public record Run() implements ActorStep {}
public record Run(long msgId) implements ActorStep {
public Run() {
this(-1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Run() -> {
AtomicBoolean hasError = new AtomicBoolean(false);
var future = executor.submit(() -> {
try {
processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load");
}
catch (Exception ex) {
logger.warn("Error triggering adjacency calculation", ex);
hasError.set(true);
}
});
future.get();
if (hasError.get()) {
yield new Error("Error triggering adjacency calculation");
}
yield new End();
return switch(self) {
case Run(long msgId) when msgId < 0 -> {
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.adjacencies());
yield new Run(newMsgId);
}
case Run(long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
yield new Error("Exporter failed");
}
else {
yield new End();
}
}
default -> new Error();
};
}
@Inject
public TriggerAdjacencyCalculationActor(Gson gson,
ProcessService processService) {
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher) {
super(gson);
this.processService = processService;
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
this.processWatcher = processWatcher;
}
@Override

View File

@ -14,6 +14,7 @@ public class ProcessOutboxes {
private final MqOutbox crawlerOutbox;
private final MqOutbox indexConstructorOutbox;
private final MqOutbox liveCrawlerOutbox;
private final MqOutbox exportTasksOutbox;
@Inject
public ProcessOutboxes(BaseServiceParams params, MqPersistence persistence) {
@ -53,6 +54,14 @@ public class ProcessOutboxes {
params.configuration.node(),
params.configuration.instanceUuid()
);
exportTasksOutbox = new MqOutbox(persistence,
ProcessInboxNames.EXPORT_TASK_INBOX,
params.configuration.node(),
params.configuration.serviceName(),
params.configuration.node(),
params.configuration.instanceUuid()
);
}
@ -71,4 +80,6 @@ public class ProcessOutboxes {
public MqOutbox getIndexConstructorOutbox() { return indexConstructorOutbox; }
public MqOutbox getLiveCrawlerOutbox() { return liveCrawlerOutbox; }
public MqOutbox getExportTasksOutbox() { return exportTasksOutbox; }
}

View File

@ -3,15 +3,14 @@ package nu.marginalia.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.WmsaHome;
import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator;
import nu.marginalia.converting.ConverterMain;
import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.index.IndexConstructorMain;
import nu.marginalia.livecrawler.LiveCrawlerMain;
import nu.marginalia.loading.LoaderMain;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import nu.marginalia.task.ExportTasksMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@ -38,13 +37,13 @@ public class ProcessService {
private final int node;
public static ProcessService.ProcessId translateExternalIdBase(String id) {
public static ProcessId translateExternalIdBase(String id) {
return switch (id) {
case "converter" -> ProcessService.ProcessId.CONVERTER;
case "crawler" -> ProcessService.ProcessId.CRAWLER;
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR;
case "converter" -> ProcessId.CONVERTER;
case "crawler" -> ProcessId.CRAWLER;
case "loader" -> ProcessId.LOADER;
case "export-tasks" -> ProcessId.EXPORT_TASKS;
case "index-constructor" -> ProcessId.INDEX_CONSTRUCTOR;
default -> null;
};
}
@ -55,7 +54,7 @@ public class ProcessService {
CONVERTER(ConverterMain.class),
LOADER(LoaderMain.class),
INDEX_CONSTRUCTOR(IndexConstructorMain.class),
ADJACENCIES_CALCULATOR(WebsiteAdjacenciesCalculator.class)
EXPORT_TASKS(ExportTasksMain.class),
;
public final String mainClass;
@ -70,7 +69,7 @@ public class ProcessService {
case CONVERTER -> "CONVERTER_PROCESS_OPTS";
case LOADER -> "LOADER_PROCESS_OPTS";
case INDEX_CONSTRUCTOR -> "INDEX_CONSTRUCTION_PROCESS_OPTS";
case ADJACENCIES_CALCULATOR -> "ADJACENCIES_CALCULATOR_PROCESS_OPTS";
case EXPORT_TASKS -> "EXPORT_TASKS_PROCESS_OPTS";
};
String value = System.getenv(variable);

View File

@ -67,7 +67,6 @@ dependencies {
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation libs.commons.lang3
testImplementation project(':code:common:process')
testImplementation project(':code:libraries:array')
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')

View File

@ -20,7 +20,7 @@ dependencies {
implementation project(':code:index:query')
implementation project(':code:index:index-journal')
implementation project(':code:common:model')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:processes:converting-process:model')
implementation libs.bundles.slf4j

View File

@ -21,8 +21,8 @@ dependencies {
implementation project(':code:index:query')
implementation project(':code:index:index-journal')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation project(':code:processes:converting-process:model')
implementation project(':code:common:process')
implementation project(':third-party:parquet-floor')
implementation project(':third-party:commons-codec')

View File

@ -21,7 +21,6 @@ tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:process')
implementation project(':third-party:porterstemmer')
implementation project(':third-party:count-min-sketch')

View File

@ -14,9 +14,9 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:process')
implementation project(':code:processes:converting-process:ft-keyword-extraction')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict')

View File

@ -2,10 +2,10 @@ package nu.marginalia.atags.source;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -4,8 +4,6 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.converting.model.CrawlPlan;
import nu.marginalia.converting.model.WorkDir;
import nu.marginalia.converting.processor.DomainProcessor;
@ -17,14 +15,14 @@ import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.util.SimpleBlockingThreadPool;
@ -34,13 +32,13 @@ import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@ -50,12 +48,9 @@ import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
public class ConverterMain extends ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
private final DomainProcessor processor;
private final Gson gson;
private final ProcessHeartbeat heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final SideloadSourceFactory sideloadSourceFactory;
private final int node;
public static void main(String... args) throws Exception {
@ -70,8 +65,9 @@ public class ConverterMain extends ProcessMainClass {
logger.info("Starting pipe");
converter
.fetchInstructions()
Instructions<ConvertRequest> instructions = converter.fetchInstructions(ConvertRequest.class);
converter.createAction(instructions)
.execute(converter);
logger.info("Finished");
@ -83,6 +79,65 @@ public class ConverterMain extends ProcessMainClass {
System.exit(0);
}
private Action createAction(Instructions<ConvertRequest> instructions) throws SQLException, IOException {
var request = instructions.value();
final Path inputPath = request.getInputPath();
return switch (request.action) {
case ConvertCrawlData -> {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);
var plan = new CrawlPlan(null,
new WorkDir(crawlData.asPath().toString(), "crawler.log"),
new WorkDir(processData.asPath().toString(), "processor.log")
);
yield new ConvertCrawlDataAction(plan, instructions);
}
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl),
processData.asPath(),
instructions);
}
case SideloadDirtree -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadDirtree(inputPath),
processData.asPath(),
instructions);
}
case SideloadWarc -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadWarc(inputPath),
processData.asPath(),
instructions);
}
case SideloadReddit -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadReddit(inputPath),
processData.asPath(),
instructions);
}
case SideloadStackexchange -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadStackexchange(inputPath),
processData.asPath(),
instructions);
}
};
}
@Inject
public ConverterMain(
DomainProcessor processor,
@ -94,13 +149,12 @@ public class ConverterMain extends ProcessMainClass {
ProcessConfiguration processConfiguration
)
{
super(messageQueueFactory, processConfiguration, gson, CONVERTER_INBOX);
this.processor = processor;
this.gson = gson;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.sideloadSourceFactory = sideloadSourceFactory;
this.node = processConfiguration.node();
heartbeat.start();
}
@ -220,45 +274,44 @@ public class ConverterMain extends ProcessMainClass {
}
}
private abstract static class ConvertRequest {
private final MqMessage message;
private final MqSingleShotInbox inbox;
private abstract static class Action {
final Instructions<?> instructions;
private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) {
this.message = message;
this.inbox = inbox;
public Action(Instructions<?> instructions) {
this.instructions = instructions;
}
public abstract void execute(ConverterMain converterMain) throws Exception;
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
instructions.ok();
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
instructions.err();
}
}
private static class SideloadAction extends ConvertRequest {
private static class SideloadAction extends Action {
private final Collection<? extends SideloadSource> sideloadSources;
private final Path workDir;
SideloadAction(SideloadSource sideloadSource,
Path workDir,
MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
Instructions<?> instructions) {
super(instructions);
this.sideloadSources = List.of(sideloadSource);
this.workDir = workDir;
}
SideloadAction(Collection<? extends SideloadSource> sideloadSources,
Path workDir,
MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
Instructions<?> instructions) {
super(instructions);
this.sideloadSources = sideloadSources;
this.workDir = workDir;
}
@Override
public void execute(ConverterMain converterMain) throws Exception {
try {
@ -272,11 +325,12 @@ public class ConverterMain extends ProcessMainClass {
}
}
private static class ConvertCrawlDataAction extends ConvertRequest {
private static class ConvertCrawlDataAction extends Action {
private final CrawlPlan plan;
private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
private ConvertCrawlDataAction(CrawlPlan plan,
Instructions<?> instructions) {
super(instructions);
this.plan = plan;
}
@ -294,94 +348,4 @@ public class ConverterMain extends ProcessMainClass {
}
}
private ConvertRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, node, UUID.randomUUID());
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
try {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);
// will be null on ConvertCrawlData
final Path inputPath = request.getInputPath();
return switch (request.action) {
case ConvertCrawlData -> {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);
var plan = new CrawlPlan(null,
new WorkDir(crawlData.asPath().toString(), "crawler.log"),
new WorkDir(processData.asPath().toString(), "processor.log")
);
yield new ConvertCrawlDataAction(plan, msg, inbox);
}
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl),
processData.asPath(),
msg, inbox);
}
case SideloadDirtree -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadDirtree(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadWarc -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadWarc(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadReddit -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadReddit(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadStackexchange -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadStackexchange(inputPath),
processData.asPath(),
msg, inbox);
}
};
}
catch (Exception ex) {
inbox.sendResponse(msg, MqInboxResponse.err(ex.getClass().getSimpleName() + ": " + ex.getMessage()));
throw ex;
}
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
}

View File

@ -3,9 +3,9 @@ package nu.marginalia.converting;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import nu.marginalia.LanguageModels;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import nu.marginalia.converting.processor.ConverterDomainTypes;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.service.module.ServiceConfiguration;
import org.mockito.Mockito;

View File

@ -2,10 +2,10 @@ package nu.marginalia.converting.sideload.reddit;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.ConverterDomainTypes;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.process.ProcessConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

View File

@ -21,7 +21,6 @@ tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:model')

View File

@ -5,8 +5,6 @@ import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
@ -25,15 +23,15 @@ import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
@ -46,8 +44,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.sql.SQLException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -59,14 +59,12 @@ public class CrawlerMain extends ProcessMainClass {
private final UserAgent userAgent;
private final ProcessHeartbeatImpl heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final DomainProber domainProber;
private final FileStorageService fileStorageService;
private final AnchorTagsSourceFactory anchorTagsSourceFactory;
private final WarcArchiverFactory warcArchiverFactory;
private final HikariDataSource dataSource;
private final DomainBlacklist blacklist;
private final Gson gson;
private final int node;
private final SimpleBlockingThreadPool pool;
@ -96,16 +94,17 @@ public class CrawlerMain extends ProcessMainClass {
HikariDataSource dataSource,
DomainBlacklist blacklist,
Gson gson) throws InterruptedException {
super(messageQueueFactory, processConfiguration, gson, CRAWLER_INBOX);
this.userAgent = userAgent;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.domainProber = domainProber;
this.fileStorageService = fileStorageService;
this.anchorTagsSourceFactory = anchorTagsSourceFactory;
this.warcArchiverFactory = warcArchiverFactory;
this.dataSource = dataSource;
this.blacklist = blacklist;
this.gson = gson;
this.node = processConfiguration.node();
pool = new SimpleBlockingThreadPool("CrawlerPool",
@ -144,13 +143,14 @@ public class CrawlerMain extends ProcessMainClass {
);
var crawler = injector.getInstance(CrawlerMain.class);
var instructions = crawler.fetchInstructions();
var instructions = crawler.fetchInstructions(nu.marginalia.mqapi.crawling.CrawlRequest.class);
try {
if (instructions.targetDomainName != null) {
crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir);
var req = instructions.value();
if (req.targetDomainName != null) {
crawler.runForSingleDomain(req.targetDomainName, req.crawlStorage);
}
else {
crawler.runForDatabaseDomains(instructions.outputDir);
crawler.runForDatabaseDomains(req.crawlStorage);
}
instructions.ok();
} catch (Exception ex) {
@ -166,6 +166,10 @@ public class CrawlerMain extends ProcessMainClass {
System.exit(0);
}
public void runForDatabaseDomains(FileStorageId fileStorageId) throws Exception {
runForDatabaseDomains(fileStorageService.getStorage(fileStorageId).asPath());
}
public void runForDatabaseDomains(Path outputDir) throws Exception {
heartbeat.start();
@ -285,6 +289,11 @@ public class CrawlerMain extends ProcessMainClass {
}
}
public void runForSingleDomain(String targetDomainName, FileStorageId fileStorageId) throws Exception {
runForSingleDomain(targetDomainName, fileStorageService.getStorage(fileStorageId).asPath());
}
public void runForSingleDomain(String targetDomainName, Path outputDir) throws Exception {
heartbeat.start();
@ -410,70 +419,6 @@ public class CrawlerMain extends ProcessMainClass {
}
private static class CrawlRequest {
private final Path outputDir;
private final MqMessage message;
private final MqSingleShotInbox inbox;
private final String targetDomainName;
CrawlRequest(String targetDomainName,
Path outputDir,
MqMessage message,
MqSingleShotInbox inbox)
{
this.message = message;
this.inbox = inbox;
this.outputDir = outputDir;
this.targetDomainName = targetDomainName;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private CrawlRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);
var crawlStorage = fileStorageService.getStorage(request.crawlStorage);
return new CrawlRequest(
request.targetDomainName,
crawlStorage.asPath(),
msg,
inbox);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
public record CrawlSpecRecord(@NotNull String domain, int crawlDepth, @NotNull List<String> urls) {
public CrawlSpecRecord(String domain, int crawlDepth) {

View File

@ -1,8 +1,8 @@
package nu.marginalia.crawl.warc;
import com.google.inject.Inject;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.process.ProcessConfiguration;
import org.apache.commons.io.IOUtils;
import java.io.IOException;

View File

@ -20,7 +20,6 @@ dependencies {
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:config')
implementation project(':code:common:process')
implementation project(':code:index:api')
implementation project(':code:processes:crawling-process:ft-content-type')
implementation project(':code:libraries:language-processing')

View File

@ -1,24 +1,33 @@
plugins {
id 'java'
id "de.undercouch.download" version "5.1.0"
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
application {
mainClass = 'nu.marginalia.task.ExportTaskMain'
applicationName = 'export-task-process'
}
tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:config')
implementation project(':code:common:process')
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:service')
implementation project(':code:common:config')
implementation project(':code:libraries:message-queue')
implementation project(':code:functions:link-graph:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict')
implementation project(':code:libraries:blocking-thread-pool')
@ -28,20 +37,32 @@ dependencies {
implementation project(':code:processes:converting-process')
implementation project(':third-party:commons-codec')
implementation libs.bundles.slf4j
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.roaringbitmap
implementation libs.trove
implementation libs.fastutil
implementation libs.bundles.mariadb
implementation libs.gson
implementation libs.commons.lang3
implementation libs.commons.io
implementation libs.commons.compress
implementation libs.notnull
implementation libs.commons.codec
implementation libs.jsoup
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation libs.commons.codec
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
}

View File

@ -0,0 +1,127 @@
package nu.marginalia.adjacencies;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.api.linkgraph.AggregateLinkGraphClient;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import static nu.marginalia.adjacencies.SparseBitVector.andCardinality;
import static nu.marginalia.adjacencies.SparseBitVector.weightedProduct;
public class WebsiteAdjacenciesCalculator {
private final AggregateLinkGraphClient domainLinksClient;
private final ProcessConfiguration configuration;
private final HikariDataSource dataSource;
public AdjacenciesData adjacenciesData;
public DomainAliases domainAliases;
private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class);
float[] weights;
@Inject
public WebsiteAdjacenciesCalculator(AggregateLinkGraphClient domainLinksClient,
ProcessConfiguration configuration,
HikariDataSource dataSource) {
this.domainLinksClient = domainLinksClient;
this.configuration = configuration;
this.dataSource = dataSource;
}
public void export() throws Exception {
try (var processHeartbeat = new ProcessHeartbeatImpl(configuration, dataSource)) {
domainAliases = new DomainAliases(dataSource);
adjacenciesData = new AdjacenciesData(domainLinksClient, domainAliases);
weights = adjacenciesData.getWeights();
AdjacenciesLoader loader = new AdjacenciesLoader(dataSource);
var executor = Executors.newFixedThreadPool(16);
int total = adjacenciesData.getIdsList().size();
AtomicInteger progress = new AtomicInteger(0);
IntStream.of(adjacenciesData.getIdsList().toArray()).parallel()
.filter(domainAliases::isNotAliased)
.forEach(id -> {
findAdjacent(id, loader::load);
processHeartbeat.setProgress(progress.incrementAndGet() / (double) total);
});
executor.shutdown();
System.out.println("Waiting for wrap-up");
loader.stop();
}
}
public void findAdjacent(int domainId, Consumer<DomainSimilarities> andThen) {
findAdjacentDtoS(domainId, andThen);
}
double cosineSimilarity(SparseBitVector a, SparseBitVector b) {
double andCardinality = andCardinality(a, b);
andCardinality /= Math.sqrt(a.getCardinality());
andCardinality /= Math.sqrt(b.getCardinality());
return andCardinality;
}
double expensiveCosineSimilarity(SparseBitVector a, SparseBitVector b) {
return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights));
}
public record DomainSimilarities(int domainId, List<DomainSimilarity> similarities) {}
public record DomainSimilarity(int domainId, double value) {}
private void findAdjacentDtoS(int domainId, Consumer<DomainSimilarities> andThen) {
var vector = adjacenciesData.getVector(domainId);
if (vector == null || !vector.cardinalityExceeds(10)) {
return;
}
List<DomainSimilarity> similarities = new ArrayList<>(1000);
var items = adjacenciesData.getCandidates(vector);
int cardMin = Math.max(2, (int) (0.01 * vector.getCardinality()));
items.forEach(id -> {
var otherVec = adjacenciesData.getVector(id);
if (null == otherVec || otherVec == vector)
return true;
if (otherVec.getCardinality() < cardMin)
return true;
double similarity = cosineSimilarity(vector, otherVec);
if (similarity > 0.1) {
var recalculated = expensiveCosineSimilarity(vector, otherVec);
if (recalculated > 0.1) {
similarities.add(new DomainSimilarity(id, recalculated));
}
}
return true;
});
if (similarities.size() > 128) {
similarities.sort(Comparator.comparing(DomainSimilarity::value));
similarities.subList(0, similarities.size() - 128).clear();
}
andThen.accept(new DomainSimilarities(domainId, similarities));
}
}

View File

@ -0,0 +1,82 @@
package nu.marginalia.task;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator;
import nu.marginalia.extractor.AtagExporter;
import nu.marginalia.extractor.FeedExporter;
import nu.marginalia.extractor.SampleDataExporter;
import nu.marginalia.extractor.TermFrequencyExporter;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExportTasksMain extends ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ExportTasksMain.class);
private final AtagExporter atagExporter;
private final FeedExporter feedExporter;
private final SampleDataExporter sampleDataExporter;
private final TermFrequencyExporter termFrequencyExporter;
private final WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator;
public static void main(String[] args) throws Exception {
var injector = Guice.createInjector(
new ServiceDiscoveryModule(),
new ProcessConfigurationModule("export-tasks"),
new DatabaseModule(false)
);
var exportTasks = injector.getInstance(ExportTasksMain.class);
Instructions<ExportTaskRequest> instructions = exportTasks.fetchInstructions(ExportTaskRequest.class);
try {
exportTasks.run(instructions.value());
instructions.ok();
}
catch (Exception e) {
logger.error("Error running export task", e);
instructions.err();
}
}
@Inject
public ExportTasksMain(MessageQueueFactory messageQueueFactory,
ProcessConfiguration config,
AtagExporter atagExporter,
FeedExporter feedExporter,
SampleDataExporter sampleDataExporter,
TermFrequencyExporter termFrequencyExporter,
Gson gson, WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator)
{
super(messageQueueFactory, config, gson, ProcessInboxNames.EXPORT_TASK_INBOX);
this.atagExporter = atagExporter;
this.feedExporter = feedExporter;
this.sampleDataExporter = sampleDataExporter;
this.termFrequencyExporter = termFrequencyExporter;
this.websiteAdjacenciesCalculator = websiteAdjacenciesCalculator;
}
private void run(ExportTaskRequest request) throws Exception {
switch (request.task) {
case ATAGS: atagExporter.export(request.crawlId, request.destId); break;
case FEEDS: feedExporter.export(request.crawlId, request.destId); break;
case TERM_FREQ: termFrequencyExporter.export(request.crawlId, request.destId); break;
case SAMPLE_DATA: sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name); break;
case ADJACENCIES: websiteAdjacenciesCalculator.export(); break;
}
}
}

View File

@ -22,7 +22,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:processes:process-mq-api')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:common:db')
implementation project(':code:common:config')

View File

@ -1,11 +1,8 @@
package nu.marginalia.index;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import nu.marginalia.IndexLocations;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.index.construction.full.FullIndexConstructor;
import nu.marginalia.index.construction.prio.PrioIndexConstructor;
import nu.marginalia.index.domainrankings.DomainRankings;
@ -15,13 +12,11 @@ import nu.marginalia.index.journal.IndexJournal;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.index.CreateIndexRequest;
import nu.marginalia.mqapi.index.IndexName;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import org.slf4j.Logger;
@ -31,8 +26,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX;
@ -40,15 +33,12 @@ import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX;
public class IndexConstructorMain extends ProcessMainClass {
private final FileStorageService fileStorageService;
private final ProcessHeartbeatImpl heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final DomainRankings domainRankings;
private final int node;
private static final Logger logger = LoggerFactory.getLogger(IndexConstructorMain.class);
private final Gson gson = GsonFactory.get();
public static void main(String[] args) throws Exception {
CreateIndexInstructions instructions = null;
public static void main(String[] args) throws Exception {
Instructions<CreateIndexRequest> instructions = null;
try {
new org.mariadb.jdbc.Driver();
@ -58,9 +48,8 @@ public class IndexConstructorMain extends ProcessMainClass {
new DatabaseModule(false))
.getInstance(IndexConstructorMain.class);
instructions = main.fetchInstructions();
main.run(instructions);
instructions = main.fetchInstructions(CreateIndexRequest.class);
main.run(instructions.value());
instructions.ok();
}
catch (Exception ex) {
@ -85,17 +74,17 @@ public class IndexConstructorMain extends ProcessMainClass {
ProcessConfiguration processConfiguration,
DomainRankings domainRankings) {
super(messageQueueFactory, processConfiguration, GsonFactory.get(), INDEX_CONSTRUCTOR_INBOX);
this.fileStorageService = fileStorageService;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.domainRankings = domainRankings;
this.node = processConfiguration.node();
}
private void run(CreateIndexInstructions instructions) throws SQLException, IOException {
private void run(CreateIndexRequest instructions) throws SQLException, IOException {
heartbeat.start();
switch (instructions.name) {
switch (instructions.indexName()) {
case FORWARD -> createForwardIndex();
case REVERSE_FULL -> createFullReverseIndex();
case REVERSE_PRIO -> createPrioReverseIndex();
@ -171,52 +160,4 @@ public class IndexConstructorMain extends ProcessMainClass {
docId);
}
private static class CreateIndexInstructions {
public final IndexName name;
private final MqSingleShotInbox inbox;
private final MqMessage message;
private CreateIndexInstructions(IndexName name, MqSingleShotInbox inbox, MqMessage message) {
this.name = name;
this.inbox = inbox;
this.message = message;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private CreateIndexInstructions fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(INDEX_CONSTRUCTOR_INBOX, node, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, CreateIndexRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
var payload = gson.fromJson(msg.payload(), CreateIndexRequest.class);
var name = payload.indexName();
return new CreateIndexInstructions(name, inbox, msg);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
}

View File

@ -21,7 +21,6 @@ tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:model')

View File

@ -4,8 +4,6 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.WmsaHome;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.converting.ConverterModule;
@ -21,12 +19,11 @@ import nu.marginalia.loading.domains.DbDomainIdRegistry;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.storage.FileStorageService;
@ -38,11 +35,11 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Security;
import java.sql.SQLException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static nu.marginalia.mqapi.ProcessInboxNames.LIVE_CRAWLER_INBOX;
@ -52,16 +49,13 @@ public class LiveCrawlerMain extends ProcessMainClass {
LoggerFactory.getLogger(LiveCrawlerMain.class);
private final FeedsClient feedsClient;
private final Gson gson;
private final ProcessHeartbeat heartbeat;
private final DbDomainQueries domainQueries;
private final DomainBlacklist domainBlacklist;
private final MessageQueueFactory messageQueueFactory;
private final DomainProcessor domainProcessor;
private final FileStorageService fileStorageService;
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
private final int node;
@Inject
public LiveCrawlerMain(FeedsClient feedsClient,
@ -77,13 +71,12 @@ public class LiveCrawlerMain extends ProcessMainClass {
DocumentLoaderService documentLoaderService)
throws Exception
{
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
this.feedsClient = feedsClient;
this.gson = gson;
this.heartbeat = heartbeat;
this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist;
this.messageQueueFactory = messageQueueFactory;
this.node = config.node();
this.domainProcessor = domainProcessor;
this.fileStorageService = fileStorageService;
this.keywordLoaderService = keywordLoaderService;
@ -117,7 +110,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
);
var crawler = injector.getInstance(LiveCrawlerMain.class);
LiveCrawlInstructions instructions = crawler.fetchInstructions();
Instructions<LiveCrawlRequest> instructions = crawler.fetchInstructions(LiveCrawlRequest.class);
try{
crawler.run();
@ -225,57 +218,4 @@ public class LiveCrawlerMain extends ProcessMainClass {
}
}
private LiveCrawlInstructions fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(LIVE_CRAWLER_INBOX, node, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, LiveCrawlRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
// for live crawl, request is empty for now
LiveCrawlRequest request = gson.fromJson(msg.payload(), LiveCrawlRequest.class);
return new LiveCrawlInstructions(msg, inbox);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
private static class LiveCrawlInstructions {
private final MqMessage message;
private final MqSingleShotInbox inbox;
LiveCrawlInstructions(MqMessage message,
MqSingleShotInbox inbox)
{
this.message = message;
this.inbox = inbox;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
}

View File

@ -6,10 +6,10 @@ import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import nu.marginalia.IndexLocations;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;

View File

@ -20,7 +20,6 @@ tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:process')
implementation project(':code:processes:process-mq-api')
implementation project(':code:index:api')
implementation project(':code:common:model')

View File

@ -4,8 +4,6 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
@ -13,26 +11,21 @@ import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.loading.links.DomainLinksLoaderService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
@ -40,15 +33,12 @@ public class LoaderMain extends ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class);
private final ProcessHeartbeatImpl heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final DocumentDbWriter documentDbWriter;
private final DomainLoaderService domainService;
private final DomainLinksLoaderService linksService;
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
private final int node;
private final Gson gson;
public static void main(String... args) {
try {
@ -62,7 +52,7 @@ public class LoaderMain extends ProcessMainClass {
var instance = injector.getInstance(LoaderMain.class);
var instructions = instance.fetchInstructions();
var instructions = instance.fetchInstructions(LoadRequest.class);
logger.info("Instructions received");
instance.run(instructions);
}
@ -83,22 +73,27 @@ public class LoaderMain extends ProcessMainClass {
ProcessConfiguration processConfiguration,
Gson gson
) {
this.node = processConfiguration.node();
super(messageQueueFactory, processConfiguration, gson, LOADER_INBOX);
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.documentDbWriter = documentDbWriter;
this.domainService = domainService;
this.linksService = linksService;
this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService;
this.gson = gson;
heartbeat.start();
}
void run(LoadRequest instructions) throws Throwable {
LoaderInputData inputData = instructions.inputData;
void run(Instructions<LoadRequest> instructions) throws Throwable {
List<Path> inputSources = new ArrayList<>();
for (var storageId : instructions.value().inputProcessDataStorageIds) {
inputSources.add(fileStorageService.getStorage(storageId).asPath());
}
var inputData = new LoaderInputData(inputSources);
DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(heartbeat, inputData);
@ -134,67 +129,5 @@ public class LoaderMain extends ProcessMainClass {
System.exit(0);
}
private static class LoadRequest {
private final LoaderInputData inputData;
private final MqMessage message;
private final MqSingleShotInbox inbox;
LoadRequest(LoaderInputData inputData, MqMessage message, MqSingleShotInbox inbox) {
this.inputData = inputData;
this.message = message;
this.inbox = inbox;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private LoadRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, node, UUID.randomUUID());
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName());
if (msgOpt.isEmpty())
throw new RuntimeException("No instruction received in inbox");
var msg = msgOpt.get();
if (!nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName().equals(msg.function())) {
throw new RuntimeException("Unexpected message in inbox: " + msg);
}
try {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.loading.LoadRequest.class);
List<Path> inputSources = new ArrayList<>();
for (var storageId : request.inputProcessDataStorageIds) {
inputSources.add(fileStorageService.getStorage(storageId).asPath());
}
return new LoadRequest(new LoaderInputData(inputSources), msg, inbox);
}
catch (Exception ex) {
inbox.sendResponse(msg, new MqInboxResponse("FAILED", MqMessageState.ERR));
throw ex;
}
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
}

View File

@ -3,11 +3,11 @@ package nu.marginalia.loading.domains;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.processed.SlopDomainLinkRecord;
import nu.marginalia.model.processed.SlopDomainRecord;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.slop.SlopTable;

View File

@ -6,5 +6,7 @@ public class ProcessInboxNames {
public static final String CRAWLER_INBOX = "crawler";
public static final String LIVE_CRAWLER_INBOX = "live-crawler";
public static final String EXPORT_TASK_INBOX = "export-task";
public static final String INDEX_CONSTRUCTOR_INBOX = "index_constructor";
}

View File

@ -0,0 +1,57 @@
package nu.marginalia.mqapi.tasks;
import nu.marginalia.storage.model.FileStorageId;
public class ExportTaskRequest {
public enum Task {
ATAGS,
FEEDS,
TERM_FREQ,
SAMPLE_DATA,
ADJACENCIES,
}
public Task task;
public FileStorageId crawlId;
public FileStorageId destId;
public int size;
public String name;
public ExportTaskRequest(Task task) {
this.task = task;
}
public static ExportTaskRequest atags(FileStorageId crawlId, FileStorageId destId) {
ExportTaskRequest request = new ExportTaskRequest(Task.ATAGS);
request.crawlId = crawlId;
request.destId = destId;
return request;
}
public static ExportTaskRequest feeds(FileStorageId crawlId, FileStorageId destId) {
ExportTaskRequest request = new ExportTaskRequest(Task.FEEDS);
request.crawlId = crawlId;
request.destId = destId;
return request;
}
public static ExportTaskRequest termFreq(FileStorageId crawlId, FileStorageId destId) {
ExportTaskRequest request = new ExportTaskRequest(Task.TERM_FREQ);
request.crawlId = crawlId;
request.destId = destId;
return request;
}
public static ExportTaskRequest sampleData(FileStorageId crawlId, FileStorageId destId, int size, String name) {
ExportTaskRequest request = new ExportTaskRequest(Task.SAMPLE_DATA);
request.crawlId = crawlId;
request.destId = destId;
request.size = size;
request.name = name;
return request;
}
public static ExportTaskRequest adjacencies() {
return new ExportTaskRequest(Task.ADJACENCIES);
}
}

View File

@ -1,49 +0,0 @@
plugins {
id 'java'
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
application {
mainClass = 'nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator'
applicationName = 'website-adjacencies-calculator'
}
tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:functions:link-graph:api')
implementation libs.bundles.slf4j
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.roaringbitmap
implementation libs.trove
implementation libs.fastutil
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation libs.commons.codec
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
}

View File

@ -1,198 +0,0 @@
package nu.marginalia.adjacencies;
import com.google.inject.Guice;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.api.linkgraph.AggregateLinkGraphClient;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import static nu.marginalia.adjacencies.SparseBitVector.andCardinality;
import static nu.marginalia.adjacencies.SparseBitVector.weightedProduct;
public class WebsiteAdjacenciesCalculator extends ProcessMainClass {
private final HikariDataSource dataSource;
public AdjacenciesData adjacenciesData;
public DomainAliases domainAliases;
private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class);
float[] weights;
public WebsiteAdjacenciesCalculator(AggregateLinkGraphClient domainLinksClient, HikariDataSource dataSource) throws SQLException {
this.dataSource = dataSource;
domainAliases = new DomainAliases(dataSource);
adjacenciesData = new AdjacenciesData(domainLinksClient, domainAliases);
weights = adjacenciesData.getWeights();
}
public void tryDomains(String... domainName) {
var dataStoreDao = new DbDomainQueries(dataSource);
System.out.println(Arrays.toString(domainName));
int[] domainIds = Arrays.stream(domainName).map(EdgeDomain::new)
.mapToInt(dataStoreDao::getDomainId)
.map(domainAliases::deAlias)
.toArray();
for (int domainId : domainIds) {
findAdjacentDtoS(domainId, similarities -> {
for (var similarity : similarities.similarities()) {
System.out.println(dataStoreDao.getDomain(similarity.domainId).map(Object::toString).orElse("") + " " + prettyPercent(similarity.value));
}
});
}
}
private String prettyPercent(double val) {
return String.format("%2.2f%%", 100. * val);
}
public void loadAll(ProcessHeartbeat processHeartbeat) throws InterruptedException {
AdjacenciesLoader loader = new AdjacenciesLoader(dataSource);
var executor = Executors.newFixedThreadPool(16);
int total = adjacenciesData.getIdsList().size();
AtomicInteger progress = new AtomicInteger(0);
IntStream.of(adjacenciesData.getIdsList().toArray()).parallel()
.filter(domainAliases::isNotAliased)
.forEach(id -> {
findAdjacent(id, loader::load);
processHeartbeat.setProgress(progress.incrementAndGet() / (double) total);
});
executor.shutdown();
System.out.println("Waiting for wrap-up");
loader.stop();
}
public void findAdjacent(int domainId, Consumer<DomainSimilarities> andThen) {
findAdjacentDtoS(domainId, andThen);
}
double cosineSimilarity(SparseBitVector a, SparseBitVector b) {
double andCardinality = andCardinality(a, b);
andCardinality /= Math.sqrt(a.getCardinality());
andCardinality /= Math.sqrt(b.getCardinality());
return andCardinality;
}
double expensiveCosineSimilarity(SparseBitVector a, SparseBitVector b) {
return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights));
}
public record DomainSimilarities(int domainId, List<DomainSimilarity> similarities) {}
public record DomainSimilarity(int domainId, double value) {}
private void findAdjacentDtoS(int domainId, Consumer<DomainSimilarities> andThen) {
var vector = adjacenciesData.getVector(domainId);
if (vector == null || !vector.cardinalityExceeds(10)) {
return;
}
List<DomainSimilarity> similarities = new ArrayList<>(1000);
var items = adjacenciesData.getCandidates(vector);
int cardMin = Math.max(2, (int) (0.01 * vector.getCardinality()));
items.forEach(id -> {
var otherVec = adjacenciesData.getVector(id);
if (null == otherVec || otherVec == vector)
return true;
if (otherVec.getCardinality() < cardMin)
return true;
double similarity = cosineSimilarity(vector, otherVec);
if (similarity > 0.1) {
var recalculated = expensiveCosineSimilarity(vector, otherVec);
if (recalculated > 0.1) {
similarities.add(new DomainSimilarity(id, recalculated));
}
}
return true;
});
if (similarities.size() > 128) {
similarities.sort(Comparator.comparing(DomainSimilarity::value));
similarities.subList(0, similarities.size() - 128).clear();
}
andThen.accept(new DomainSimilarities(domainId, similarities));
}
public static void main(String[] args) throws SQLException, InterruptedException {
var injector = Guice.createInjector(
new DatabaseModule(false),
new ServiceDiscoveryModule());
var dataSource = injector.getInstance(HikariDataSource.class);
var lc = injector.getInstance(AggregateLinkGraphClient.class);
if (!lc.waitReady(Duration.ofSeconds(30))) {
throw new IllegalStateException("Failed to connect to domain-links");
}
var main = new WebsiteAdjacenciesCalculator(lc, dataSource);
if (args.length == 1 && "load".equals(args[0])) {
var processHeartbeat = new ProcessHeartbeatImpl(
new ProcessConfiguration("website-adjacencies-calculator", 0, UUID.randomUUID()),
dataSource
);
try {
processHeartbeat.start();
main.loadAll(processHeartbeat);
}
catch (Exception ex) {
logger.error("Failed to load", ex);
}
finally {
processHeartbeat.shutDown();
}
return;
}
for (;;) {
String domains = System.console().readLine("> ");
if (domains.isBlank())
break;
var parts = domains.split("\\s+,\\s+");
try {
main.tryDomains(parts);
}
catch (Exception ex) {
ex.printStackTrace();
}
}
}
}

View File

@ -1,8 +0,0 @@
# Website Adjacencies Calculator
This job updates the website similarity table based on the data in the domain and links-tables in the URL database.
It performs a brute force cosine similarity calculation across the entire link graph.
These adjacencies power the [explorer service](../../services-application/explorer-service) and
[random websites](../../features-search/random-websites)-functionality.

View File

@ -8,4 +8,3 @@ Externally the service is available at [https://explore2.marginalia.nu/](https:/
* [features-search/screenshots](../../features-search/screenshots)
* [features-search/random-websites](../../features-search/random-websites)
* [processes/website-adjacencies-calculator](../../processes/website-adjacencies-calculator)

View File

@ -25,7 +25,6 @@ dependencies {
// These look weird but they're needed to be able to spawn the processes
// from the executor service
implementation project(':code:processes:website-adjacencies-calculator')
implementation project(':code:processes:crawling-process')
implementation project(':code:processes:loading-process')
implementation project(':code:processes:converting-process')
@ -33,7 +32,6 @@ dependencies {
implementation project(':code:common:config')
implementation project(':code:common:model')
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
@ -48,7 +46,6 @@ dependencies {
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:ft-link-parser')
implementation project(':code:execution:data-extractors')
implementation project(':code:index:index-journal')
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')

View File

@ -37,7 +37,6 @@ dependencies {
implementation project(':code:index:api')
testImplementation project(path: ':code:services-core:control-service')
testImplementation project(':code:common:process')
implementation libs.bundles.slf4j

View File

@ -27,7 +27,6 @@ dependencies {
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict')

View File

@ -34,7 +34,6 @@ dependencies {
implementation project(':code:common:db')
implementation project(':code:common:config')
implementation project(':code:common:linkdb')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:common:model')

View File

@ -8,7 +8,6 @@ import com.google.inject.name.Names;
import gnu.trove.list.array.TIntArrayList;
import nu.marginalia.IndexLocations;
import nu.marginalia.LanguageModels;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import nu.marginalia.db.DomainTypes;
import nu.marginalia.index.domainrankings.DomainRankings;
@ -18,6 +17,7 @@ import nu.marginalia.index.searchset.SearchSetsService;
import nu.marginalia.linkdb.docs.DocumentDbReader;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.linkgraph.io.DomainLinksWriter;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.control.FakeProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.ServiceId;

View File

@ -59,7 +59,6 @@ include 'code:features-search:screenshots'
include 'code:features-search:random-websites'
include 'code:processes:converting-process:ft-anchor-keywords'
include 'code:execution:data-extractors'
include 'code:processes:crawling-process:ft-crawl-blocklist'
include 'code:processes:crawling-process:ft-link-parser'
@ -74,7 +73,6 @@ include 'code:common:service'
include 'code:common:config'
include 'code:common:model'
include 'code:common:renderer'
include 'code:common:process'
include 'code:processes:converting-process'
include 'code:processes:converting-process:model'
@ -86,7 +84,8 @@ include 'code:processes:crawling-process:model'
include 'code:processes:loading-process'
include 'code:processes:index-constructor-process'
include 'code:processes:test-data'
include 'code:processes:website-adjacencies-calculator'
include 'code:processes:export-task-process'
include 'code:tools:experiment-runner'
include 'code:tools:screenshot-capture-tool'