diff --git a/build.gradle b/build.gradle index 49bf7b98..ea0db6be 100644 --- a/build.gradle +++ b/build.gradle @@ -26,6 +26,14 @@ tasks.register('dist', Copy) { from tarTree("$buildDir/dist/loader-process.tar") into "$projectDir/run/dist/" } + copy { + from tarTree("$buildDir/dist/website-adjacencies-calculator.tar") + into "$projectDir/run/dist/" + } + copy { + from tarTree("$buildDir/dist/crawl-job-extractor-process.tar") + into "$projectDir/run/dist/" + } } } idea { diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index eb43f9cb..cc2e74fd 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -93,12 +93,12 @@ public class ControlService extends Service { Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses); Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses); - Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses); Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses); Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses); Spark.post("/public/storage/:fid/load", controlActorService::loadProcessedData, redirectToProcesses); + Spark.post("/public/storage/specs", controlActorService::createCrawlSpecification, redirectToStorage); Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage); Spark.get("/public/:resource", this::serveStatic); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java index bfa90be1..052ca2cb 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java @@ -4,13 +4,11 @@ import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; import lombok.SneakyThrows; -import nu.marginalia.control.actor.task.CrawlActor; -import nu.marginalia.control.actor.task.RecrawlActor; +import nu.marginalia.control.actor.task.*; import nu.marginalia.control.model.Actor; import nu.marginalia.control.actor.monitor.*; import nu.marginalia.control.actor.monitor.ConverterMonitorActor; import nu.marginalia.control.actor.monitor.LoaderMonitorActor; -import nu.marginalia.control.actor.task.ReconvertAndLoadActor; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mqsm.StateMachine; @@ -45,7 +43,9 @@ public class ControlActors { LoaderMonitorActor loaderMonitor, MessageQueueMonitorActor messageQueueMonitor, ProcessLivenessMonitorActor processMonitorFSM, - FileStorageMonitorActor fileStorageMonitorActor + FileStorageMonitorActor fileStorageMonitorActor, + TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor, + CrawlJobExtractorActor crawlJobExtractorActor ) { this.messageQueueFactory = messageQueueFactory; this.eventLog = baseServiceParams.eventLog; @@ -60,6 +60,8 @@ public class ControlActors { register(Actor.MESSAGE_QUEUE_MONITOR, messageQueueMonitor); register(Actor.PROCESS_LIVENESS_MONITOR, processMonitorFSM); register(Actor.FILE_STORAGE_MONITOR, fileStorageMonitorActor); + register(Actor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor); + register(Actor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor); } private void register(Actor process, AbstractStateGraph graph) { diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlJobExtractorActor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlJobExtractorActor.java new file mode 100644 index 00000000..df86da38 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/CrawlJobExtractorActor.java @@ -0,0 +1,135 @@ +package nu.marginalia.control.actor.task; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.svc.ControlFileStorageService; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.db.storage.FileStorageService; +import nu.marginalia.db.storage.model.FileStorage; +import nu.marginalia.db.storage.model.FileStorageBaseType; +import nu.marginalia.db.storage.model.FileStorageType; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +@Singleton +public class CrawlJobExtractorActor extends AbstractStateGraph { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + // STATES + + public static final String INITIAL = "INITIAL"; + public static final String CREATE_FROM_DB = "CREATE_FROM_DB"; + public static final String CREATE_FROM_LINK = "CREATE_FROM_LINK"; + public static final String END = "END"; + private final ProcessService processService; + private final FileStorageService fileStorageService; + private final ControlFileStorageService controlFileStorageService; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Inject + public CrawlJobExtractorActor(StateFactory stateFactory, + ProcessService processService, + FileStorageService fileStorageService, + ControlFileStorageService controlFileStorageService + ) { + super(stateFactory); + this.processService = processService; + this.fileStorageService = fileStorageService; + this.controlFileStorageService = controlFileStorageService; + } + + public record CrawlJobExtractorArguments(String description) { } + public record CrawlJobExtractorArgumentsWithURL(String description, String url) { } + @GraphState(name = INITIAL, next = END) + public void initial() throws Exception { error("This state does nothing"); } + + @GraphState(name = CREATE_FROM_LINK, next = END, + resume = ResumeBehavior.ERROR, + description = """ + Download a list of URLs as provided, + and then spawn a CrawlJobExtractor process, + then wait for it to finish. + """ + ) + public void createFromFromLink(CrawlJobExtractorArgumentsWithURL arg) throws Exception { + if (arg == null) { + error("This actor requires a CrawlJobExtractorArgumentsWithURL argument"); + } + + var base = fileStorageService.getStorageBase(FileStorageBaseType.SLOW); + var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description()); + + Path urlsTxt = storage.asPath().resolve("urls.txt"); + + try (var os = Files.newOutputStream(urlsTxt, StandardOpenOption.CREATE_NEW); + var is = new URL(arg.url()).openStream()) + { + is.transferTo(os); + } + catch (Exception ex) { + controlFileStorageService.flagFileForDeletion(storage.id()); + error("Error downloading " + arg.url()); + } + + final Path path = storage.asPath(); + + run(storage, path.resolve("crawler.spec").toString(), + "-f", urlsTxt.toString()); + } + + + @GraphState(name = CREATE_FROM_DB, next = END, + resume = ResumeBehavior.ERROR, + description = """ + Spawns a CrawlJobExtractor process that loads data from the link database, and wait for it to finish. + """ + ) + public void createFromDB(CrawlJobExtractorArguments arg) throws Exception { + if (arg == null) { + error("This actor requires a CrawlJobExtractorArguments argument"); + } + + var base = fileStorageService.getStorageBase(FileStorageBaseType.SLOW); + var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description()); + + final Path path = storage.asPath(); + + run(storage, + path.resolve("crawler.spec").toString()); + } + + private void run(FileStorage storage, String... args) throws Exception { + + AtomicBoolean hasError = new AtomicBoolean(false); + var future = executor.submit(() -> { + try { + processService.trigger(ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR, + args); + } + catch (Exception ex) { + logger.warn("Error in creating crawl job", ex); + hasError.set(true); + } + }); + future.get(); + + if (hasError.get()) { + controlFileStorageService.flagFileForDeletion(storage.id()); + error("Error triggering adjacency calculation"); + } + + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/TriggerAdjacencyCalculationActor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/TriggerAdjacencyCalculationActor.java new file mode 100644 index 00000000..8861cc07 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/TriggerAdjacencyCalculationActor.java @@ -0,0 +1,59 @@ +package nu.marginalia.control.actor.task; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.control.svc.ProcessService; +import nu.marginalia.mqsm.StateFactory; +import nu.marginalia.mqsm.graph.AbstractStateGraph; +import nu.marginalia.mqsm.graph.GraphState; +import nu.marginalia.mqsm.graph.ResumeBehavior; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +@Singleton +public class TriggerAdjacencyCalculationActor extends AbstractStateGraph { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + // STATES + + private static final String INITIAL = "INITIAL"; + private static final String END = "END"; + private final ProcessService processService; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Inject + public TriggerAdjacencyCalculationActor(StateFactory stateFactory, + ProcessService processService) { + super(stateFactory); + this.processService = processService; + } + + @GraphState(name = INITIAL, next = END, + resume = ResumeBehavior.ERROR, + description = """ + Spawns a WebsitesAdjacenciesCalculator process and waits for it to finish. + """ + ) + public void init() throws Exception { + AtomicBoolean hasError = new AtomicBoolean(false); + var future = executor.submit(() -> { + try { + processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load"); + } + catch (Exception ex) { + logger.warn("Error triggering adjacency calculation", ex); + hasError.set(true); + } + }); + future.get(); + + if (hasError.get()) { + error("Error triggering adjacency calculation"); + } + } + +} diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java index 83d0b810..755d67a1 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/Actor.java @@ -9,7 +9,9 @@ public enum Actor { CRAWLER_MONITOR, MESSAGE_QUEUE_MONITOR, PROCESS_LIVENESS_MONITOR, - FILE_STORAGE_MONITOR + FILE_STORAGE_MONITOR, + ADJACENCY_CALCULATION, + CRAWL_JOB_EXTRACTOR ; diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java index 47640dde..9b0b8b0a 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java @@ -42,6 +42,8 @@ public record ProcessHeartbeat( case "converter" -> ProcessService.ProcessId.CONVERTER; case "crawler" -> ProcessService.ProcessId.CRAWLER; case "loader" -> ProcessService.ProcessId.LOADER; + case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR; + case "crawl-job-extractor" -> ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR; default -> throw new RuntimeException("Unknown process base: " + processBase); }; } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java index c7bab07f..25461e58 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ControlActorService.java @@ -3,6 +3,7 @@ package nu.marginalia.control.svc; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.control.actor.ControlActors; +import nu.marginalia.control.actor.task.CrawlJobExtractorActor; import nu.marginalia.control.actor.task.ReconvertAndLoadActor; import nu.marginalia.control.actor.task.RecrawlActor; import nu.marginalia.control.model.Actor; @@ -94,4 +95,27 @@ public class ControlActorService { }).toList(); } + public Object createCrawlSpecification(Request request, Response response) throws Exception { + final String description = request.queryParams("description"); + final String url = request.queryParams("url"); + final String source = request.queryParams("source"); + + if ("db".equals(source)) { + controlActors.startFrom(Actor.CRAWL_JOB_EXTRACTOR, + CrawlJobExtractorActor.CREATE_FROM_DB, + new CrawlJobExtractorActor.CrawlJobExtractorArguments(description) + ); + } + else if ("download".equals(source)) { + controlActors.startFrom(Actor.CRAWL_JOB_EXTRACTOR, + CrawlJobExtractorActor.CREATE_FROM_LINK, + new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL(description, url) + ); + } + else { + throw new IllegalArgumentException("Unknown source: " + source); + } + + return ""; + } } \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java index 032f2c23..0281ed43 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java @@ -7,7 +7,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import spark.utils.IOUtils; import javax.inject.Inject; import javax.inject.Singleton; @@ -33,7 +32,11 @@ public class ProcessService { public enum ProcessId { CRAWLER("crawler-process/bin/crawler-process"), CONVERTER("converter-process/bin/converter-process"), - LOADER("loader-process/bin/loader-process"); + LOADER("loader-process/bin/loader-process"), + ADJACENCIES_CALCULATOR("website-adjacencies-calculator/bin/website-adjacencies-calculator"), + CRAWL_JOB_EXTRACTOR("crawl-job-extractor-process/bin/crawl-job-extractor-process"), + + ; public final String path; ProcessId(String path) { @@ -49,10 +52,17 @@ public class ProcessService { } public boolean trigger(ProcessId processId) throws Exception { + return trigger(processId, new String[0]); + } + + public boolean trigger(ProcessId processId, String... parameters) throws Exception { String processPath = processPath(processId); - String[] args = new String[] { - processPath - }; + String[] args = new String[parameters.length + 1]; + + args[0] = processPath; + for (int i = 0; i < parameters.length; i++) + args[i+1] = parameters[i]; + String[] env = env(); Process process; diff --git a/code/tools/crawl-job-extractor/src/main/java/nu/marginalia/crawl/CrawlJobExtractorMain.java b/code/tools/crawl-job-extractor/src/main/java/nu/marginalia/crawl/CrawlJobExtractorMain.java index e898293b..9693e2ae 100644 --- a/code/tools/crawl-job-extractor/src/main/java/nu/marginalia/crawl/CrawlJobExtractorMain.java +++ b/code/tools/crawl-job-extractor/src/main/java/nu/marginalia/crawl/CrawlJobExtractorMain.java @@ -28,6 +28,8 @@ public class CrawlJobExtractorMain { return; } + // TODO (2023-06-26) figure out whether this needs a ProcessHeartbeat + String[] targetDomains = getTargetDomains(Arrays.copyOfRange(args, 1, args.length)); try (CrawlJobSpecWriter out = new CrawlJobSpecWriter(outFile)) diff --git a/code/tools/website-adjacencies-calculator/build.gradle b/code/tools/website-adjacencies-calculator/build.gradle index 99fca87e..90b20e73 100644 --- a/code/tools/website-adjacencies-calculator/build.gradle +++ b/code/tools/website-adjacencies-calculator/build.gradle @@ -19,6 +19,7 @@ java { dependencies { implementation project(':code:common:model') implementation project(':code:common:db') + implementation project(':code:common:process') implementation project(':code:common:service') implementation libs.lombok diff --git a/code/tools/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java b/code/tools/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java index f6a4022f..12348543 100644 --- a/code/tools/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java +++ b/code/tools/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java @@ -2,9 +2,11 @@ package nu.marginalia.adjacencies; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; +import nu.marginalia.ProcessConfiguration; import nu.marginalia.db.DbDomainQueries; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.id.EdgeId; +import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.service.module.DatabaseModule; import java.sql.SQLException; @@ -58,30 +60,22 @@ public class WebsiteAdjacenciesCalculator { } @SneakyThrows - public void loadAll() { + public void loadAll(ProcessHeartbeat processHeartbeat) { AdjacenciesLoader loader = new AdjacenciesLoader(dataSource); - var executor = Executors.newFixedThreadPool(16); - var ids = adjacenciesData.getIdsList(); - - ProgressPrinter progressPrinter = new ProgressPrinter(ids.size()); - progressPrinter.start(); - + int total = adjacenciesData.getIdsList().size(); + AtomicInteger progress = new AtomicInteger(0); IntStream.of(adjacenciesData.getIdsList().toArray()).parallel() .filter(domainAliases::isNotAliased) .forEach(id -> { findAdjacent(id, loader::load); - progressPrinter.advance(); + processHeartbeat.setProgress(progress.incrementAndGet() / (double) total); }); - progressPrinter.stop(); - executor.shutdown(); System.out.println("Waiting for wrap-up"); loader.stop(); - - } private static class ProgressPrinter { @@ -192,10 +186,19 @@ public class WebsiteAdjacenciesCalculator { public static void main(String[] args) throws SQLException { DatabaseModule dm = new DatabaseModule(); - var main = new WebsiteAdjacenciesCalculator(dm.provideConnection()); + var dataSource = dm.provideConnection(); + + var main = new WebsiteAdjacenciesCalculator(dataSource); if (args.length == 1 && "load".equals(args[0])) { - main.loadAll(); + var processHeartbeat = new ProcessHeartbeat( + new ProcessConfiguration("website-adjacencies-calculator", 0, UUID.randomUUID()), + dataSource + ); + + processHeartbeat.start(); + main.loadAll(processHeartbeat); + processHeartbeat.shutDown(); return; }