Merge pull request #125 from MarginaliaSearch/live-search

Add near real-time crawling from RSS feeds to supplement the slower batch-based crawls
Viktor 2024-11-22 16:38:37 +00:00 committed by GitHub
commit df298df852
126 changed files with 2533 additions and 1110 deletions

View File

@ -3,6 +3,7 @@ package nu.marginalia.nodecfg;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.nodecfg.model.NodeProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,10 +21,10 @@ public class NodeConfigurationService {
this.dataSource = dataSource;
}
public NodeConfiguration create(int id, String description, boolean acceptQueries, boolean keepWarcs) throws SQLException {
public NodeConfiguration create(int id, String description, boolean acceptQueries, boolean keepWarcs, NodeProfile nodeProfile) throws SQLException {
try (var conn = dataSource.getConnection();
var is = conn.prepareStatement("""
INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES, KEEP_WARCS) VALUES(?, ?, ?, ?)
INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES, KEEP_WARCS, NODE_PROFILE) VALUES(?, ?, ?, ?, ?)
""")
)
{
@ -31,6 +32,7 @@ public class NodeConfigurationService {
is.setString(2, description);
is.setBoolean(3, acceptQueries);
is.setBoolean(4, keepWarcs);
is.setString(5, nodeProfile.name());
if (is.executeUpdate() <= 0) {
throw new IllegalStateException("Failed to insert configuration");
@ -43,7 +45,7 @@ public class NodeConfigurationService {
public List<NodeConfiguration> getAll() {
try (var conn = dataSource.getConnection();
var qs = conn.prepareStatement("""
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
FROM NODE_CONFIGURATION
""")) {
var rs = qs.executeQuery();
@ -58,6 +60,7 @@ public class NodeConfigurationService {
rs.getBoolean("AUTO_CLEAN"),
rs.getBoolean("PRECESSION"),
rs.getBoolean("KEEP_WARCS"),
NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
rs.getBoolean("DISABLED")
));
}
@ -72,7 +75,7 @@ public class NodeConfigurationService {
public NodeConfiguration get(int nodeId) throws SQLException {
try (var conn = dataSource.getConnection();
var qs = conn.prepareStatement("""
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, NODE_PROFILE, DISABLED
FROM NODE_CONFIGURATION
WHERE ID=?
""")) {
@ -86,6 +89,7 @@ public class NodeConfigurationService {
rs.getBoolean("AUTO_CLEAN"),
rs.getBoolean("PRECESSION"),
rs.getBoolean("KEEP_WARCS"),
NodeProfile.valueOf(rs.getString("NODE_PROFILE")),
rs.getBoolean("DISABLED")
);
}
@ -98,7 +102,7 @@ public class NodeConfigurationService {
try (var conn = dataSource.getConnection();
var us = conn.prepareStatement("""
UPDATE NODE_CONFIGURATION
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=?
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=?, NODE_PROFILE=?
WHERE ID=?
"""))
{
@ -108,7 +112,8 @@ public class NodeConfigurationService {
us.setBoolean(4, config.includeInPrecession());
us.setBoolean(5, config.keepWarcs());
us.setBoolean(6, config.disabled());
us.setInt(7, config.node());
us.setString(7, config.profile().name());
us.setInt(8, config.node());
if (us.executeUpdate() <= 0)
throw new IllegalStateException("Failed to update configuration");

View File

@ -6,6 +6,7 @@ public record NodeConfiguration(int node,
boolean autoClean,
boolean includeInPrecession,
boolean keepWarcs,
NodeProfile profile,
boolean disabled
)
{

View File

@ -0,0 +1,28 @@
package nu.marginalia.nodecfg.model;
public enum NodeProfile {
BATCH_CRAWL,
REALTIME,
MIXED,
SIDELOAD;
public boolean isBatchCrawl() {
return this == BATCH_CRAWL;
}
public boolean isRealtime() {
return this == REALTIME;
}
public boolean isMixed() {
return this == MIXED;
}
public boolean isSideload() {
return this == SIDELOAD;
}
public boolean permitBatchCrawl() {
return isBatchCrawl() || isMixed();
}
public boolean permitSideload() {
return isMixed() || isSideload();
}
}
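
The new NodeProfile enum gates what each node is allowed to do. A minimal sketch of consulting it from calling code (illustrative; the variable wiring is assumed):

// Illustrative: decide what work a node may take on, based on its profile.
NodeProfile profile = nodeConfigurationService.get(nodeId).profile();
if (profile.permitBatchCrawl()) {
    // batch crawl and recrawl actors may run here (BATCH_CRAWL or MIXED)
}
if (profile.isRealtime()) {
    // RSS scraping and live crawling are reserved for REALTIME nodes
}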

View File

@ -2,6 +2,7 @@ package nu.marginalia.nodecfg;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.test.TestMigrationLoader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
@ -46,8 +47,8 @@ public class NodeConfigurationServiceTest {
@Test
public void test() throws SQLException {
var a = nodeConfigurationService.create(1, "Test", false, false);
var b = nodeConfigurationService.create(2, "Foo", true, false);
var a = nodeConfigurationService.create(1, "Test", false, false, NodeProfile.MIXED);
var b = nodeConfigurationService.create(2, "Foo", true, false, NodeProfile.MIXED);
assertEquals(1, a.node());
assertEquals("Test", a.description());

View File

@ -0,0 +1 @@
ALTER TABLE WMSA_prod.NODE_CONFIGURATION ADD COLUMN NODE_PROFILE VARCHAR(255) DEFAULT 'MIXED';

View File

@ -1,34 +0,0 @@
plugins {
id 'java'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation libs.notnull
implementation libs.bundles.slf4j
testImplementation libs.bundles.slf4j.test
implementation libs.guava
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.bundles.mariadb
implementation libs.commons.lang3
implementation libs.snakeyaml
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

View File

@ -1,7 +0,0 @@
package nu.marginalia.process.control;
public interface ProcessAdHocTaskHeartbeat extends AutoCloseable {
void progress(String step, int progress, int total);
void close();
}

View File

@ -1,4 +0,0 @@
# Process
Basic functionality for a Process. Processes must include this dependency to ensure
their loggers are configured properly!

View File

@ -1,9 +0,0 @@
log4j2.isThreadContextMapInheritable=true
status = info
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow} %c{1}- %msg{nolookups}%n
appender.console.filter.http.type = MarkerFilter
rootLogger.level = info
rootLogger.appenderRef.console.ref = LogToConsole

View File

@ -1,4 +1,4 @@
package nu.marginalia;
package nu.marginalia.process;
import java.util.UUID;

View File

@ -1,4 +1,4 @@
package nu.marginalia;
package nu.marginalia.process;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;

View File

@ -0,0 +1,102 @@
package nu.marginalia.process;
import com.google.gson.Gson;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.service.ConfigLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public abstract class ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class);
private final MessageQueueFactory messageQueueFactory;
private final int node;
private final String inboxName;
static {
// Load global config ASAP
ConfigLoader.loadConfig(
ConfigLoader.getConfigPath("system")
);
}
private final Gson gson;
public ProcessMainClass(MessageQueueFactory messageQueueFactory,
ProcessConfiguration config,
Gson gson,
String inboxName
) {
this.gson = gson;
new org.mariadb.jdbc.Driver();
this.messageQueueFactory = messageQueueFactory;
this.node = config.node();
this.inboxName = inboxName;
}
protected <T> Instructions<T> fetchInstructions(Class<T> requestType) throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(inboxName, node, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, requestType.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
// for live crawl, request is empty for now
T request = gson.fromJson(msg.payload(), requestType);
return new Instructions<>(msg, inbox, request);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws InterruptedException, SQLException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
protected static class Instructions<T> {
private final MqMessage message;
private final MqSingleShotInbox inbox;
private final T value;
Instructions(MqMessage message, MqSingleShotInbox inbox, T value)
{
this.message = message;
this.inbox = inbox;
this.value = value;
}
public T value() {
return value;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
}
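
Subclasses are expected to block on fetchInstructions, do the work, and acknowledge the message. A minimal sketch of that call pattern (the worker method is hypothetical; LiveCrawlRequest is the payload type used by the new live crawler):

// Illustrative use from a process main class extending ProcessMainClass.
var instructions = fetchInstructions(LiveCrawlRequest.class);
try {
    run(instructions.value()); // hypothetical worker method
    instructions.ok();         // report success back over the message queue
}
catch (Exception ex) {
    instructions.err();        // report failure so the spawning actor can transition to Error
}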

View File

@ -3,6 +3,8 @@ package nu.marginalia.process.control;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
/** Dummy implementation of ProcessHeartbeat that does nothing */
public class FakeProcessHeartbeat implements ProcessHeartbeat {
private static final Logger logger = LoggerFactory.getLogger(FakeProcessHeartbeat.class);
@ -30,6 +32,11 @@ public class FakeProcessHeartbeat implements ProcessHeartbeat {
logger.info("Progress: {}, {}/{}", step, progress, total);
}
@Override
public <T> Iterable<T> wrap(String step, Collection<T> collection) {
return collection;
}
@Override
public void close() {}
};

View File

@ -0,0 +1,12 @@
package nu.marginalia.process.control;
import java.util.Collection;
public interface ProcessAdHocTaskHeartbeat extends AutoCloseable {
void progress(String step, int progress, int total);
/** Wrap a collection to provide heartbeat progress updates as it's iterated through */
<T> Iterable<T> wrap(String step, Collection<T> collection);
void close();
}
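
The new wrap method lets long-running task code report progress as a side effect of iteration. A sketch of the intended use (heartbeat acquisition and the work method are assumptions):

// Illustrative: progress rows are written roughly every 1% of the collection.
try (ProcessAdHocTaskHeartbeat heartbeat = processHeartbeat.createAdHocTaskHeartbeat("EXPORT")) {
    for (var domain : heartbeat.wrap("exporting", domains)) {
        exportDomain(domain); // hypothetical per-item work
    }
}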

View File

@ -2,11 +2,13 @@ package nu.marginalia.process.control;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -69,6 +71,35 @@ public class ProcessAdHocTaskHeartbeatImpl implements AutoCloseable, ProcessAdHo
logger.info("ProcessTask {} progress: {}%", taskBase, progress);
}
/** Wrap a collection to provide heartbeat progress updates as it's iterated through */
@Override
public <T> Iterable<T> wrap(String step, Collection<T> collection) {
return () -> new Iterator<>() {
private final Iterator<T> base = collection.iterator();
private final int size = collection.size();
private final int updateInterval = Math.max(1, size / 100); // update every 1% of the collection, or at least once
private int pos = 0;
@Override
public boolean hasNext() {
boolean ret = base.hasNext();
if (!ret) {
progress(step, size, size);
}
return ret;
}
@Override
public T next() {
// update every 1% of the collection, to avoid hammering the database with updates
if (pos++ % updateInterval == 0) {
progress(step, pos, size);
}
return base.next();
}
};
}
public void shutDown() {
if (!running)
return;
@ -185,6 +216,5 @@ public class ProcessAdHocTaskHeartbeatImpl implements AutoCloseable, ProcessAdHo
public void close() {
shutDown();
}
}

View File

@ -4,17 +4,18 @@ package nu.marginalia.process.control;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
/** This service sends a heartbeat to the database every 5 seconds.
*/
@Singleton
public class ProcessHeartbeatImpl implements ProcessHeartbeat {
public class ProcessHeartbeatImpl implements ProcessHeartbeat, Closeable {
private final Logger logger = LoggerFactory.getLogger(ProcessHeartbeatImpl.class);
private final String processName;
private final String processBase;
@ -169,5 +170,9 @@ public class ProcessHeartbeatImpl implements ProcessHeartbeat {
}
}
}
public void close() {
shutDown();
}
}

View File

@ -2,7 +2,7 @@ package nu.marginalia.process.control;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -9,11 +9,11 @@ import java.util.Properties;
public class ConfigLoader {
static Path getConfigPath(String configName) {
public static Path getConfigPath(String configName) {
return WmsaHome.getHomePath().resolve("conf/properties/" + configName + ".properties");
}
static void loadConfig(Path configPath) {
public static void loadConfig(Path configPath) {
if (!Files.exists(configPath)) {
System.err.println("No config file found at " + configPath);
return;

View File

@ -1,20 +0,0 @@
package nu.marginalia.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ProcessMainClass.class);
static {
// Load global config ASAP
ConfigLoader.loadConfig(
ConfigLoader.getConfigPath("system")
);
}
public ProcessMainClass() {
new org.mariadb.jdbc.Driver();
}
}

View File

@ -13,7 +13,10 @@ public enum ServiceId {
Dating("dating-service"),
Status("setatus-service"),
Explorer("explorer-service");
Explorer("explorer-service"),
NOT_A_SERVICE("NOT_A_SERVICE")
;
public final String serviceName;

View File

@ -4,6 +4,7 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import org.slf4j.Logger;
@ -56,7 +57,9 @@ public class NodeStatusWatcher {
private void setupNode() {
try {
configurationService.create(nodeId, "Node " + nodeId, true, false);
NodeProfile profile = NodeProfile.MIXED;
configurationService.create(nodeId, "Node " + nodeId, true, false, profile);
fileStorageService.createStorageBase("Index Data", Path.of("/idx"), nodeId, FileStorageBaseType.CURRENT);
fileStorageService.createStorageBase("Index Backups", Path.of("/backup"), nodeId, FileStorageBaseType.BACKUP);

View File

@ -182,4 +182,10 @@ public class ExecutorClient {
}
}
public void restartExecutorService(int node) {
channelPool.call(ExecutorApiBlockingStub::restartExecutorService)
.forNode(node)
.run(Empty.getDefaultInstance());
}
}

View File

@ -17,6 +17,8 @@ service ExecutorApi {
rpc downloadSampleData(RpcDownloadSampleData) returns (Empty) {}
rpc calculateAdjacencies(Empty) returns (Empty) {}
rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
rpc restartExecutorService(Empty) returns (Empty) {}
}
service ExecutorCrawlApi {

View File

@ -15,15 +15,15 @@ dependencies {
// These look weird but they're needed to be able to spawn the processes
// from the executor service
implementation project(':code:processes:website-adjacencies-calculator')
implementation project(':code:processes:export-task-process')
implementation project(':code:processes:crawling-process')
implementation project(':code:processes:live-crawling-process')
implementation project(':code:processes:loading-process')
implementation project(':code:processes:converting-process')
implementation project(':code:processes:index-constructor-process')
implementation project(':code:common:config')
implementation project(':code:common:model')
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
@ -42,7 +42,6 @@ dependencies {
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:crawling-process:ft-link-parser')
implementation project(':code:execution:data-extractors')
implementation project(':code:index:index-journal')
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')

View File

@ -1,7 +0,0 @@
Contains converter-*like* extraction jobs that operate on crawled data to produce export files.
## Important classes
* [AtagExporter](java/nu/marginalia/extractor/AtagExporter.java) - extracts anchor texts from the crawled data.
* [FeedExporter](java/nu/marginalia/extractor/FeedExporter.java) - tries to find RSS/Atom feeds within the crawled data.
* [TermFrequencyExporter](java/nu/marginalia/extractor/TermFrequencyExporter.java) - exports the 'TF' part of TF-IDF.

View File

@ -1,29 +1,38 @@
package nu.marginalia.actor;
import nu.marginalia.nodecfg.model.NodeProfile;
import java.util.Set;
public enum ExecutorActor {
CRAWL,
RECRAWL,
RECRAWL_SINGLE_DOMAIN,
CONVERT_AND_LOAD,
PROC_CONVERTER_SPAWNER,
PROC_LOADER_SPAWNER,
PROC_CRAWLER_SPAWNER,
MONITOR_PROCESS_LIVENESS,
MONITOR_FILE_STORAGE,
ADJACENCY_CALCULATION,
CRAWL_JOB_EXTRACTOR,
EXPORT_DATA,
EXPORT_SEGMENTATION_MODEL,
EXPORT_ATAGS,
EXPORT_TERM_FREQUENCIES,
EXPORT_FEEDS,
PROC_INDEX_CONSTRUCTOR_SPAWNER,
CONVERT,
RESTORE_BACKUP,
EXPORT_SAMPLE_DATA,
DOWNLOAD_SAMPLE,
SCRAPE_FEEDS,
UPDATE_RSS;
CRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
RECRAWL_SINGLE_DOMAIN(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CONVERTER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_CRAWLER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_EXPORT_TASKS_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
ADJACENCY_CALCULATION(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_SEGMENTATION_MODEL(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_ATAGS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_TERM_FREQUENCIES(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_FEEDS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
EXPORT_SAMPLE_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
DOWNLOAD_SAMPLE(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
PROC_LOADER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
RESTORE_BACKUP(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
CONVERT(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
CONVERT_AND_LOAD(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.REALTIME, NodeProfile.SIDELOAD),
MONITOR_PROCESS_LIVENESS(NodeProfile.BATCH_CRAWL, NodeProfile.REALTIME, NodeProfile.MIXED, NodeProfile.SIDELOAD),
MONITOR_FILE_STORAGE(NodeProfile.BATCH_CRAWL, NodeProfile.REALTIME, NodeProfile.MIXED, NodeProfile.SIDELOAD),
PROC_INDEX_CONSTRUCTOR_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.REALTIME, NodeProfile.MIXED, NodeProfile.SIDELOAD),
LIVE_CRAWL(NodeProfile.REALTIME),
PROC_LIVE_CRAWL_SPAWNER(NodeProfile.REALTIME),
SCRAPE_FEEDS(NodeProfile.REALTIME),
UPDATE_RSS(NodeProfile.REALTIME);
public String id() {
return "fsm:" + name().toLowerCase();
@ -33,4 +42,9 @@ public enum ExecutorActor {
return "fsm:" + name().toLowerCase() + ":" + node;
}
ExecutorActor(NodeProfile... profileSet) {
this.profileSet = Set.of(profileSet);
}
public Set<NodeProfile> profileSet;
}

View File

@ -10,11 +10,14 @@ import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.task.*;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@ -28,18 +31,23 @@ public class ExecutorActorControlService {
public Map<ExecutorActor, ActorPrototype> actorDefinitions = new HashMap<>();
private final int node;
private final NodeConfiguration nodeConfiguration;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public ExecutorActorControlService(MessageQueueFactory messageQueueFactory,
NodeConfigurationService configurationService,
BaseServiceParams baseServiceParams,
ConvertActor convertActor,
ConvertAndLoadActor convertAndLoadActor,
CrawlActor crawlActor,
LiveCrawlActor liveCrawlActor,
RecrawlSingleDomainActor recrawlSingleDomainActor,
RestoreBackupActor restoreBackupActor,
ConverterMonitorActor converterMonitorFSM,
CrawlerMonitorActor crawlerMonitorActor,
LiveCrawlerMonitorActor liveCrawlerMonitorActor,
LoaderMonitorActor loaderMonitor,
ProcessLivenessMonitorActor processMonitorFSM,
FileStorageMonitorActor fileStorageMonitorActor,
@ -51,16 +59,20 @@ public class ExecutorActorControlService {
ExportSampleDataActor exportSampleDataActor,
ExportTermFreqActor exportTermFrequenciesActor,
ExportSegmentationModelActor exportSegmentationModelActor,
ExportTaskMonitorActor exportTasksMonitorActor,
DownloadSampleActor downloadSampleActor,
ScrapeFeedsActor scrapeFeedsActor,
ExecutorActorStateMachines stateMachines,
UpdateRssActor updateRssActor) {
UpdateRssActor updateRssActor) throws SQLException {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.stateMachines = stateMachines;
this.node = baseServiceParams.configuration.node();
this.nodeConfiguration = configurationService.get(node);
register(ExecutorActor.CRAWL, crawlActor);
register(ExecutorActor.LIVE_CRAWL, liveCrawlActor);
register(ExecutorActor.RECRAWL_SINGLE_DOMAIN, recrawlSingleDomainActor);
register(ExecutorActor.CONVERT, convertActor);
@ -71,6 +83,8 @@ public class ExecutorActorControlService {
register(ExecutorActor.PROC_CONVERTER_SPAWNER, converterMonitorFSM);
register(ExecutorActor.PROC_LOADER_SPAWNER, loaderMonitor);
register(ExecutorActor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor);
register(ExecutorActor.PROC_LIVE_CRAWL_SPAWNER, liveCrawlerMonitorActor);
register(ExecutorActor.PROC_EXPORT_TASKS_SPAWNER, exportTasksMonitorActor);
register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM);
register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor);
@ -91,6 +105,11 @@ public class ExecutorActorControlService {
}
private void register(ExecutorActor process, RecordActorPrototype graph) {
if (!process.profileSet.contains(nodeConfiguration.profile())) {
return;
}
var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));

View File

@ -0,0 +1,29 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
@Singleton
public class ExportTaskMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public ExportTaskMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) {
super(gson,
configuration,
persistence,
processService,
ProcessInboxNames.EXPORT_TASK_INBOX,
ProcessService.ProcessId.EXPORT_TASKS);
}
}

View File

@ -0,0 +1,29 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
@Singleton
public class LiveCrawlerMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public LiveCrawlerMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) {
super(gson,
configuration,
persistence,
processService,
ProcessInboxNames.LIVE_CRAWLER_INBOX,
ProcessService.ProcessId.LIVE_CRAWLER);
}
}

View File

@ -10,6 +10,8 @@ import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.module.ServiceConfiguration;
import org.jsoup.Jsoup;
@ -39,6 +41,7 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
private final Duration pollInterval = Duration.ofHours(6);
private final ServiceEventLog eventLog;
private final NodeConfigurationService nodeConfigurationService;
private final HikariDataSource dataSource;
private final int nodeId;
@ -54,8 +57,8 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Initial() -> {
if (nodeId > 1) {
yield new End();
if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) {
yield new Error("Invalid node profile for RSS update");
}
else {
yield new Wait(LocalDateTime.now().toString());
@ -177,10 +180,12 @@ public class ScrapeFeedsActor extends RecordActorPrototype {
public ScrapeFeedsActor(Gson gson,
ServiceEventLog eventLog,
ServiceConfiguration configuration,
NodeConfigurationService nodeConfigurationService,
HikariDataSource dataSource)
{
super(gson);
this.eventLog = eventLog;
this.nodeConfigurationService = nodeConfigurationService;
this.dataSource = dataSource;
this.nodeId = configuration.node();
}

View File

@ -11,6 +11,8 @@ import nu.marginalia.api.feeds.RpcFeedUpdateMode;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeProfile;
import nu.marginalia.service.module.ServiceConfiguration;
import java.time.Duration;
@ -25,13 +27,19 @@ public class UpdateRssActor extends RecordActorPrototype {
private final Duration updateInterval = Duration.ofHours(24);
private final int cleanInterval = 60;
private final NodeConfigurationService nodeConfigurationService;
private final MqPersistence persistence;
@Inject
public UpdateRssActor(Gson gson, FeedsClient feedsClient, ServiceConfiguration serviceConfiguration, MqPersistence persistence) {
public UpdateRssActor(Gson gson,
FeedsClient feedsClient,
ServiceConfiguration serviceConfiguration,
NodeConfigurationService nodeConfigurationService,
MqPersistence persistence) {
super(gson);
this.feedsClient = feedsClient;
this.nodeId = serviceConfiguration.node();
this.nodeConfigurationService = nodeConfigurationService;
this.persistence = persistence;
}
@ -55,9 +63,8 @@ public class UpdateRssActor extends RecordActorPrototype {
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial() -> {
if (nodeId > 1) {
// Only run on the first node
yield new End();
if (nodeConfigurationService.get(nodeId).profile() != NodeProfile.REALTIME) {
yield new Error("Invalid node profile for RSS update");
}
else {
// Wait for 5 minutes before starting the first update, to give the system time to start up properly

View File

@ -3,50 +3,68 @@ package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.extractor.AtagExporter;
import nu.marginalia.extractor.ExporterIf;
import nu.marginalia.storage.model.*;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
import nu.marginalia.storage.model.FileStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
@Singleton
public class ExportAtagsActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ExporterIf atagExporter;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
public record Export(FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Export(FileStorageId crawlId) -> {
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atag-export", "Anchor Tags " + LocalDateTime.now());
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "atags-export", "Atags " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id());
}
case Run(FileStorageId crawlId, FileStorageId destId) -> {
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
atagExporter.export(crawlId, destId);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
yield new Error("Failed to export data");
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId));
yield new Run(crawlId, destId, newMsgId);
}
case Run(_, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
yield new End();
}
}
default -> new Error();
};
}
@Override
public String describe() {
return "Export anchor tags from crawl data";
@ -55,11 +73,13 @@ public class ExportAtagsActor extends RecordActorPrototype {
@Inject
public ExportAtagsActor(Gson gson,
FileStorageService storageService,
AtagExporter atagExporter)
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher)
{
super(gson);
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
this.storageService = storageService;
this.atagExporter = atagExporter;
this.processWatcher = processWatcher;
}
}
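
The same two-phase Run step recurs in all the export actors in this change: the first pass (guarded by when msgId < 0) enqueues an ExportTaskRequest on the export-tasks outbox and re-enters Run with the message id stored in the actor state; the second pass waits for the spawned process to respond, which makes the step resumable across executor restarts. The pattern, distilled (storage-state bookkeeping omitted):

// Pattern shared by ExportAtagsActor, ExportFeedsActor, ExportSampleDataActor, ExportTermFreqActor.
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
    long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.atags(crawlId, destId));
    yield new Run(crawlId, destId, newMsgId); // re-enter with the message id recorded
}
case Run(_, FileStorageId destId, long msgId) -> {
    var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
    yield rsp.state() == MqMessageState.OK ? new End() : new Error("Exporter failed");
}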

View File

@ -5,8 +5,11 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.extractor.ExporterIf;
import nu.marginalia.extractor.FeedExporter;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
@ -19,11 +22,17 @@ import java.time.LocalDateTime;
@Singleton
public class ExportFeedsActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ExporterIf feedExporter;
public record Export(FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
@ -33,20 +42,25 @@ public class ExportFeedsActor extends RecordActorPrototype {
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id());
}
case Run(FileStorageId crawlId, FileStorageId destId) -> {
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
feedExporter.export(crawlId, destId);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
yield new Error("Failed to export data");
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.feeds(crawlId, destId));
yield new Run(crawlId, destId, newMsgId);
}
case Run(_, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
yield new End();
}
}
default -> new Error();
};
}
@ -60,11 +74,13 @@ public class ExportFeedsActor extends RecordActorPrototype {
@Inject
public ExportFeedsActor(Gson gson,
FileStorageService storageService,
FeedExporter feedExporter)
ActorProcessWatcher processWatcher,
ProcessOutboxes outboxes)
{
super(gson);
this.storageService = storageService;
this.feedExporter = feedExporter;
this.processWatcher = processWatcher;
this.exportTasksOutbox = outboxes.getExportTasksOutbox();
}
}

View File

@ -5,7 +5,11 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.extractor.SampleDataExporter;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
@ -18,11 +22,17 @@ import java.time.LocalDateTime;
@Singleton
public class ExportSampleDataActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final SampleDataExporter dataExporter;
public record Export(FileStorageId crawlId, int size, String name) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId, int size, String name) {
this(crawlId, destId, size, name, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
@ -35,28 +45,29 @@ public class ExportSampleDataActor extends RecordActorPrototype {
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id(), size, name);
}
case Run(FileStorageId crawlId, FileStorageId destId, int size, String name) -> {
case Run(FileStorageId crawlId, FileStorageId destId, int size, String name, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
dataExporter.export(crawlId, destId, size, name);
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.sampleData(crawlId, destId, size, name));
yield new Run(crawlId, destId, size, name, newMsgId);
}
case Run(_, FileStorageId destId, _, _, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
logger.error("Failed to export data", ex);
yield new Error("Failed to export data");
}
yield new End();
}
}
default -> new Error();
};
}
@Override
public String describe() {
return "Export RSS/Atom feeds from crawl data";
@ -65,11 +76,13 @@ public class ExportSampleDataActor extends RecordActorPrototype {
@Inject
public ExportSampleDataActor(Gson gson,
FileStorageService storageService,
SampleDataExporter dataExporter)
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher)
{
super(gson);
this.storageService = storageService;
this.dataExporter = dataExporter;
this.processWatcher = processWatcher;
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
}
}

View File

@ -5,45 +5,62 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.extractor.ExporterIf;
import nu.marginalia.extractor.TermFrequencyExporter;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageState;
import nu.marginalia.storage.model.FileStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
@Singleton
public class ExportTermFreqActor extends RecordActorPrototype {
private final FileStorageService storageService;
private final ExporterIf exporter;
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
public record Export(FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId, long msgId) implements ActorStep {
public Run(FileStorageId crawlId, FileStorageId destId) {
this(crawlId, destId, -1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Export(FileStorageId crawlId) -> {
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq-export", "Term Frequencies " + LocalDateTime.now());
var storage = storageService.allocateStorage(FileStorageType.EXPORT, "term-freq", "Term Frequencies " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id());
}
case Run(FileStorageId crawlId, FileStorageId destId) -> {
case Run(FileStorageId crawlId, FileStorageId destId, long msgId) when msgId < 0 -> {
storageService.setFileStorageState(destId, FileStorageState.NEW);
try {
exporter.export(crawlId, destId);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
storageService.setFileStorageState(destId, FileStorageState.DELETE);
yield new Error("Failed to export data");
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.termFreq(crawlId, destId));
yield new Run(crawlId, destId, newMsgId);
}
case Run(_, FileStorageId destId, long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (rsp.state() != MqMessageState.OK) {
storageService.flagFileForDeletion(destId);
yield new Error("Exporter failed");
}
else {
storageService.setFileStorageState(destId, FileStorageState.UNSET);
yield new End();
}
}
default -> new Error();
};
}
@ -57,11 +74,13 @@ public class ExportTermFreqActor extends RecordActorPrototype {
@Inject
public ExportTermFreqActor(Gson gson,
FileStorageService storageService,
TermFrequencyExporter exporter)
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher)
{
super(gson);
this.storageService = storageService;
this.exporter = exporter;
this.processWatcher = processWatcher;
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
}
}

View File

@ -0,0 +1,108 @@
package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.IndexLocations;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorStateMachines;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Objects;
@Singleton
public class LiveCrawlActor extends RecordActorPrototype {
// STATES
private final ActorProcessWatcher processWatcher;
private final MqOutbox mqLiveCrawlerOutbox;
private final ExecutorActorStateMachines executorActorStateMachines;
private final FeedsClient feedsClient;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final FileStorageService fileStorageService;
public record Initial() implements ActorStep {}
public record Monitor(String feedsHash) implements ActorStep {}
public record LiveCrawl(String feedsHash, long msgId) implements ActorStep {
public LiveCrawl(String feedsHash) { this(feedsHash, -1); }
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
logger.info("{}", self);
return switch (self) {
case Initial() -> {
yield new Monitor("-");
}
case Monitor(String feedsHash) -> {
for (;;) {
String currentHash = feedsClient.getFeedDataHash();
if (!Objects.equals(currentHash, feedsHash)) {
yield new LiveCrawl(currentHash);
}
Thread.sleep(Duration.ofMinutes(15));
}
}
case LiveCrawl(String feedsHash, long msgId) when msgId < 0 -> {
// Clear the index journal before starting the crawl
Path indexJournalLocation = IndexLocations.getIndexConstructionArea(fileStorageService).resolve("index-journal");
if (Files.isDirectory(indexJournalLocation)) {
FileUtils.deleteDirectory(indexJournalLocation.toFile());
}
long id = mqLiveCrawlerOutbox.sendAsync(new LiveCrawlRequest());
yield new LiveCrawl(feedsHash, id);
}
case LiveCrawl(String feedsHash, long msgId) -> {
var rsp = processWatcher.waitResponse(mqLiveCrawlerOutbox, ProcessService.ProcessId.LIVE_CRAWLER, msgId);
if (rsp.state() != MqMessageState.OK) {
yield new Error("Crawler failed");
}
// Build the index
executorActorStateMachines.initFrom(ExecutorActor.CONVERT_AND_LOAD, new ConvertAndLoadActor.Rerank());
yield new Monitor(feedsHash);
}
default -> new Error("Unknown state");
};
}
@Override
public String describe() {
return "Actor that polls the feeds database for changes, and triggers the live crawler when needed";
}
@Inject
public LiveCrawlActor(ActorProcessWatcher processWatcher,
ProcessOutboxes processOutboxes,
FeedsClient feedsClient,
Gson gson,
ExecutorActorStateMachines executorActorStateMachines, FileStorageService fileStorageService)
{
super(gson);
this.processWatcher = processWatcher;
this.mqLiveCrawlerOutbox = processOutboxes.getLiveCrawlerOutbox();
this.executorActorStateMachines = executorActorStateMachines;
this.feedsClient = feedsClient;
this.fileStorageService = fileStorageService;
}
}

View File

@ -5,53 +5,59 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class TriggerAdjacencyCalculationActor extends RecordActorPrototype {
private final ActorProcessWatcher processWatcher;
private final MqOutbox exportTasksOutbox;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ProcessService processService;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public record Run() implements ActorStep {}
public record Run(long msgId) implements ActorStep {
public Run() {
this(-1);
}
}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Run() -> {
AtomicBoolean hasError = new AtomicBoolean(false);
var future = executor.submit(() -> {
try {
processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load");
case Run(long msgId) when msgId < 0 -> {
long newMsgId = exportTasksOutbox.sendAsync(ExportTaskRequest.adjacencies());
yield new Run(newMsgId);
}
catch (Exception ex) {
logger.warn("Error triggering adjacency calculation", ex);
hasError.set(true);
}
});
future.get();
case Run(long msgId) -> {
var rsp = processWatcher.waitResponse(exportTasksOutbox, ProcessService.ProcessId.EXPORT_TASKS, msgId);
if (hasError.get()) {
yield new Error("Error triggering adjacency calculation");
if (rsp.state() != MqMessageState.OK) {
yield new Error("Exporter failed");
}
else {
yield new End();
}
}
default -> new Error();
};
}
@Inject
public TriggerAdjacencyCalculationActor(Gson gson,
ProcessService processService) {
ProcessOutboxes processOutboxes,
ActorProcessWatcher processWatcher) {
super(gson);
this.processService = processService;
this.exportTasksOutbox = processOutboxes.getExportTasksOutbox();
this.processWatcher = processWatcher;
}
@Override

View File

@ -15,9 +15,12 @@ import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.service.server.DiscoverableService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@ -32,6 +35,8 @@ public class ExecutorGrpcService
private final ServiceConfiguration serviceConfiguration;
private final ExecutorActorControlService actorControlService;
private static final Logger logger = LoggerFactory.getLogger(ExecutorGrpcService.class);
@Inject
public ExecutorGrpcService(ActorApi actorApi,
FileStorageService fileStorageService,
@ -240,5 +245,22 @@ public class ExecutorGrpcService
}
}
@Override
public void restartExecutorService(Empty request, StreamObserver<Empty> responseObserver) {
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
logger.info("Restarting executor service on node {}", serviceConfiguration.node());
try {
// Wait for the response to be sent before restarting
Thread.sleep(Duration.ofSeconds(5));
}
catch (InterruptedException e) {
logger.warn("Interrupted while waiting for restart", e);
}
System.exit(0);
}
}

View File

@ -13,6 +13,8 @@ public class ProcessOutboxes {
private final MqOutbox loaderOutbox;
private final MqOutbox crawlerOutbox;
private final MqOutbox indexConstructorOutbox;
private final MqOutbox liveCrawlerOutbox;
private final MqOutbox exportTasksOutbox;
@Inject
public ProcessOutboxes(BaseServiceParams params, MqPersistence persistence) {
@ -44,6 +46,22 @@ public class ProcessOutboxes {
params.configuration.node(),
params.configuration.instanceUuid()
);
liveCrawlerOutbox = new MqOutbox(persistence,
ProcessInboxNames.LIVE_CRAWLER_INBOX,
params.configuration.node(),
params.configuration.serviceName(),
params.configuration.node(),
params.configuration.instanceUuid()
);
exportTasksOutbox = new MqOutbox(persistence,
ProcessInboxNames.EXPORT_TASK_INBOX,
params.configuration.node(),
params.configuration.serviceName(),
params.configuration.node(),
params.configuration.instanceUuid()
);
}
@ -60,4 +78,8 @@ public class ProcessOutboxes {
}
public MqOutbox getIndexConstructorOutbox() { return indexConstructorOutbox; }
public MqOutbox getLiveCrawlerOutbox() { return liveCrawlerOutbox; }
public MqOutbox getExportTasksOutbox() { return exportTasksOutbox; }
}

View File

@ -3,14 +3,14 @@ package nu.marginalia.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.WmsaHome;
import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator;
import nu.marginalia.converting.ConverterMain;
import nu.marginalia.crawl.CrawlerMain;
import nu.marginalia.index.IndexConstructorMain;
import nu.marginalia.livecrawler.LiveCrawlerMain;
import nu.marginalia.loading.LoaderMain;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import nu.marginalia.task.ExportTasksMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@ -37,23 +37,24 @@ public class ProcessService {
private final int node;
public static ProcessService.ProcessId translateExternalIdBase(String id) {
public static ProcessId translateExternalIdBase(String id) {
return switch (id) {
case "converter" -> ProcessService.ProcessId.CONVERTER;
case "crawler" -> ProcessService.ProcessId.CRAWLER;
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR;
case "converter" -> ProcessId.CONVERTER;
case "crawler" -> ProcessId.CRAWLER;
case "loader" -> ProcessId.LOADER;
case "export-tasks" -> ProcessId.EXPORT_TASKS;
case "index-constructor" -> ProcessId.INDEX_CONSTRUCTOR;
default -> null;
};
}
public enum ProcessId {
CRAWLER(CrawlerMain.class),
LIVE_CRAWLER(LiveCrawlerMain.class),
CONVERTER(ConverterMain.class),
LOADER(LoaderMain.class),
INDEX_CONSTRUCTOR(IndexConstructorMain.class),
ADJACENCIES_CALCULATOR(WebsiteAdjacenciesCalculator.class)
EXPORT_TASKS(ExportTasksMain.class),
;
public final String mainClass;
@ -64,10 +65,11 @@ public class ProcessService {
List<String> envOpts() {
String variable = switch (this) {
case CRAWLER -> "CRAWLER_PROCESS_OPTS";
case LIVE_CRAWLER -> "LIVE_CRAWLER_PROCESS_OPTS";
case CONVERTER -> "CONVERTER_PROCESS_OPTS";
case LOADER -> "LOADER_PROCESS_OPTS";
case INDEX_CONSTRUCTOR -> "INDEX_CONSTRUCTION_PROCESS_OPTS";
case ADJACENCIES_CALCULATOR -> "ADJACENCIES_CALCULATOR_PROCESS_OPTS";
case EXPORT_TASKS -> "EXPORT_TASKS_PROCESS_OPTS";
};
String value = System.getenv(variable);

View File

@ -11,10 +11,15 @@ import nu.marginalia.service.discovery.property.ServicePartition;
import nu.marginalia.service.module.ServiceConfiguration;
import javax.annotation.CheckReturnValue;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
@Singleton
public class FeedsClient {
@ -23,7 +28,9 @@ public class FeedsClient {
private final MqOutbox updateFeedsOutbox;
@Inject
public FeedsClient(GrpcChannelPoolFactory factory, MqPersistence mqPersistence, ServiceConfiguration serviceConfiguration) {
public FeedsClient(GrpcChannelPoolFactory factory,
MqPersistence mqPersistence,
ServiceConfiguration serviceConfiguration) {
// The client is only interested in the primary node
var key = ServiceKey.forGrpcApi(FeedApiGrpc.class, ServicePartition.any());
@ -46,6 +53,25 @@ public class FeedsClient {
}
}
public void getUpdatedDomains(Instant since, BiConsumer<String, List<String>> consumer) throws ExecutionException, InterruptedException {
channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getUpdatedLinks)
.run(RpcUpdatedLinksRequest.newBuilder().setSinceEpochMillis(since.toEpochMilli()).build())
.forEachRemaining(rsp -> consumer.accept(rsp.getDomain(), new ArrayList<>(rsp.getUrlList())));
}
public record UpdatedDomain(String domain, List<String> urls) {
public UpdatedDomain(RpcUpdatedLinksResponse rsp) {
this(rsp.getDomain(), new ArrayList<>(rsp.getUrlList()));
}
}
/** Get the hash of the feed data, for identifying when the data has been updated */
public String getFeedDataHash() {
return channelPool.call(FeedApiGrpc.FeedApiBlockingStub::getFeedDataHash)
.run(Empty.getDefaultInstance())
.getHash();
}
/** Update the feeds, return a message ID for the update */
@CheckReturnValue
public long updateFeeds(RpcFeedUpdateMode mode) throws Exception {

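Together, getFeedDataHash and getUpdatedDomains give the live-crawl side a cheap change detector plus a way to pull only fresh links. A hedged usage sketch (surrounding wiring is assumed):

// Illustrative polling loop body: act only when the feed database has changed.
String currentHash = feedsClient.getFeedDataHash();
if (!currentHash.equals(lastSeenHash)) { // lastSeenHash maintained by the caller
    feedsClient.getUpdatedDomains(Instant.now().minus(Duration.ofDays(1)),
            (domain, urls) -> urls.forEach(url -> enqueue(domain, url))); // hypothetical sink
    lastSeenHash = currentHash;
}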
View File

@ -7,7 +7,22 @@ option java_multiple_files=true;
service FeedApi {
rpc getFeed(RpcDomainId) returns (RpcFeed) {}
rpc getFeedDataHash(Empty) returns (RpcFeedDataHash) {}
rpc updateFeeds(RpcUpdateRequest) returns (Empty) {}
rpc getUpdatedLinks(RpcUpdatedLinksRequest) returns (stream RpcUpdatedLinksResponse) {}
}
message RpcUpdatedLinksRequest {
int64 sinceEpochMillis = 1;
}
message RpcUpdatedLinksResponse {
string domain = 1;
repeated string url = 2;
}
message RpcFeedDataHash {
string hash = 1;
}
message RpcDomainId {

View File

@ -11,11 +11,16 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
@Singleton
public class FeedDb {
@ -45,6 +50,18 @@ public class FeedDb {
}
}
/** Constructor for testing */
public FeedDb(Path dbPath) {
feedDbEnabled = true;
readerDbPath = dbPath;
try {
reader = new FeedDbReader(readerDbPath);
} catch (Exception e) {
reader = null;
}
}
public boolean isEnabled() {
return feedDbEnabled;
}
@ -112,7 +129,7 @@ public class FeedDb {
}
try {
Path dbFile = Files.createTempFile(WmsaHome.getDataPath(), "rss-feeds", ".tmp.db");
Path dbFile = Files.createTempFile(readerDbPath.getParent(), "rss-feeds", ".tmp.db");
return new FeedDbWriter(dbFile);
} catch (Exception e) {
logger.error("Error creating new database writer", e);
@ -141,4 +158,34 @@ public class FeedDb {
}
}
public String getDataHash() throws Exception {
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buffer = new byte[4096];
try (var inputStream = new BufferedInputStream(Files.newInputStream(readerDbPath))) {
int rb;
while ((rb = inputStream.read(buffer)) >= 0) {
digest.update(buffer, 0, rb);
}
}
return Base64.getEncoder().encodeToString(digest.digest());
}
public void getLinksUpdatedSince(Instant since, BiConsumer<String, List<String>> consumer) throws Exception {
if (!feedDbEnabled) {
throw new IllegalStateException("Feed database is disabled on this node");
}
// Capture the current reader to avoid concurrency issues
FeedDbReader reader = this.reader;
if (reader == null) {
throw new NullPointerException("Reader is not available");
}
reader.getLinksUpdatedSince(since, consumer);
}
}

View File

@ -12,9 +12,11 @@ import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
public class FeedDbReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(FeedDbReader.class);
@ -99,4 +101,27 @@ public class FeedDbReader implements AutoCloseable {
}
public void getLinksUpdatedSince(Instant since, BiConsumer<String, List<String>> consumer) {
try (var stmt = connection.prepareStatement("SELECT FEED FROM feed")) {
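// Each row holds one serialized FeedItems blob (one per domain); update times aren't indexed, so filtering happens in Java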
var rs = stmt.executeQuery();
while (rs.next()) {
FeedItems items = deserialize(rs.getString(1));
List<String> urls = new ArrayList<>();
for (var item : items.items()) {
if (item.getUpdateTimeZD().toInstant().isAfter(since)) {
urls.add(item.url());
}
}
if (!urls.isEmpty()) {
consumer.accept(items.domain(), urls);
}
}
} catch (SQLException e) {
logger.error("Error getting updated links", e);
}
}
}

View File

@ -55,6 +55,11 @@ public record FeedItem(String title,
return zonedDateTime.map(date -> date.format(DATE_FORMAT)).orElse("");
}
public ZonedDateTime getUpdateTimeZD() {
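// Assumes the stored date string was produced with DATE_FORMAT; parsing throws otherwise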
return ZonedDateTime.parse(date, DATE_FORMAT);
}
@Override
public int compareTo(@NotNull FeedItem o) {
return o.date.compareTo(date);

View File

@ -1,7 +1,6 @@
package nu.marginalia.rss.model;
import java.util.List;
import java.util.Optional;
public record FeedItems(String domain,
String feedUrl,
@ -17,17 +16,4 @@ public record FeedItems(String domain,
public boolean isEmpty() {
return items.isEmpty();
}
public Optional<FeedItem> getLatest() {
if (items.isEmpty())
return Optional.empty();
return Optional.of(
items.getFirst()
);
}
public Optional<String> getLatestDate() {
return getLatest().map(FeedItem::date);
}
}

View File

@ -70,7 +70,7 @@ public class FeedFetcherService {
this.serviceHeartbeat = serviceHeartbeat;
this.executorClient = executorClient;
rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier() + " RSS Feed Fetcher");
rssReader.addHeader("User-Agent", WmsaHome.getUserAgent().uaIdentifier());
}
public enum UpdateMode {

View File

@ -14,6 +14,8 @@ import nu.marginalia.service.server.DiscoverableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements DiscoverableService {
@ -64,6 +66,45 @@ public class FeedsGrpcService extends FeedApiGrpc.FeedApiImplBase implements Dis
responseObserver.onCompleted();
}
@Override
public void getFeedDataHash(Empty request, StreamObserver<RpcFeedDataHash> responseObserver) {
if (!feedDb.isEnabled()) {
responseObserver.onError(new IllegalStateException("Feed database is disabled on this node"));
return;
}
try {
String hash = feedDb.getDataHash();
responseObserver.onNext(RpcFeedDataHash.newBuilder().setHash(hash).build());
responseObserver.onCompleted();
}
catch (Exception e) {
logger.error("Error getting feed data hash", e);
responseObserver.onError(e);
}
}
@Override
public void getUpdatedLinks(RpcUpdatedLinksRequest request, StreamObserver<RpcUpdatedLinksResponse> responseObserver) {
Instant since = Instant.ofEpochMilli(request.getSinceEpochMillis());
try {
feedDb.getLinksUpdatedSince(since, (String domain, List<String> urls) -> {
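// Emit one streamed response message per domain that has fresh links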
RpcUpdatedLinksResponse rsp = RpcUpdatedLinksResponse.newBuilder()
.setDomain(domain)
.addAllUrl(urls)
.build();
responseObserver.onNext(rsp);
});
responseObserver.onCompleted();
}
catch (Exception e) {
logger.error("Error getting updated links", e);
responseObserver.onError(e);
}
}
@Override
public void getFeed(RpcDomainId request,
StreamObserver<RpcFeed> responseObserver)

View File

@ -0,0 +1,34 @@
package nu.marginalia.rss.db;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class FeedDbReaderTest {
@Tag("flaky") // will only work on ~vlofgren, not on CI; remove test when this feature is stable
@Test
void getLinksUpdatedSince() throws SQLException {
var reader = new FeedDbReader(Path.of("/home/vlofgren/rss-feeds.db"));
Map<String, List<String>> links = new HashMap<>();
reader.getLinksUpdatedSince(Instant.now().minus(10, ChronoUnit.DAYS), links::put);
System.out.println(links.size());
for (var link : links.values()) {
if (link.size() < 2) {
System.out.println(link);
}
}
reader.close();
}
}

View File

@ -0,0 +1,36 @@
package nu.marginalia.rss.db;
import nu.marginalia.rss.model.FeedItem;
import nu.marginalia.rss.model.FeedItems;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
class FeedDbTest {
@Test
public void testDbHash() throws Exception {
Path dbPath = Files.createTempFile("rss-feeds", ".db");
try {
FeedDb db = new FeedDb(dbPath);
try (var writer = db.createWriter()) {
writer.saveFeed(new FeedItems("foo", "bar", "baz", List.of(
new FeedItem("title1", "link1", "description1", "content1"),
new FeedItem("title2", "link2", "description2", "content2")
)));
db.switchDb(writer);
}
var hash = db.getDataHash();
Assertions.assertFalse(hash.isBlank());
} finally {
Files.delete(dbPath);
}
}
}

View File

@ -67,7 +67,6 @@ dependencies {
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation libs.commons.lang3
testImplementation project(':code:common:process')
testImplementation project(':code:libraries:array')
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')

View File

@ -20,7 +20,7 @@ dependencies {
implementation project(':code:index:query')
implementation project(':code:index:index-journal')
implementation project(':code:common:model')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:processes:converting-process:model')
implementation libs.bundles.slf4j

View File

@ -21,8 +21,8 @@ dependencies {
implementation project(':code:index:query')
implementation project(':code:index:index-journal')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation project(':code:processes:converting-process:model')
implementation project(':code:common:process')
implementation project(':third-party:parquet-floor')
implementation project(':third-party:commons-codec')

View File

@ -21,7 +21,6 @@ tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:process')
implementation project(':third-party:porterstemmer')
implementation project(':third-party:count-min-sketch')

View File

@ -14,9 +14,9 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:process')
implementation project(':code:processes:converting-process:ft-keyword-extraction')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict')

View File

@ -2,10 +2,10 @@ package nu.marginalia.atags.source;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.process.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -4,8 +4,6 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.converting.model.CrawlPlan;
import nu.marginalia.converting.model.WorkDir;
import nu.marginalia.converting.processor.DomainProcessor;
@ -17,14 +15,14 @@ import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.util.SimpleBlockingThreadPool;
@ -34,13 +32,13 @@ import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@ -50,12 +48,9 @@ import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
public class ConverterMain extends ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
private final DomainProcessor processor;
private final Gson gson;
private final ProcessHeartbeat heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final SideloadSourceFactory sideloadSourceFactory;
private final int node;
public static void main(String... args) throws Exception {
@ -70,8 +65,9 @@ public class ConverterMain extends ProcessMainClass {
logger.info("Starting pipe");
converter
.fetchInstructions()
Instructions<ConvertRequest> instructions = converter.fetchInstructions(ConvertRequest.class);
converter.createAction(instructions)
.execute(converter);
logger.info("Finished");
@ -83,6 +79,65 @@ public class ConverterMain extends ProcessMainClass {
System.exit(0);
}
private Action createAction(Instructions<ConvertRequest> instructions) throws SQLException, IOException {
var request = instructions.value();
final Path inputPath = request.getInputPath();
return switch (request.action) {
case ConvertCrawlData -> {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);
var plan = new CrawlPlan(null,
new WorkDir(crawlData.asPath().toString(), "crawler.log"),
new WorkDir(processData.asPath().toString(), "processor.log")
);
yield new ConvertCrawlDataAction(plan, instructions);
}
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl),
processData.asPath(),
instructions);
}
case SideloadDirtree -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadDirtree(inputPath),
processData.asPath(),
instructions);
}
case SideloadWarc -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadWarc(inputPath),
processData.asPath(),
instructions);
}
case SideloadReddit -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadReddit(inputPath),
processData.asPath(),
instructions);
}
case SideloadStackexchange -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadStackexchange(inputPath),
processData.asPath(),
instructions);
}
};
}
@Inject
public ConverterMain(
DomainProcessor processor,
@ -94,13 +149,12 @@ public class ConverterMain extends ProcessMainClass {
ProcessConfiguration processConfiguration
)
{
super(messageQueueFactory, processConfiguration, gson, CONVERTER_INBOX);
this.processor = processor;
this.gson = gson;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.sideloadSourceFactory = sideloadSourceFactory;
this.node = processConfiguration.node();
heartbeat.start();
}
@ -220,45 +274,44 @@ public class ConverterMain extends ProcessMainClass {
}
}
private abstract static class ConvertRequest {
private final MqMessage message;
private final MqSingleShotInbox inbox;
private abstract static class Action {
final Instructions<?> instructions;
private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) {
this.message = message;
this.inbox = inbox;
public Action(Instructions<?> instructions) {
this.instructions = instructions;
}
public abstract void execute(ConverterMain converterMain) throws Exception;
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
instructions.ok();
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
instructions.err();
}
}
private static class SideloadAction extends ConvertRequest {
private static class SideloadAction extends Action {
private final Collection<? extends SideloadSource> sideloadSources;
private final Path workDir;
SideloadAction(SideloadSource sideloadSource,
Path workDir,
MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
Instructions<?> instructions) {
super(instructions);
this.sideloadSources = List.of(sideloadSource);
this.workDir = workDir;
}
SideloadAction(Collection<? extends SideloadSource> sideloadSources,
Path workDir,
MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
Instructions<?> instructions) {
super(instructions);
this.sideloadSources = sideloadSources;
this.workDir = workDir;
}
@Override
public void execute(ConverterMain converterMain) throws Exception {
try {
@ -272,11 +325,12 @@ public class ConverterMain extends ProcessMainClass {
}
}
private static class ConvertCrawlDataAction extends ConvertRequest {
private static class ConvertCrawlDataAction extends Action {
private final CrawlPlan plan;
private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
private ConvertCrawlDataAction(CrawlPlan plan,
Instructions<?> instructions) {
super(instructions);
this.plan = plan;
}
@ -294,94 +348,4 @@ public class ConverterMain extends ProcessMainClass {
}
}
private ConvertRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, node, UUID.randomUUID());
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
try {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);
// will be null on ConvertCrawlData
final Path inputPath = request.getInputPath();
return switch (request.action) {
case ConvertCrawlData -> {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);
var plan = new CrawlPlan(null,
new WorkDir(crawlData.asPath().toString(), "crawler.log"),
new WorkDir(processData.asPath().toString(), "processor.log")
);
yield new ConvertCrawlDataAction(plan, msg, inbox);
}
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl),
processData.asPath(),
msg, inbox);
}
case SideloadDirtree -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadDirtree(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadWarc -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadWarc(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadReddit -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadReddit(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadStackexchange -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadStackexchange(inputPath),
processData.asPath(),
msg, inbox);
}
};
}
catch (Exception ex) {
inbox.sendResponse(msg, MqInboxResponse.err(ex.getClass().getSimpleName() + ": " + ex.getMessage()));
throw ex;
}
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
}
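The same Instructions pattern recurs in the crawler, index-constructor and export-task mains below. From these call sites its surface can be inferred as roughly the following (a sketch only; the real code lives in ProcessMainClass and is not part of this diff):

// Inferred shape, assuming ProcessMainClass hides the MQ plumbing:
public static class Instructions<T> {
    private final T value;              // deserialized request payload
    // internally also holds the MqMessage and MqSingleShotInbox

    Instructions(T value) { this.value = value; }

    public T value() { return value; }
    public void ok()  { /* sendResponse(message, MqInboxResponse.ok()) */ }
    public void err() { /* sendResponse(message, MqInboxResponse.err()) */ }
}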

View File

@ -1,7 +1,6 @@
package nu.marginalia.converting.processor;
import com.google.inject.Inject;
import nu.marginalia.atags.AnchorTextKeywords;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
@ -37,7 +36,6 @@ public class DomainProcessor {
private final DocumentProcessor documentProcessor;
private final SiteWords siteWords;
private final AnchorTagsSource anchorTagsSource;
private final AnchorTextKeywords anchorTextKeywords;
private final GeoIpDictionary geoIpDictionary;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -46,12 +44,10 @@ public class DomainProcessor {
public DomainProcessor(DocumentProcessor documentProcessor,
SiteWords siteWords,
AnchorTagsSourceFactory anchorTagsSourceFactory,
AnchorTextKeywords anchorTextKeywords,
GeoIpDictionary geoIpDictionary) throws SQLException
{
this.documentProcessor = documentProcessor;
this.siteWords = siteWords;
this.anchorTextKeywords = anchorTextKeywords;
this.anchorTagsSource = anchorTagsSourceFactory.create();
this.geoIpDictionary = geoIpDictionary;

View File

@ -51,10 +51,6 @@ public class SideloaderProcessing {
"NP",
"",
body,
Integer.toHexString(url.hashCode()),
url,
"",
"SIDELOAD",
false,
null,
null

View File

@ -27,6 +27,8 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter
private final SlopDomainLinkRecord.Writer domainLinkWriter;
private final SlopDocumentRecord.Writer documentWriter;
private int ordinalOffset = 0;
private static final Logger logger = LoggerFactory.getLogger(ConverterBatchWriter.class);
public ConverterBatchWriter(Path basePath, int batchNumber) throws IOException {
@ -46,6 +48,11 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter
documentWriter = new SlopDocumentRecord.Writer(ProcessedDataFileNames.documentFileName(basePath), batchNumber);
}
/** Sets the lowest ordinal value for the documents in this batch */
public void setOrdinalOffset(int ordinalOffset) {
this.ordinalOffset = ordinalOffset;
}
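A hedged usage sketch: when several units are written through the same batch, the offset keeps document ordinals monotonically increasing (the running counter below is hypothetical, not from this diff):

writer.setOrdinalOffset(docsWrittenSoFar);  // hypothetical running count of documents already written
writer.write(nextDomain);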
@Override
public void write(ConverterBatchWritableIf writable) throws IOException {
writable.write(this);
@ -79,7 +86,7 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter
throws IOException
{
int ordinal = 0;
int ordinal = ordinalOffset;
String domainName = domain.toString();

View File

@ -147,10 +147,6 @@ public class ConvertingIntegrationTest {
"",
"",
readClassPathFile(p.toString()),
Double.toString(Math.random()),
"https://memex.marginalia.nu/" + file,
null,
"",
false,
null,
null

View File

@ -3,9 +3,9 @@ package nu.marginalia.converting;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import nu.marginalia.LanguageModels;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import nu.marginalia.converting.processor.ConverterDomainTypes;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.service.module.ServiceConfiguration;
import org.mockito.Mockito;

View File

@ -2,10 +2,10 @@ package nu.marginalia.converting.sideload.reddit;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.ConverterDomainTypes;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.process.ProcessConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

View File

@ -21,7 +21,6 @@ tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:model')

View File

@ -5,8 +5,6 @@ import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
@ -25,15 +23,15 @@ import nu.marginalia.io.CrawledDomainReader;
import nu.marginalia.io.CrawlerOutputFile;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
@ -46,8 +44,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.Security;
import java.sql.SQLException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -59,14 +59,12 @@ public class CrawlerMain extends ProcessMainClass {
private final UserAgent userAgent;
private final ProcessHeartbeatImpl heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final DomainProber domainProber;
private final FileStorageService fileStorageService;
private final AnchorTagsSourceFactory anchorTagsSourceFactory;
private final WarcArchiverFactory warcArchiverFactory;
private final HikariDataSource dataSource;
private final DomainBlacklist blacklist;
private final Gson gson;
private final int node;
private final SimpleBlockingThreadPool pool;
@ -96,16 +94,17 @@ public class CrawlerMain extends ProcessMainClass {
HikariDataSource dataSource,
DomainBlacklist blacklist,
Gson gson) throws InterruptedException {
super(messageQueueFactory, processConfiguration, gson, CRAWLER_INBOX);
this.userAgent = userAgent;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.domainProber = domainProber;
this.fileStorageService = fileStorageService;
this.anchorTagsSourceFactory = anchorTagsSourceFactory;
this.warcArchiverFactory = warcArchiverFactory;
this.dataSource = dataSource;
this.blacklist = blacklist;
this.gson = gson;
this.node = processConfiguration.node();
pool = new SimpleBlockingThreadPool("CrawlerPool",
@ -144,13 +143,14 @@ public class CrawlerMain extends ProcessMainClass {
);
var crawler = injector.getInstance(CrawlerMain.class);
var instructions = crawler.fetchInstructions();
var instructions = crawler.fetchInstructions(nu.marginalia.mqapi.crawling.CrawlRequest.class);
try {
if (instructions.targetDomainName != null) {
crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir);
var req = instructions.value();
if (req.targetDomainName != null) {
crawler.runForSingleDomain(req.targetDomainName, req.crawlStorage);
}
else {
crawler.runForDatabaseDomains(instructions.outputDir);
crawler.runForDatabaseDomains(req.crawlStorage);
}
instructions.ok();
} catch (Exception ex) {
@ -166,6 +166,10 @@ public class CrawlerMain extends ProcessMainClass {
System.exit(0);
}
public void runForDatabaseDomains(FileStorageId fileStorageId) throws Exception {
runForDatabaseDomains(fileStorageService.getStorage(fileStorageId).asPath());
}
public void runForDatabaseDomains(Path outputDir) throws Exception {
heartbeat.start();
@ -285,6 +289,11 @@ public class CrawlerMain extends ProcessMainClass {
}
}
public void runForSingleDomain(String targetDomainName, FileStorageId fileStorageId) throws Exception {
runForSingleDomain(targetDomainName, fileStorageService.getStorage(fileStorageId).asPath());
}
public void runForSingleDomain(String targetDomainName, Path outputDir) throws Exception {
heartbeat.start();
@ -410,70 +419,6 @@ public class CrawlerMain extends ProcessMainClass {
}
private static class CrawlRequest {
private final Path outputDir;
private final MqMessage message;
private final MqSingleShotInbox inbox;
private final String targetDomainName;
CrawlRequest(String targetDomainName,
Path outputDir,
MqMessage message,
MqSingleShotInbox inbox)
{
this.message = message;
this.inbox = inbox;
this.outputDir = outputDir;
this.targetDomainName = targetDomainName;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private CrawlRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);
var crawlStorage = fileStorageService.getStorage(request.crawlStorage);
return new CrawlRequest(
request.targetDomainName,
crawlStorage.asPath(),
msg,
inbox);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
public record CrawlSpecRecord(@NotNull String domain, int crawlDepth, @NotNull List<String> urls) {
public CrawlSpecRecord(String domain, int crawlDepth) {

View File

@ -19,8 +19,13 @@ public class CrawlDelayTimer {
private final long delayTime;
public CrawlDelayTimer(long delayTime) {
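// Fall back to the default minimum when no valid delay is configured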
if (delayTime <= 0) {
this.delayTime = DEFAULT_CRAWL_DELAY_MIN_MS;
}
else {
this.delayTime = delayTime;
}
}
/** Call when we've gotten an HTTP 429 response. This will wait a moment, and then
* set a flag that slows down the main crawl delay as well. */
@ -41,6 +46,10 @@ public class CrawlDelayTimer {
Thread.sleep(delay.toMillis());
}
public void waitFetchDelay() {
waitFetchDelay(0);
}
public void waitFetchDelay(long spentTime) {
long sleepTime = delayTime;

View File

@ -1,8 +1,8 @@
package nu.marginalia.crawl.warc;
import com.google.inject.Inject;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.process.ProcessConfiguration;
import org.apache.commons.io.IOUtils;
import java.io.IOException;

View File

@ -20,7 +20,6 @@ dependencies {
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:config')
implementation project(':code:common:process')
implementation project(':code:index:api')
implementation project(':code:processes:crawling-process:ft-content-type')
implementation project(':code:libraries:language-processing')

View File

@ -148,10 +148,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
"",
nextRecord.headers,
bodyString,
Long.toHexString(hash.hashNearlyASCII(bodyString)), // this field isn't actually used, maybe we can skip calculating it?
nextRecord.url,
null,
"",
// this field isn't actually used, maybe we can skip calculating it?
nextRecord.cookies,
lastModified,
etag));

View File

@ -4,7 +4,7 @@ import nu.marginalia.model.EdgeUrl;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
public class CrawledDocument implements SerializableCrawlData {
public final class CrawledDocument implements SerializableCrawlData {
public String crawlId;
public String url;
@ -21,16 +21,6 @@ public class CrawledDocument implements SerializableCrawlData {
public String documentBody;
@Deprecated
public String documentBodyHash;
@Deprecated
public String canonicalUrl;
public String redirectUrl;
@Deprecated
public String recrawlState;
/**
* This is not guaranteed to be set in all versions of the format,
* information may come in CrawledDomain instead
@ -40,7 +30,7 @@ public class CrawledDocument implements SerializableCrawlData {
public String lastModifiedMaybe;
public String etagMaybe;
public CrawledDocument(String crawlId, String url, String contentType, String timestamp, int httpStatus, String crawlerStatus, String crawlerStatusDesc, @Nullable String headers, String documentBody, String documentBodyHash, String canonicalUrl, String redirectUrl, String recrawlState, Boolean hasCookies, String lastModifiedMaybe, String etagMaybe) {
public CrawledDocument(String crawlId, String url, String contentType, String timestamp, int httpStatus, String crawlerStatus, String crawlerStatusDesc, @Nullable String headers, String documentBody, Boolean hasCookies, String lastModifiedMaybe, String etagMaybe) {
this.crawlId = crawlId;
this.url = url;
this.contentType = contentType;
@ -50,10 +40,6 @@ public class CrawledDocument implements SerializableCrawlData {
this.crawlerStatusDesc = crawlerStatusDesc;
this.headers = headers;
this.documentBody = documentBody;
this.documentBodyHash = documentBodyHash;
this.canonicalUrl = canonicalUrl;
this.redirectUrl = redirectUrl;
this.recrawlState = recrawlState;
this.hasCookies = hasCookies;
this.lastModifiedMaybe = lastModifiedMaybe;
this.etagMaybe = etagMaybe;
@ -120,7 +106,7 @@ public class CrawledDocument implements SerializableCrawlData {
}
public String toString() {
return "CrawledDocument(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", documentBodyHash=" + this.documentBodyHash + ", canonicalUrl=" + this.canonicalUrl + ", redirectUrl=" + this.redirectUrl + ", recrawlState=" + this.recrawlState + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
return "CrawledDocument(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
}
public static class CrawledDocumentBuilder {
@ -133,9 +119,6 @@ public class CrawledDocument implements SerializableCrawlData {
private String crawlerStatusDesc;
private @Nullable String headers;
private String documentBody;
private String documentBodyHash;
private String canonicalUrl;
private String redirectUrl;
private String recrawlState;
private Boolean hasCookies;
private String lastModifiedMaybe;
@ -189,23 +172,6 @@ public class CrawledDocument implements SerializableCrawlData {
return this;
}
@Deprecated
public CrawledDocumentBuilder documentBodyHash(String documentBodyHash) {
this.documentBodyHash = documentBodyHash;
return this;
}
@Deprecated
public CrawledDocumentBuilder canonicalUrl(String canonicalUrl) {
this.canonicalUrl = canonicalUrl;
return this;
}
public CrawledDocumentBuilder redirectUrl(String redirectUrl) {
this.redirectUrl = redirectUrl;
return this;
}
@Deprecated
public CrawledDocumentBuilder recrawlState(String recrawlState) {
this.recrawlState = recrawlState;
@ -228,11 +194,11 @@ public class CrawledDocument implements SerializableCrawlData {
}
public CrawledDocument build() {
return new CrawledDocument(this.crawlId, this.url, this.contentType, this.timestamp, this.httpStatus, this.crawlerStatus, this.crawlerStatusDesc, this.headers, this.documentBody, this.documentBodyHash, this.canonicalUrl, this.redirectUrl, this.recrawlState, this.hasCookies, this.lastModifiedMaybe, this.etagMaybe);
return new CrawledDocument(this.crawlId, this.url, this.contentType, this.timestamp, this.httpStatus, this.crawlerStatus, this.crawlerStatusDesc, this.headers, this.documentBody, this.hasCookies, this.lastModifiedMaybe, this.etagMaybe);
}
public String toString() {
return "CrawledDocument.CrawledDocumentBuilder(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", documentBodyHash=" + this.documentBodyHash + ", canonicalUrl=" + this.canonicalUrl + ", redirectUrl=" + this.redirectUrl + ", recrawlState=" + this.recrawlState + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
return "CrawledDocument.CrawledDocumentBuilder(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", recrawlState=" + this.recrawlState + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
}
}
}

View File

@ -1,8 +1,9 @@
package nu.marginalia.model.crawldata;
import java.util.List;
import java.util.Objects;
public class CrawledDomain implements SerializableCrawlData {
public final class CrawledDomain implements SerializableCrawlData {
public String domain;
public String redirectDomain;
@ -11,6 +12,7 @@ public class CrawledDomain implements SerializableCrawlData {
public String crawlerStatusDesc;
public String ip;
@Deprecated // This used to be populated, but is no longer
public List<CrawledDocument> doc;
/**
@ -29,15 +31,6 @@ public class CrawledDomain implements SerializableCrawlData {
this.cookies = cookies;
}
public static CrawledDomainBuilder builder() {
return new CrawledDomainBuilder();
}
public int size() {
if (doc == null) return 0;
return doc.size();
}
public String getDomain() {
return this.domain;
}
@ -94,119 +87,26 @@ public class CrawledDomain implements SerializableCrawlData {
this.cookies = cookies;
}
public boolean equals(final Object o) {
if (o == this) return true;
if (!(o instanceof CrawledDomain)) return false;
final CrawledDomain other = (CrawledDomain) o;
if (!other.canEqual((Object) this)) return false;
final Object this$domain = this.getDomain();
final Object other$domain = other.getDomain();
if (this$domain == null ? other$domain != null : !this$domain.equals(other$domain)) return false;
final Object this$redirectDomain = this.getRedirectDomain();
final Object other$redirectDomain = other.getRedirectDomain();
if (this$redirectDomain == null ? other$redirectDomain != null : !this$redirectDomain.equals(other$redirectDomain))
return false;
final Object this$crawlerStatus = this.getCrawlerStatus();
final Object other$crawlerStatus = other.getCrawlerStatus();
if (this$crawlerStatus == null ? other$crawlerStatus != null : !this$crawlerStatus.equals(other$crawlerStatus))
return false;
final Object this$crawlerStatusDesc = this.getCrawlerStatusDesc();
final Object other$crawlerStatusDesc = other.getCrawlerStatusDesc();
if (this$crawlerStatusDesc == null ? other$crawlerStatusDesc != null : !this$crawlerStatusDesc.equals(other$crawlerStatusDesc))
return false;
final Object this$ip = this.getIp();
final Object other$ip = other.getIp();
if (this$ip == null ? other$ip != null : !this$ip.equals(other$ip)) return false;
final Object this$doc = this.getDoc();
final Object other$doc = other.getDoc();
if (this$doc == null ? other$doc != null : !this$doc.equals(other$doc)) return false;
final Object this$cookies = this.getCookies();
final Object other$cookies = other.getCookies();
if (this$cookies == null ? other$cookies != null : !this$cookies.equals(other$cookies)) return false;
return true;
}
protected boolean canEqual(final Object other) {
return other instanceof CrawledDomain;
@Override
public boolean equals(Object o) {
if (!(o instanceof CrawledDomain that)) return false;
return Objects.equals(domain, that.domain) && Objects.equals(redirectDomain, that.redirectDomain) && Objects.equals(crawlerStatus, that.crawlerStatus) && Objects.equals(crawlerStatusDesc, that.crawlerStatusDesc) && Objects.equals(ip, that.ip) && Objects.equals(doc, that.doc) && Objects.equals(cookies, that.cookies);
}
@Override
public int hashCode() {
final int PRIME = 59;
int result = 1;
final Object $domain = this.getDomain();
result = result * PRIME + ($domain == null ? 43 : $domain.hashCode());
final Object $redirectDomain = this.getRedirectDomain();
result = result * PRIME + ($redirectDomain == null ? 43 : $redirectDomain.hashCode());
final Object $crawlerStatus = this.getCrawlerStatus();
result = result * PRIME + ($crawlerStatus == null ? 43 : $crawlerStatus.hashCode());
final Object $crawlerStatusDesc = this.getCrawlerStatusDesc();
result = result * PRIME + ($crawlerStatusDesc == null ? 43 : $crawlerStatusDesc.hashCode());
final Object $ip = this.getIp();
result = result * PRIME + ($ip == null ? 43 : $ip.hashCode());
final Object $doc = this.getDoc();
result = result * PRIME + ($doc == null ? 43 : $doc.hashCode());
final Object $cookies = this.getCookies();
result = result * PRIME + ($cookies == null ? 43 : $cookies.hashCode());
int result = Objects.hashCode(domain);
result = 31 * result + Objects.hashCode(redirectDomain);
result = 31 * result + Objects.hashCode(crawlerStatus);
result = 31 * result + Objects.hashCode(crawlerStatusDesc);
result = 31 * result + Objects.hashCode(ip);
result = 31 * result + Objects.hashCode(doc);
result = 31 * result + Objects.hashCode(cookies);
return result;
}
public String toString() {
return "CrawledDomain(domain=" + this.getDomain() + ", redirectDomain=" + this.getRedirectDomain() + ", crawlerStatus=" + this.getCrawlerStatus() + ", crawlerStatusDesc=" + this.getCrawlerStatusDesc() + ", ip=" + this.getIp() + ", doc=" + this.getDoc() + ", cookies=" + this.getCookies() + ")";
}
public static class CrawledDomainBuilder {
private String domain;
private String redirectDomain;
private String crawlerStatus;
private String crawlerStatusDesc;
private String ip;
private List<CrawledDocument> doc;
private List<String> cookies;
CrawledDomainBuilder() {
}
public CrawledDomainBuilder domain(String domain) {
this.domain = domain;
return this;
}
public CrawledDomainBuilder redirectDomain(String redirectDomain) {
this.redirectDomain = redirectDomain;
return this;
}
public CrawledDomainBuilder crawlerStatus(String crawlerStatus) {
this.crawlerStatus = crawlerStatus;
return this;
}
public CrawledDomainBuilder crawlerStatusDesc(String crawlerStatusDesc) {
this.crawlerStatusDesc = crawlerStatusDesc;
return this;
}
public CrawledDomainBuilder ip(String ip) {
this.ip = ip;
return this;
}
public CrawledDomainBuilder doc(List<CrawledDocument> doc) {
this.doc = doc;
return this;
}
public CrawledDomainBuilder cookies(List<String> cookies) {
this.cookies = cookies;
return this;
}
public CrawledDomain build() {
return new CrawledDomain(this.domain, this.redirectDomain, this.crawlerStatus, this.crawlerStatusDesc, this.ip, this.doc, this.cookies);
}
public String toString() {
return "CrawledDomain.CrawledDomainBuilder(domain=" + this.domain + ", redirectDomain=" + this.redirectDomain + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", ip=" + this.ip + ", doc=" + this.doc + ", cookies=" + this.cookies + ")";
}
}
}

View File

@ -1,5 +1,5 @@
package nu.marginalia.model.crawldata;
public interface SerializableCrawlData {
public sealed interface SerializableCrawlData permits CrawledDocument, CrawledDomain {
String getDomain();
}
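Sealing the interface lets consumers switch over crawl data exhaustively, with no default arm; a sketch assuming the same package as the model classes:

// The compiler verifies that both permitted subtypes are covered
static String describe(SerializableCrawlData data) {
    return switch (data) {
        case CrawledDocument doc -> "document: " + doc.url;
        case CrawledDomain domain -> "domain: " + domain.getDomain();
    };
}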

View File

@ -1,24 +1,33 @@
plugins {
id 'java'
id "de.undercouch.download" version "5.1.0"
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
application {
mainClass = 'nu.marginalia.task.ExportTaskMain'
applicationName = 'export-task-process'
}
tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:config')
implementation project(':code:common:process')
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:service')
implementation project(':code:common:config')
implementation project(':code:libraries:message-queue')
implementation project(':code:functions:link-graph:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict')
implementation project(':code:libraries:blocking-thread-pool')
@ -28,20 +37,32 @@ dependencies {
implementation project(':code:processes:converting-process')
implementation project(':third-party:commons-codec')
implementation libs.bundles.slf4j
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.roaringbitmap
implementation libs.trove
implementation libs.fastutil
implementation libs.bundles.mariadb
implementation libs.gson
implementation libs.commons.lang3
implementation libs.commons.io
implementation libs.commons.compress
implementation libs.notnull
implementation libs.commons.codec
implementation libs.jsoup
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation libs.commons.codec
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
}

View File

@ -0,0 +1,127 @@
package nu.marginalia.adjacencies;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.api.linkgraph.AggregateLinkGraphClient;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import static nu.marginalia.adjacencies.SparseBitVector.andCardinality;
import static nu.marginalia.adjacencies.SparseBitVector.weightedProduct;
public class WebsiteAdjacenciesCalculator {
private final AggregateLinkGraphClient domainLinksClient;
private final ProcessConfiguration configuration;
private final HikariDataSource dataSource;
public AdjacenciesData adjacenciesData;
public DomainAliases domainAliases;
private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class);
float[] weights;
@Inject
public WebsiteAdjacenciesCalculator(AggregateLinkGraphClient domainLinksClient,
ProcessConfiguration configuration,
HikariDataSource dataSource) {
this.domainLinksClient = domainLinksClient;
this.configuration = configuration;
this.dataSource = dataSource;
}
public void export() throws Exception {
try (var processHeartbeat = new ProcessHeartbeatImpl(configuration, dataSource)) {
domainAliases = new DomainAliases(dataSource);
adjacenciesData = new AdjacenciesData(domainLinksClient, domainAliases);
weights = adjacenciesData.getWeights();
AdjacenciesLoader loader = new AdjacenciesLoader(dataSource);
var executor = Executors.newFixedThreadPool(16);
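// NB: the work below runs on the common parallel stream; this executor is never submitted to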
int total = adjacenciesData.getIdsList().size();
AtomicInteger progress = new AtomicInteger(0);
IntStream.of(adjacenciesData.getIdsList().toArray()).parallel()
.filter(domainAliases::isNotAliased)
.forEach(id -> {
findAdjacent(id, loader::load);
processHeartbeat.setProgress(progress.incrementAndGet() / (double) total);
});
executor.shutdown();
System.out.println("Waiting for wrap-up");
loader.stop();
}
}
public void findAdjacent(int domainId, Consumer<DomainSimilarities> andThen) {
findAdjacentDtoS(domainId, andThen);
}
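// For intuition, this is plain set cosine similarity over the link vectors:
//   sim(A,B) = |A ∩ B| / (sqrt(|A|) * sqrt(|B|))
// e.g. |A|=100, |B|=400, |A∩B|=40 gives 40 / (10 * 20) = 0.2, clearing the 0.1 cut-off used below.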
double cosineSimilarity(SparseBitVector a, SparseBitVector b) {
double andCardinality = andCardinality(a, b);
andCardinality /= Math.sqrt(a.getCardinality());
andCardinality /= Math.sqrt(b.getCardinality());
return andCardinality;
}
double expensiveCosineSimilarity(SparseBitVector a, SparseBitVector b) {
return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights));
}
public record DomainSimilarities(int domainId, List<DomainSimilarity> similarities) {}
public record DomainSimilarity(int domainId, double value) {}
private void findAdjacentDtoS(int domainId, Consumer<DomainSimilarities> andThen) {
var vector = adjacenciesData.getVector(domainId);
if (vector == null || !vector.cardinalityExceeds(10)) {
return;
}
List<DomainSimilarity> similarities = new ArrayList<>(1000);
var items = adjacenciesData.getCandidates(vector);
int cardMin = Math.max(2, (int) (0.01 * vector.getCardinality()));
items.forEach(id -> {
var otherVec = adjacenciesData.getVector(id);
if (null == otherVec || otherVec == vector)
return true;
if (otherVec.getCardinality() < cardMin)
return true;
double similarity = cosineSimilarity(vector, otherVec);
if (similarity > 0.1) {
var recalculated = expensiveCosineSimilarity(vector, otherVec);
if (recalculated > 0.1) {
similarities.add(new DomainSimilarity(id, recalculated));
}
}
return true;
});
if (similarities.size() > 128) {
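// Keep only the 128 strongest similarities: sort ascending by value and drop the head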
similarities.sort(Comparator.comparing(DomainSimilarity::value));
similarities.subList(0, similarities.size() - 128).clear();
}
andThen.accept(new DomainSimilarities(domainId, similarities));
}
}

View File

@ -0,0 +1,82 @@
package nu.marginalia.task;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import nu.marginalia.adjacencies.WebsiteAdjacenciesCalculator;
import nu.marginalia.extractor.AtagExporter;
import nu.marginalia.extractor.FeedExporter;
import nu.marginalia.extractor.SampleDataExporter;
import nu.marginalia.extractor.TermFrequencyExporter;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.mqapi.tasks.ExportTaskRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExportTasksMain extends ProcessMainClass {
private static final Logger logger = LoggerFactory.getLogger(ExportTasksMain.class);
private final AtagExporter atagExporter;
private final FeedExporter feedExporter;
private final SampleDataExporter sampleDataExporter;
private final TermFrequencyExporter termFrequencyExporter;
private final WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator;
public static void main(String[] args) throws Exception {
var injector = Guice.createInjector(
new ServiceDiscoveryModule(),
new ProcessConfigurationModule("export-tasks"),
new DatabaseModule(false)
);
var exportTasks = injector.getInstance(ExportTasksMain.class);
Instructions<ExportTaskRequest> instructions = exportTasks.fetchInstructions(ExportTaskRequest.class);
try {
exportTasks.run(instructions.value());
instructions.ok();
}
catch (Exception e) {
logger.error("Error running export task", e);
instructions.err();
}
}
@Inject
public ExportTasksMain(MessageQueueFactory messageQueueFactory,
ProcessConfiguration config,
AtagExporter atagExporter,
FeedExporter feedExporter,
SampleDataExporter sampleDataExporter,
TermFrequencyExporter termFrequencyExporter,
Gson gson, WebsiteAdjacenciesCalculator websiteAdjacenciesCalculator)
{
super(messageQueueFactory, config, gson, ProcessInboxNames.EXPORT_TASK_INBOX);
this.atagExporter = atagExporter;
this.feedExporter = feedExporter;
this.sampleDataExporter = sampleDataExporter;
this.termFrequencyExporter = termFrequencyExporter;
this.websiteAdjacenciesCalculator = websiteAdjacenciesCalculator;
}
private void run(ExportTaskRequest request) throws Exception {
switch (request.task) {
case ATAGS: atagExporter.export(request.crawlId, request.destId); break;
case FEEDS: feedExporter.export(request.crawlId, request.destId); break;
case TERM_FREQ: termFrequencyExporter.export(request.crawlId, request.destId); break;
case SAMPLE_DATA: sampleDataExporter.export(request.crawlId, request.destId, request.size, request.name); break;
case ADJACENCIES: websiteAdjacenciesCalculator.export(); break;
}
}
}

View File

@ -22,7 +22,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:processes:process-mq-api')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:common:db')
implementation project(':code:common:config')

View File

@ -1,11 +1,8 @@
package nu.marginalia.index;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import nu.marginalia.IndexLocations;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.index.construction.full.FullIndexConstructor;
import nu.marginalia.index.construction.prio.PrioIndexConstructor;
import nu.marginalia.index.domainrankings.DomainRankings;
@ -15,13 +12,11 @@ import nu.marginalia.index.journal.IndexJournal;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.index.CreateIndexRequest;
import nu.marginalia.mqapi.index.IndexName;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
import org.slf4j.Logger;
@ -31,8 +26,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX;
@ -40,15 +33,12 @@ import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX;
public class IndexConstructorMain extends ProcessMainClass {
private final FileStorageService fileStorageService;
private final ProcessHeartbeatImpl heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final DomainRankings domainRankings;
private final int node;
private static final Logger logger = LoggerFactory.getLogger(IndexConstructorMain.class);
private final Gson gson = GsonFactory.get();
public static void main(String[] args) throws Exception {
CreateIndexInstructions instructions = null;
public static void main(String[] args) throws Exception {
Instructions<CreateIndexRequest> instructions = null;
try {
new org.mariadb.jdbc.Driver();
@ -58,9 +48,8 @@ public class IndexConstructorMain extends ProcessMainClass {
new DatabaseModule(false))
.getInstance(IndexConstructorMain.class);
instructions = main.fetchInstructions();
main.run(instructions);
instructions = main.fetchInstructions(CreateIndexRequest.class);
main.run(instructions.value());
instructions.ok();
}
catch (Exception ex) {
@ -85,17 +74,17 @@ public class IndexConstructorMain extends ProcessMainClass {
ProcessConfiguration processConfiguration,
DomainRankings domainRankings) {
super(messageQueueFactory, processConfiguration, GsonFactory.get(), INDEX_CONSTRUCTOR_INBOX);
this.fileStorageService = fileStorageService;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.domainRankings = domainRankings;
this.node = processConfiguration.node();
}
private void run(CreateIndexInstructions instructions) throws SQLException, IOException {
private void run(CreateIndexRequest instructions) throws SQLException, IOException {
heartbeat.start();
switch (instructions.name) {
switch (instructions.indexName()) {
case FORWARD -> createForwardIndex();
case REVERSE_FULL -> createFullReverseIndex();
case REVERSE_PRIO -> createPrioReverseIndex();
@ -171,52 +160,4 @@ public class IndexConstructorMain extends ProcessMainClass {
docId);
}
private static class CreateIndexInstructions {
public final IndexName name;
private final MqSingleShotInbox inbox;
private final MqMessage message;
private CreateIndexInstructions(IndexName name, MqSingleShotInbox inbox, MqMessage message) {
this.name = name;
this.inbox = inbox;
this.message = message;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private CreateIndexInstructions fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(INDEX_CONSTRUCTOR_INBOX, node, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, CreateIndexRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
var payload = gson.fromJson(msg.payload(), CreateIndexRequest.class);
var name = payload.indexName();
return new CreateIndexInstructions(name, inbox, msg);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
}
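For readers tracing the refactor above: the hand-rolled CreateIndexInstructions / MqSingleShotInbox plumbing deleted in this hunk is subsumed by a generic Instructions&lt;T&gt; helper supplied by ProcessMainClass; fetchInstructions(Class&lt;T&gt;), value(), ok(), and err() are the calls visible in this diff. Below is a minimal, self-contained model of that acknowledge pattern; the MQ internals here are stand-ins, not Marginalia's actual message-queue API.

// A minimal, self-contained model of the Instructions<T> acknowledge pattern.
// Names mirror the calls visible in this diff (fetchInstructions -> value/ok/err);
// the Consumer below stands in for the single-shot inbox response.
import java.util.function.Consumer;

public class InstructionsPatternSketch {

    static class Instructions<T> {
        private final T value;
        private final Consumer<Boolean> respond;

        Instructions(T value, Consumer<Boolean> respond) {
            this.value = value;
            this.respond = respond;
        }

        T value() { return value; }
        void ok()  { respond.accept(true);  } // maps to MqInboxResponse.ok()
        void err() { respond.accept(false); } // maps to MqInboxResponse.err()
    }

    record CreateIndexRequest(String indexName) {}

    public static void main(String[] args) throws Exception {
        var instructions = new Instructions<>(
                new CreateIndexRequest("FORWARD"),
                success -> System.out.println(success ? "ack: ok" : "ack: err"));

        try {
            System.out.println("Would build index: " + instructions.value().indexName());
            instructions.ok();  // acknowledge success back to the queue
        }
        catch (Exception ex) {
            instructions.err(); // report failure so the message isn't silently lost
            throw ex;
        }
    }
}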

View File

@ -0,0 +1,77 @@
plugins {
id 'java'
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(rootProject.ext.jvmVersion))
}
}
application {
mainClass = 'nu.marginalia.livecrawler.LiveCrawlerMain'
applicationName = 'live-crawler-process'
}
tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation project(':code:common:linkdb')
implementation project(':code:functions:live-capture:api')
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:index:api')
implementation project(':code:processes:process-mq-api')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:processes:crawling-process')
implementation project(':code:processes:crawling-process:model')
implementation project(':code:processes:converting-process')
implementation project(':code:processes:loading-process')
implementation project(':code:processes:crawling-process:ft-crawl-blocklist')
implementation project(':code:processes:crawling-process:ft-link-parser')
implementation project(':code:processes:crawling-process:ft-content-type')
implementation project(':third-party:commons-codec')
implementation libs.bundles.slf4j
implementation libs.notnull
implementation libs.guava
implementation dependencies.create(libs.guice.get()) {
exclude group: 'com.google.guava'
}
implementation libs.gson
implementation libs.zstd
implementation libs.jwarc
implementation libs.crawlercommons
implementation libs.okhttp3
implementation libs.jsoup
implementation libs.opencsv
implementation libs.fastutil
implementation libs.sqlite
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation libs.commons.codec
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
testImplementation project(':code:libraries:test-helpers')
}

View File

@ -0,0 +1,244 @@
package nu.marginalia.livecrawler;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawldata.CrawledDocument;
import nu.marginalia.model.crawldata.CrawledDomain;
import nu.marginalia.model.crawldata.SerializableCrawlData;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/** Data access object for the live crawl database, a simple SQLite file */
public class LiveCrawlDataSet implements AutoCloseable {
private final Connection connection;
private final Path basePath;
public LiveCrawlDataSet(Path basePath) throws SQLException {
this.basePath = basePath;
this.connection = DriverManager.getConnection("jdbc:sqlite:" + basePath.resolve("live-crawl-data.db"));
this.connection.setAutoCommit(true);
try (var stmt = connection.createStatement()) {
stmt.execute("CREATE TABLE IF NOT EXISTS urls (url TEXT PRIMARY KEY, domainId LONG, body BLOB, headers BLOB, ip TEXT, timestamp long)");
stmt.execute("CREATE INDEX IF NOT EXISTS domainIdIndex ON urls (domainId)");
stmt.execute("CREATE TABLE IF NOT EXISTS badUrls (url TEXT PRIMARY KEY, timestamp long)");
}
}
public Path createWorkDir() throws IOException {
return Files.createTempDirectory(basePath, "work");
}
/** Remove entries older than the given timestamp */
public void prune(Instant cutoff) throws SQLException {
try (var stmt = connection.prepareStatement("DELETE FROM urls WHERE timestamp < ?")) {
stmt.setLong(1, cutoff.toEpochMilli());
stmt.executeUpdate();
}
try (var stmt = connection.prepareStatement("DELETE FROM badUrls WHERE timestamp < ?")) {
stmt.setLong(1, cutoff.toEpochMilli());
stmt.executeUpdate();
}
}
/** Check if the given URL is already in the database */
public boolean hasUrl(String url) throws SQLException {
try (var stmt = connection.prepareStatement("""
SELECT 1 FROM urls WHERE urls.url = ?
UNION
SELECT 1 FROM badUrls WHERE badUrls.url = ?
""");
) {
stmt.setString(1, url);
stmt.setString(2, url);
return stmt.executeQuery().next();
}
}
/** Check if the given URL is already in the database */
public boolean hasUrl(EdgeUrl url) throws SQLException {
return hasUrl(url.toString());
}
/** Save a document to the database */
public void saveDocument(int domainId, EdgeUrl url, String body, String headers, String ip) throws SQLException, IOException {
try (var stmt = connection.prepareStatement("""
INSERT OR REPLACE INTO urls (domainId, url, body, headers, ip, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
"""))
{
stmt.setInt(1, domainId);
stmt.setString(2, url.toString());
stmt.setBytes(3, compress(body));
stmt.setBytes(4, compress(headers));
stmt.setString(5, ip);
stmt.setLong(6, Instant.now().toEpochMilli());
stmt.executeUpdate();
}
}
/** Flag a URL as bad, i.e. it should not be revisited */
public void flagAsBad(EdgeUrl url) {
try (var stmt = connection.prepareStatement("""
INSERT OR IGNORE INTO badUrls (url, timestamp)
VALUES (?, ?)
"""))
{
stmt.setString(1, url.toString());
stmt.setLong(2, Instant.now().toEpochMilli());
stmt.executeUpdate();
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
private byte[] compress(String data) throws IOException {
// gzip compression
try (var bos = new ByteArrayOutputStream();
var gzip = new GZIPOutputStream(bos))
{
gzip.write(data.getBytes(StandardCharsets.UTF_8)); // pin the charset so compress/decompress round-trip regardless of platform default
gzip.finish();
return bos.toByteArray();
}
}
private String decompress(byte[] data) {
// gzip decompression
try (var bis = new ByteArrayInputStream(data);
var gzip = new GZIPInputStream(bis))
{
return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
}
/** Get the data in the database as a list of SerializableCrawlDataStream's, the
* format expected by the converter code.
*/
public Collection<SerializableCrawlDataStream> getDataStreams() throws SQLException {
List<Integer> domainIds = new ArrayList<>();
try (var stmt = connection.createStatement()) {
var rs = stmt.executeQuery("SELECT DISTINCT domainId FROM urls");
while (rs.next()) {
domainIds.add(rs.getInt(1));
}
}
List<SerializableCrawlDataStream> streams = new ArrayList<>();
for (var domainId : domainIds) {
streams.add(new WrappedDataStream(domainId));
}
return streams;
}
/** Wraps the data in the database as a SerializableCrawlDataStream.
* <p></p>
* This is a bit clunky, as the interface was designed with the expectation
* that the data would be a stream of objects read from Parquet.
* */
private class WrappedDataStream implements SerializableCrawlDataStream {
private final int domainId;
private ArrayList<SerializableCrawlData> dataStack;
WrappedDataStream(int domainId) {
this.domainId = domainId;
this.dataStack = null;
}
/** Lazy initialization for the data being iterated over */
private void query() {
try (var stmt = connection.prepareStatement("""
SELECT url, body, headers, ip, timestamp
FROM urls
WHERE domainId = ?
""")) {
stmt.setInt(1, domainId);
var rs = stmt.executeQuery();
dataStack = new ArrayList<>();
while (rs.next()) {
String url = rs.getString("url");
String body = decompress(rs.getBytes("body"));
String headers = decompress(rs.getBytes("headers"));
dataStack.add(new CrawledDocument(
"LIVE",
url,
"text/html",
Instant.ofEpochMilli(rs.getLong("timestamp")).toString(),
200,
"OK",
"",
headers,
body,
false,
"",
""
));
}
var last = dataStack.getLast();
var domain = new CrawledDomain(
last.getDomain(),
null,
"OK",
"",
"0.0.0.0",
List.of(),
List.of()
);
// Add the domain as the last element, which will be the first
// element popped from the list
dataStack.addLast(domain);
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
@Override
public SerializableCrawlData next() throws IOException {
if (dataStack == null)
query();
return dataStack.removeLast();
}
@Override
public boolean hasNext() throws IOException {
if (dataStack == null) {
query();
}
return !dataStack.isEmpty();
}
@Override
public void close() throws Exception {
if (dataStack != null) dataStack.clear(); // guard: the stream may be closed before it was ever read
}
}
@Override
public void close() throws Exception {
connection.close();
}
}
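A short usage sketch of the class above, exercising only the methods defined in this file; the path, domain id, and document contents are illustrative.

// Hypothetical round-trip through LiveCrawlDataSet; ids and URLs are made up.
import nu.marginalia.livecrawler.LiveCrawlDataSet;
import nu.marginalia.model.EdgeUrl;

import java.nio.file.Files;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class LiveCrawlDataSetExample {
    public static void main(String[] args) throws Exception {
        var basePath = Files.createTempDirectory("live-crawl-example");

        try (var dataSet = new LiveCrawlDataSet(basePath)) {
            var url = new EdgeUrl("https://www.example.com/");

            if (!dataSet.hasUrl(url)) {
                // body and headers are stored gzip-compressed as BLOBs
                dataSet.saveDocument(1, url, "<html>...</html>", "Content-Type: text/html", "127.0.0.1");
            }

            // Drop anything older than the 60-day window LiveCrawlerMain uses
            dataSet.prune(Instant.now().minus(60, ChronoUnit.DAYS));

            // Each distinct domainId becomes one SerializableCrawlDataStream
            for (var stream : dataSet.getDataStreams()) {
                while (stream.hasNext()) {
                    System.out.println(stream.next());
                }
            }
        }
    }
}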

View File

@ -0,0 +1,224 @@
package nu.marginalia.livecrawler;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.WmsaHome;
import nu.marginalia.api.feeds.FeedsClient;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.io.SerializableCrawlDataStream;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.domains.DbDomainIdRegistry;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.crawling.LiveCrawlRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.service.module.ServiceDiscoveryModule;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Security;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static nu.marginalia.mqapi.ProcessInboxNames.LIVE_CRAWLER_INBOX;
public class LiveCrawlerMain extends ProcessMainClass {
private static final Logger logger =
LoggerFactory.getLogger(LiveCrawlerMain.class);
private final FeedsClient feedsClient;
private final ProcessHeartbeat heartbeat;
private final DbDomainQueries domainQueries;
private final DomainBlacklist domainBlacklist;
private final DomainProcessor domainProcessor;
private final FileStorageService fileStorageService;
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
private final HikariDataSource dataSource;
@Inject
public LiveCrawlerMain(FeedsClient feedsClient,
Gson gson,
ProcessConfiguration config,
ProcessHeartbeat heartbeat,
DbDomainQueries domainQueries,
DomainBlacklist domainBlacklist,
MessageQueueFactory messageQueueFactory,
DomainProcessor domainProcessor,
FileStorageService fileStorageService,
KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService, HikariDataSource dataSource)
throws Exception
{
super(messageQueueFactory, config, gson, LIVE_CRAWLER_INBOX);
this.feedsClient = feedsClient;
this.heartbeat = heartbeat;
this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist;
this.domainProcessor = domainProcessor;
this.fileStorageService = fileStorageService;
this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService;
this.dataSource = dataSource;
domainBlacklist.waitUntilLoaded();
}
public static void main(String... args) throws Exception {
// Prevent Java from caching DNS lookups forever (filling up the system RAM as a result)
Security.setProperty("networkaddress.cache.ttl", "3600");
// This must run *early*
System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());
// If these aren't set properly, the JVM will hang forever on some requests
System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
System.setProperty("sun.net.client.defaultReadTimeout", "30000");
// We don't want to use too much memory caching sessions for https
System.setProperty("javax.net.ssl.sessionCacheSize", "2048");
try {
Injector injector = Guice.createInjector(
new LiveCrawlerModule(),
new ProcessConfigurationModule("crawler"),
new ConverterModule(),
new ServiceDiscoveryModule(),
new DatabaseModule(false)
);
var crawler = injector.getInstance(LiveCrawlerMain.class);
Instructions<LiveCrawlRequest> instructions = crawler.fetchInstructions(LiveCrawlRequest.class);
try {
crawler.run();
instructions.ok();
} catch (Exception e) {
instructions.err();
throw e;
}
} catch (Exception e) {
logger.error("LiveCrawler failed", e);
System.exit(1);
}
System.exit(0);
}
enum LiveCrawlState {
PRUNE_DB,
FETCH_LINKS,
CRAWLING,
PROCESSING,
LOADING,
CONSTRUCTING,
DONE
}
private void run() throws Exception {
Path basePath = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE).asPath().resolve("live-crawl-data");
if (!Files.isDirectory(basePath)) {
Files.createDirectories(basePath);
}
run(basePath);
}
private void run(Path basePath) throws Exception {
try (var processHeartbeat = heartbeat.createProcessTaskHeartbeat(LiveCrawlState.class, "LiveCrawler");
LiveCrawlDataSet dataSet = new LiveCrawlDataSet(basePath))
{
final Instant cutoff = Instant.now().minus(60, ChronoUnit.DAYS);
processHeartbeat.progress(LiveCrawlState.FETCH_LINKS);
Map<String, List<String>> urlsPerDomain = new HashMap<>(10_000);
feedsClient.getUpdatedDomains(cutoff, urlsPerDomain::put);
logger.info("Fetched data for {} domains", urlsPerDomain.size());
processHeartbeat.progress(LiveCrawlState.PRUNE_DB);
// Remove data that is too old
dataSet.prune(cutoff);
processHeartbeat.progress(LiveCrawlState.CRAWLING);
try (SimpleLinkScraper fetcher = new SimpleLinkScraper(dataSet, domainQueries, domainBlacklist);
var hb = heartbeat.createAdHocTaskHeartbeat("Live Crawling"))
{
for (Map.Entry<String, List<String>> entry : hb.wrap("Fetching", urlsPerDomain.entrySet())) {
EdgeDomain domain = new EdgeDomain(entry.getKey());
List<String> urls = entry.getValue();
fetcher.scheduleRetrieval(domain, urls);
}
}
Path tempPath = dataSet.createWorkDir();
try {
processHeartbeat.progress(LiveCrawlState.PROCESSING);
try (var hb = heartbeat.createAdHocTaskHeartbeat("Processing");
var writer = new ConverterBatchWriter(tempPath, 0)
) {
// Offset the documents' ordinals toward the upper range, to avoid ID collisions with the
// main indexes (the maximum permissible value for a doc ordinal is 67_108_863, so this
// still leaves us plenty of headroom)
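// (An inference from the constant rather than something stated in this diff:
// 67_108_863 == (1 << 26) - 1, i.e. doc ordinals appear to be 26-bit values, and
// starting at 67_000_000 leaves roughly 108_000 ordinals for live-crawled documents.)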
writer.setOrdinalOffset(67_000_000);
for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
writer.write(domainProcessor.sideloadProcessing(stream, 0));
}
}
processHeartbeat.progress(LiveCrawlState.LOADING);
LoaderInputData lid = new LoaderInputData(tempPath, 1);
DomainIdRegistry domainIdRegistry = new DbDomainIdRegistry(dataSource);
keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, lid);
documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, lid);
keywordLoaderService.close();
} finally {
FileUtils.deleteDirectory(tempPath.toFile());
}
// Construct the index
processHeartbeat.progress(LiveCrawlState.DONE);
}
}
}

View File

@ -0,0 +1,49 @@
package nu.marginalia.livecrawler;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import nu.marginalia.IndexLocations;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.service.ServiceId;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.UUID;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
public class LiveCrawlerModule extends AbstractModule {
public void configure() {
bind(UserAgent.class).toInstance(WmsaHome.getUserAgent());
bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path", "/vol")));
}
@Inject
@Provides @Singleton
private DocumentDbWriter createLinkdbWriter(FileStorageService service) throws SQLException, IOException {
// Migrate
Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve(DOCDB_FILE_NAME);
if (Files.exists(dbPath)) {
Files.delete(dbPath);
}
return new DocumentDbWriter(dbPath);
}
@Singleton
@Provides
public ServiceConfiguration provideServiceConfiguration(ProcessConfiguration processConfiguration) {
return new ServiceConfiguration(ServiceId.NOT_A_SERVICE, processConfiguration.node(), null, null, -1, UUID.randomUUID());
}
}

View File

@ -0,0 +1,217 @@
package nu.marginalia.livecrawler;
import crawlercommons.robots.SimpleRobotRules;
import crawlercommons.robots.SimpleRobotRulesParser;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.util.SimpleBlockingThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/** A simple link scraper that fetches URLs and stores them in a database,
* with no concept of a crawl frontier, WARC output, or other advanced features
*/
public class SimpleLinkScraper implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(SimpleLinkScraper.class);
private final SimpleBlockingThreadPool pool = new SimpleBlockingThreadPool("LiveCrawler", 32, 10);
private final LinkParser lp = new LinkParser();
private final LiveCrawlDataSet dataSet;
private final DbDomainQueries domainQueries;
private final DomainBlacklist domainBlacklist;
private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(10);
public SimpleLinkScraper(LiveCrawlDataSet dataSet,
DbDomainQueries domainQueries,
DomainBlacklist domainBlacklist) {
this.dataSet = dataSet;
this.domainQueries = domainQueries;
this.domainBlacklist = domainBlacklist;
}
public void scheduleRetrieval(EdgeDomain domain, List<String> urls) {
var id = domainQueries.tryGetDomainId(domain);
if (id.isEmpty() || domainBlacklist.isBlacklisted(id.getAsInt())) {
return;
}
pool.submitQuietly(() -> retrieveNow(domain, id.getAsInt(), urls));
}
public void retrieveNow(EdgeDomain domain, int domainId, List<String> urls) throws Exception {
try (HttpClient client = HttpClient
.newBuilder()
.connectTimeout(connectTimeout)
.followRedirects(HttpClient.Redirect.NEVER)
.version(HttpClient.Version.HTTP_2)
.build()) {
EdgeUrl rootUrl = domain.toRootUrlHttps();
SimpleRobotRules rules = fetchRobotsRules(rootUrl, client);
if (rules == null) { // I/O error fetching robots.txt
// If we can't fetch the robots.txt, flag the URLs as (probabilistically) bad and skip the domain
for (var url : urls) {
lp.parseLink(rootUrl, url).ifPresent(this::maybeFlagAsBad);
}
return;
}
CrawlDelayTimer timer = new CrawlDelayTimer(rules.getCrawlDelay());
for (var url : urls) {
Optional<EdgeUrl> optParsedUrl = lp.parseLink(rootUrl, url);
if (optParsedUrl.isEmpty()) {
continue;
}
if (dataSet.hasUrl(optParsedUrl.get())) {
continue;
}
EdgeUrl parsedUrl = optParsedUrl.get();
if (!rules.isAllowed(url)) {
continue;
}
switch (fetchUrl(domainId, parsedUrl, timer, client)) {
case FetchResult.Success(int id, EdgeUrl docUrl, String body, String headers)
-> dataSet.saveDocument(id, docUrl, body, headers, "");
case FetchResult.Error(EdgeUrl docUrl) -> maybeFlagAsBad(docUrl);
}
}
}
}
private void maybeFlagAsBad(EdgeUrl url) {
// To give bad URLs a chance to be re-fetched, we only flag them as bad
// with a 20% probability. This will prevent the same bad URL being
// re-fetched over and over again for several months, but still allow
// us to *mostly* re-fetch it if it was just a transient error.
// There's of course the chance we immediately flag it as bad on an
// unlucky roll, but you know, that's xcom baby
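// (At a 20% flag chance per failed fetch, a consistently failing URL survives
// on average 1/0.2 = 5 attempts before being retired.)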
if (ThreadLocalRandom.current().nextDouble(0, 1) < 0.2) {
dataSet.flagAsBad(url);
}
}
@Nullable
private SimpleRobotRules fetchRobotsRules(EdgeUrl rootUrl, HttpClient client) throws IOException, InterruptedException, URISyntaxException {
var robotsRequest = HttpRequest.newBuilder(rootUrl.withPathAndParam("/robots.txt", null).asURI())
.GET()
.header("User-Agent", WmsaHome.getUserAgent().uaString())
.timeout(readTimeout);
// Fetch the robots.txt
try {
SimpleRobotRulesParser parser = new SimpleRobotRulesParser();
HttpResponse<byte[]> robotsTxt = client.send(robotsRequest.build(), HttpResponse.BodyHandlers.ofByteArray());
if (robotsTxt.statusCode() == 200) {
return parser.parseContent(rootUrl.toString(),
robotsTxt.body(),
robotsTxt.headers().firstValue("Content-Type").orElse("text/plain"),
WmsaHome.getUserAgent().uaIdentifier());
}
else if (robotsTxt.statusCode() == 404) {
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
}
}
catch (IOException ex) {
logger.error("Error fetching robots.txt for {}: {} {}", rootUrl, ex.getClass().getSimpleName(), ex.getMessage());
}
return null;
}
/** Fetch a URL and store it in the database
*/
private FetchResult fetchUrl(int domainId, EdgeUrl parsedUrl, CrawlDelayTimer timer, HttpClient client) throws Exception {
timer.waitFetchDelay();
HttpRequest request = HttpRequest.newBuilder(parsedUrl.asURI())
.GET()
.header("User-Agent", WmsaHome.getUserAgent().uaString())
.header("Accept", "text/html")
.timeout(readTimeout)
.build();
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
// Handle rate limiting by waiting and retrying once
if (response.statusCode() == 429) {
timer.waitRetryDelay(new HttpFetcherImpl.RateLimitException(
response.headers().firstValue("Retry-After").orElse("5")
));
response = client.send(request, HttpResponse.BodyHandlers.ofString());
}
String contentType = response.headers().firstValue("Content-Type").orElse("").toLowerCase();
if (response.statusCode() == 200) {
if (!contentType.toLowerCase().startsWith("text/html")) {
return new FetchResult.Error(parsedUrl);
}
String body = response.body();
if (body.length() > 1024 * 1024) {
return new FetchResult.Error(parsedUrl);
}
return new FetchResult.Success(domainId, parsedUrl, body, headersToString(response.headers()));
}
}
catch (IOException ex) {
// We don't want a full stack trace on every error, as it's quite common and very noisy
logger.error("Error fetching URL {}: {} {}", parsedUrl, ex.getClass().getSimpleName(), ex.getMessage());
}
return new FetchResult.Error(parsedUrl);
}
sealed interface FetchResult {
record Success(int domainId, EdgeUrl url, String body, String headers) implements FetchResult {}
record Error(EdgeUrl url) implements FetchResult {}
}
private String headersToString(HttpHeaders headers) {
StringBuilder headersStr = new StringBuilder();
// Join multi-valued headers with commas rather than relying on List.toString(),
// which would wrap the values in brackets
headers.map().forEach((k, v) -> {
    headersStr.append(k).append(": ").append(String.join(", ", v)).append("\n");
});
return headersStr.toString();
}
@Override
public void close() throws Exception {
pool.shutDown();
for (int i = 0; i < 4; i++) {
pool.awaitTermination(1, TimeUnit.HOURS);
}
pool.shutDownNow();
}
}

View File

@ -0,0 +1,92 @@
package nu.marginalia.livecrawler;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawldata.CrawledDocument;
import nu.marginalia.model.crawldata.CrawledDomain;
import nu.marginalia.model.crawldata.SerializableCrawlData;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
public class LiveCrawlDataSetTest {
@Test
public void testGetDataSet() throws Exception {
Path tempDir = Files.createTempDirectory("live-crawl-data-set-test");
try (LiveCrawlDataSet dataSet = new LiveCrawlDataSet(tempDir)) {
Assertions.assertFalse(dataSet.hasUrl("https://www.example.com/"));
dataSet.saveDocument(
1,
new EdgeUrl("https://www.example.com/"),
"test",
"test",
"test"
);
Assertions.assertTrue(dataSet.hasUrl("https://www.example.com/"));
var streams = dataSet.getDataStreams();
Assertions.assertEquals(1, streams.size());
var stream = streams.iterator().next();
List<SerializableCrawlData> data = new ArrayList<>();
while (stream.hasNext()) {
data.add(stream.next());
}
int dataCount = 0;
int domainCount = 0;
for (var item : data) {
switch (item) {
case CrawledDomain domain -> {
domainCount++;
Assertions.assertEquals("www.example.com", domain.getDomain());
}
case CrawledDocument document -> {
dataCount++;
Assertions.assertEquals("https://www.example.com/", document.url);
Assertions.assertEquals("test", document.documentBody);
}
}
}
Assertions.assertEquals(1, dataCount);
Assertions.assertEquals(1, domainCount);
}
finally {
FileUtils.deleteDirectory(tempDir.toFile());
}
}
@Test
public void testHasUrl() throws Exception {
Path tempDir = Files.createTempDirectory("live-crawl-data-set-test");
try (LiveCrawlDataSet dataSet = new LiveCrawlDataSet(tempDir)) {
Assertions.assertFalse(dataSet.hasUrl("https://www.example.com/"));
dataSet.saveDocument(
1,
new EdgeUrl("https://www.example.com/saved"),
"test",
"test",
"test"
);
Assertions.assertTrue(dataSet.hasUrl("https://www.example.com/saved"));
dataSet.flagAsBad(new EdgeUrl("https://www.example.com/bad"));
Assertions.assertTrue(dataSet.hasUrl("https://www.example.com/bad"));
Assertions.assertFalse(dataSet.hasUrl("https://www.example.com/notPresent"));
}
finally {
FileUtils.deleteDirectory(tempDir.toFile());
}
}
}

Some files were not shown because too many files have changed in this diff