diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java index 5bdf453a..6e735010 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java @@ -15,7 +15,7 @@ public class AbstractDynamicClient extends AbstractClient { public AbstractDynamicClient(@Nonnull ServiceDescriptor service, Supplier gsonProvider) { super( service, - 10, + 10000, gsonProvider ); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java index 1cf0dda6..5275e76d 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java @@ -6,6 +6,9 @@ import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype; import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorState; +import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.model.NodeConfiguration; @@ -13,18 +16,22 @@ import org.jetbrains.annotations.NotNull; import java.sql.SQLException; import java.util.*; +import com.google.gson.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RebalanceActor extends AbstractActorPrototype { // States public static final String INIT = "INIT"; - public static final String CALCULATE_TRANSACTIONS = "CALCULATE_TRANSACTIONS"; public static final String END = "END"; + private static final Logger logger = LoggerFactory.getLogger(RebalanceActor.class); + private final NodeConfigurationService nodeConfigurationService; private final MqPersistence mqPersistence; private final HikariDataSource dataSource; - + private final Gson gson = GsonFactory.get(); @Override public String describe() { return "Rebalances crawl data among the nodes"; @@ -41,16 +48,10 @@ public class RebalanceActor extends AbstractActorPrototype { this.dataSource = dataSource; } - @ActorState(name= INIT, next = CALCULATE_TRANSACTIONS, resume = ActorResumeBehavior.ERROR, - description = "Fetches the number of domains assigned to each eligible processing node") - public List getPopulations() throws Exception { - return getNodePopulations(); - } - - @ActorState(name= CALCULATE_TRANSACTIONS, next = END, resume = ActorResumeBehavior.ERROR, - description = "Calculates how many domains to re-assign between the processing nodes" - ) - public List calculateTransactions(List populations) { + @ActorState(name= INIT, next = END, resume = ActorResumeBehavior.ERROR, + description = "Rebalance!") + public void doIt() throws Exception { + var populations = getNodePopulations(); if (populations.size() <= 1) { transition(END); @@ -91,7 +92,16 @@ public class RebalanceActor extends AbstractActorPrototype { } } - return actions; + for (var action : actions) { + var outbox = new MqOutbox(mqPersistence, "executor-service", action.dest, getClass().getSimpleName(), 0, UUID.randomUUID()); + var msg = outbox.send("TRANSFER-DOMAINS", + gson.toJson(Map.of("sourceNode", action.donor, "count", action.c))); + if (msg.state() != MqMessageState.OK) { + logger.error("ERROR! {}", msg); + } + outbox.stop(); + } + } private List getNodePopulations() throws SQLException { @@ -163,6 +173,8 @@ public class RebalanceActor extends AbstractActorPrototype { } } + public record Populations(List pops) { + } public record Pop(int node, int count) { } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/TransferDomainsActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/TransferDomainsActor.java index 0310f208..b0e6fde6 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/TransferDomainsActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/TransferDomainsActor.java @@ -8,35 +8,18 @@ import lombok.NoArgsConstructor; import lombok.With; import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype; -import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorState; -import nu.marginalia.client.Context; import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; -import nu.marginalia.mqapi.ProcessInboxNames; -import nu.marginalia.process.ProcessOutboxes; -import nu.marginalia.process.log.WorkLog; import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorageBaseType; -import nu.marginalia.storage.model.FileStorageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.UUID; - @Singleton public class TransferDomainsActor extends AbstractActorPrototype { - - // STATES public static final String INITIAL = "INITIAL"; - public static final String TRANSFER_DOMAINS = "TRANSFER-DOMAINS"; - public static final String UPDATE_DONOR_LOG = "UPDATE_DONOR_LOG"; public static final String END = "END"; private final FileStorageService storageService; @@ -77,101 +60,13 @@ public class TransferDomainsActor extends AbstractActorPrototype { } @ActorState(name = INITIAL, - next = TRANSFER_DOMAINS, + next = END, description = """ - Ensure preconditions are met + Transfer the domains """) - public Message init(Message message) throws Exception { - var storages = storageService.getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA); + public void init(Message message) throws Exception { - // Ensure crawl data exists to receive into - if (storages.isEmpty()) { - var storage = storageService.allocateTemporaryStorage( - storageService.getStorageBase(FileStorageBaseType.STORAGE), - FileStorageType.CRAWL_DATA, - "crawl-data", - "Crawl Data" - ); - storageService.enableFileStorage(storage.id()); - } - - return message; } - @ActorState(name = TRANSFER_DOMAINS, - next = UPDATE_DONOR_LOG, - resume = ActorResumeBehavior.ERROR, - description = """ - Do the needful - """ - ) - public Message transferData(Message message) throws Exception { - var storageId = storageService - .getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA) - .orElseThrow(AssertionError::new); // This Shouldn't Happen (tm) - var storage = storageService.getStorage(storageId); - - var spec = executorClient.getTransferSpec(Context.internal(), message.sourceNode, message.count); - if (spec.size() == 0) { - transition("END", "NOTHING TO TRANSFER"); - } - - Path basePath = storage.asPath(); - try (var workLog = new WorkLog(basePath.resolve("crawler.log")); - var conn = dataSource.getConnection(); - var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE ID=?"); - ) { - for (var item : spec.items()) { - logger.info("{}", item); - logger.info("Transferring {}", item.domainName()); - - Path dest = basePath.resolve(item.path()); - Files.createDirectories(dest.getParent()); - try (var fileStream = Files.newOutputStream(dest)) { - executorClient.transferFile(Context.internal(), - message.sourceNode, - item.fileStorageId(), - item.path(), - fileStream); - - stmt.setInt(1, nodeId); - stmt.setInt(2, item.domainId()); - stmt.executeUpdate(); - - executorClient.yieldDomain(Context.internal(), message.sourceNode, item); - workLog.setJobToFinished(item.domainName(), item.path(), 1); - } - catch (IOException ex) { - Files.deleteIfExists(dest); - error(ex); - } - catch (Exception ex) { - error(ex); - } - } - } - - return message; - } - - @ActorState(name = UPDATE_DONOR_LOG, - next = END, - resume = ActorResumeBehavior.ERROR, - description = """ - Do the needful - """ - ) - public void updateDonorLog(Message message) throws InterruptedException { - var outbox = new MqOutbox(persistence, executorServiceName, message.sourceNode, - getClass().getSimpleName(), nodeId, UUID.randomUUID()); - - try { - outbox.send("PRUNE-CRAWL-DATA", ":-)"); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - outbox.stop(); - } - } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java index ecfe901d..edc9afc8 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; // Weird name for this one to not have clashes with java.util.concurrent.ExecutorService public class ExecutorSvc extends Service { private final BaseServiceParams params; + private final Gson gson; private final ExecutorActorControlService actorControlService; private final FileStorageService fileStorageService; private final TransferService transferService; @@ -51,6 +52,7 @@ public class ExecutorSvc extends Service { ActorApi actorApi) { super(params); this.params = params; + this.gson = gson; this.actorControlService = actorControlService; this.fileStorageService = fileStorageService; this.transferService = transferService; @@ -92,9 +94,26 @@ public class ExecutorSvc extends Service { actorControlService.start(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER); actorControlService.start(ExecutorActor.PROC_LOADER_SPAWNER); } + + @MqRequest(endpoint="TRANSFER-DOMAINS") + public String transferDomains(String message) throws Exception { + + var spec = gson.fromJson(message, TransferService.TransferReq.class); + + synchronized (this) { + transferService.transferMqEndpoint(spec.sourceNode(), spec.count()); + } + + return "OK"; + } + + @MqRequest(endpoint="PRUNE-CRAWL-DATA") public String pruneCrawlData(String message) throws SQLException, IOException { - transferService.pruneCrawlDataMqEndpoint(); + + synchronized (this) { // would not be great if this ran in parallel with itself + transferService.pruneCrawlDataMqEndpoint(); + } return "OK"; } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java index ca9e90e2..0700fbc9 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java @@ -4,13 +4,18 @@ import com.google.gson.Gson; import com.google.inject.Inject; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; +import nu.marginalia.client.Context; +import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.executor.model.transfer.TransferItem; import nu.marginalia.executor.model.transfer.TransferSpec; import nu.marginalia.executor.storage.FileStorageContent; import nu.marginalia.executor.storage.FileStorageFile; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.process.log.WorkLog; import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageBaseType; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageType; import org.apache.commons.io.FileUtils; @@ -27,11 +32,15 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.UUID; public class TransferService { private final Gson gson; private final FileStorageService fileStorageService; private final HikariDataSource dataSource; + private final ExecutorClient executorClient; + private final MqPersistence persistence; + private final String executorServiceName; private final int nodeId; private static final Logger logger = LoggerFactory.getLogger(TransferService.class); @@ -40,12 +49,15 @@ public class TransferService { Gson gson, FileStorageService fileStorageService, HikariDataSource dataSource, - ServiceConfiguration config) + ExecutorClient executorClient, MqPersistence persistence, ServiceConfiguration config) { this.gson = gson; this.fileStorageService = fileStorageService; this.dataSource = dataSource; + this.executorClient = executorClient; + this.persistence = persistence; this.nodeId = config.node(); + this.executorServiceName = config.serviceName(); } public Object transferFile(Request request, Response response) throws SQLException, IOException { @@ -169,4 +181,78 @@ public class TransferService { Files.move(newCrawlLogPath, oldCrawlLogPath, StandardCopyOption.REPLACE_EXISTING); } + + public void transferMqEndpoint(int sourceNode, int count) throws Exception { + var storages = fileStorageService.getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA); + + // Ensure crawl data exists to receive into + if (storages.isEmpty()) { + var storage = fileStorageService.allocateTemporaryStorage( + fileStorageService.getStorageBase(FileStorageBaseType.STORAGE), + FileStorageType.CRAWL_DATA, + "crawl-data", + "Crawl Data" + ); + fileStorageService.enableFileStorage(storage.id()); + } + + var storageId = fileStorageService + .getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA) + .orElseThrow(AssertionError::new); // This Shouldn't Happen (tm) + + var storage = fileStorageService.getStorage(storageId); + + var spec = executorClient.getTransferSpec(Context.internal(), sourceNode, count); + if (spec.size() == 0) { + return; + } + + Path basePath = storage.asPath(); + try (var workLog = new WorkLog(basePath.resolve("crawler.log")); + var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE ID=?"); + ) { + for (var item : spec.items()) { + logger.info("{}", item); + logger.info("Transferring {}", item.domainName()); + + Path dest = basePath.resolve(item.path()); + Files.createDirectories(dest.getParent()); + try (var fileStream = Files.newOutputStream(dest)) { + executorClient.transferFile(Context.internal(), + sourceNode, + item.fileStorageId(), + item.path(), + fileStream); + + stmt.setInt(1, nodeId); + stmt.setInt(2, item.domainId()); + stmt.executeUpdate(); + + executorClient.yieldDomain(Context.internal(), sourceNode, item); + workLog.setJobToFinished(item.domainName(), item.path(), 1); + } + catch (IOException ex) { + Files.deleteIfExists(dest); + throw new RuntimeException(ex); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + + var outbox = new MqOutbox(persistence, executorServiceName, sourceNode, + getClass().getSimpleName(), nodeId, UUID.randomUUID()); + + try { + outbox.send("PRUNE-CRAWL-DATA", ":-)"); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + outbox.stop(); + } + } + + public record TransferReq(int sourceNode, int count) { } }