From 467ba5be201ceba9c9acb3b15f3aa5f6d1abb8a8 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Tue, 6 Feb 2024 17:20:07 +0100 Subject: [PATCH] (index-construction) Split repartition into two actions This change splits the previous 'repartition' action into two steps, one for recalculating the domain rankings, and one for recalculating the other ranking sets. Since only the first is necessary before the index construction, the rest can be delayed until after the index has been switched. To avoid issues in handling the shotgun blast of MqNotifications, Service was switched over to use a synchronous message queue inbox instead of an asynchronous one. The change also modifies the behavior so that only node 1 will push the changes to the EC_DOMAIN database table, to avoid unnecessary db locks and contention with the loader. Additionally, the change fixes a bug where the index construction code wasn't actually picking up the rankings data. Since the index construction used to be performed by the index-service, merely keeping the data in memory was enough for it to be accessible within the index-construction logic, but since it's been broken out into a separate process, the new process just injected an empty DomainRankings object instead. To fix this, DomainRankings can now be persisted to disk, and a pre-loaded version of the object is injected into the index-construction process. 
--- .../marginalia/index/client/IndexClient.java | 8 +++ .../index/client/IndexMqEndpoints.java | 1 + .../nu/marginalia/service/server/Service.java | 2 +- .../nu/marginalia/ranking/DomainRankings.java | 53 +++++++++++++++++++ .../index/IndexConstructorMain.java | 2 +- .../index/IndexConstructorModule.java | 17 ++++-- .../actor/task/ConvertAndLoadActor.java | 27 +++++----- .../actor/task/RestoreBackupActor.java | 2 +- .../nu/marginalia/index/IndexService.java | 8 +++ .../marginalia/index/svc/IndexOpsService.java | 11 ++-- .../index/svc/IndexSearchSetsService.java | 31 ++++++++--- 11 files changed, 134 insertions(+), 28 deletions(-) diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 8b747a85..808e2a1f 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -85,4 +85,12 @@ public class IndexClient extends AbstractDynamicClient { null ); } + + public long triggerRerank(int node) throws Exception { + return messageQueueFactory.sendSingleShotRequest( + ServiceId.Index.withNode(node), + IndexMqEndpoints.INDEX_RERANK, + null + ); + } } diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java index 733fd242..0a4635eb 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java @@ -2,6 +2,7 @@ package nu.marginalia.index.client; public class IndexMqEndpoints { public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED"; + public static final String INDEX_RERANK = "INDEX-RERANK"; public static final String INDEX_REPARTITION = "INDEX-REPARTITION"; public static final String SWITCH_INDEX = 
"SWITCH-INDEX"; diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java index 604d1941..63552cf7 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/server/Service.java +++ b/code/common/service/src/main/java/nu/marginalia/service/server/Service.java @@ -53,7 +53,7 @@ public class Service { logger.info("Inbox name: {}", inboxName); var mqInboxFactory = params.messageQueueInboxFactory; - messageQueueInbox = mqInboxFactory.createAsynchronousInbox(inboxName, config.node(), config.instanceUuid()); + messageQueueInbox = mqInboxFactory.createSynchronousInbox(inboxName, config.node(), config.instanceUuid()); messageQueueInbox.subscribe(new ServiceMqSubscription(this)); serviceName = System.getProperty("service-name"); diff --git a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/DomainRankings.java b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/DomainRankings.java index 0ca2db77..db5321b1 100644 --- a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/DomainRankings.java +++ b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/DomainRankings.java @@ -6,11 +6,19 @@ import nu.marginalia.model.id.UrlIdCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + import static java.lang.Math.max; import static java.lang.Math.min; public class DomainRankings { private final Int2ShortOpenHashMap rankings; + private static final Logger logger = LoggerFactory.getLogger(DomainRankings.class); private final int MAX_MEANINGFUL_RANK = 50_000; private final int MAX_RANK_VALUE = 255; @@ -25,6 +33,51 @@ public class DomainRankings { values.forEach(this::putRanking); } + private 
static final String name = "_rankings.dat"; + + public void save(Path basePath) { + Path fileName = basePath.resolve(name); + + logger.info("Saving domain rankings to {}", fileName); + + try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(fileName, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING))) + { + rankings.forEach((domainId, rank) -> { + try { + dos.writeInt(domainId); + dos.writeShort(rank); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void load(Path basePath) { + Path fileName = basePath.resolve(name); + + logger.info("Loading domain rankings from {}", fileName); + + try (DataInputStream dis = new DataInputStream(Files.newInputStream(fileName))) { + rankings.clear(); + for (;;) { + int domainId = dis.readInt(); + short rank = dis.readShort(); + rankings.put(domainId, rank); + } + } + catch (EOFException e) { + // ok + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private void putRanking(int domainId, int value) { rankings.put(domainId, scaleRank(value)); } diff --git a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java index 299a1445..779e2573 100644 --- a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java +++ b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java @@ -97,7 +97,7 @@ public class IndexConstructorMain extends ProcessMainClass { heartbeat.start(); switch (instructions.name) { - case FORWARD -> createForwardIndex(); + case FORWARD -> createForwardIndex(); case REVERSE_FULL -> createFullReverseIndex(); case REVERSE_PRIO -> createPrioReverseIndex(); } diff --git 
a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java index 647c8ef4..025e90eb 100644 --- a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java +++ b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java @@ -1,12 +1,23 @@ package nu.marginalia.index; import com.google.inject.AbstractModule; -import nu.marginalia.ProcessConfiguration; - -import java.util.UUID; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import nu.marginalia.IndexLocations; +import nu.marginalia.ranking.DomainRankings; +import nu.marginalia.storage.FileStorageService; public class IndexConstructorModule extends AbstractModule { @Override public void configure() { } + + @Provides @Singleton + public DomainRankings getDomainRankings(FileStorageService fileStorageService) { + var rankings = new DomainRankings(); + + rankings.load(IndexLocations.getSearchSetsPath(fileStorageService)); + + return rankings; + } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java index 86918c55..62aa8929 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertAndLoadActor.java @@ -38,7 +38,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype { // STATES - public static final String REPARTITION = "REPARTITION"; + public static final String RERANK = "RERANK"; private final ActorProcessWatcher processWatcher; private final MqOutbox mqConverterOutbox; private final MqOutbox mqLoaderOutbox; @@ -46,7 +46,6 @@ public class 
ConvertAndLoadActor extends RecordActorPrototype { private final MqOutbox indexOutbox; private final FileStorageService storageService; private final BackupService backupService; - private final Gson gson; private final NodeConfigurationService nodeConfigurationService; private final int nodeId; @@ -74,14 +73,14 @@ public class ConvertAndLoadActor extends RecordActorPrototype { @Resume(behavior = ActorResumeBehavior.RETRY) public record Backup(List processedIds) implements ActorStep { } @Resume(behavior = ActorResumeBehavior.RETRY) - public record Repartition(long id) implements ActorStep { public Repartition() { this(-1); } } + public record Rerank(long id) implements ActorStep { public Rerank() { this(-1); } } @Resume(behavior = ActorResumeBehavior.RETRY) public record ReindexFwd(long id) implements ActorStep { public ReindexFwd() { this(-1); } } @Resume(behavior = ActorResumeBehavior.RETRY) public record ReindexFull(long id) implements ActorStep { public ReindexFull() { this(-1); } } @Resume(behavior = ActorResumeBehavior.RETRY) public record ReindexPrio(long id) implements ActorStep { public ReindexPrio() { this(-1); } } - public record SwitchOver() implements ActorStep {} + public record SwitchIndex() implements ActorStep {} @Override public ActorStep transition(ActorStep self) throws Exception { @@ -129,11 +128,11 @@ public class ConvertAndLoadActor extends RecordActorPrototype { } case Backup(List processedIds) -> { backupService.createBackupFromStaging(processedIds); - yield new Repartition(); + yield new Rerank(); } - case Repartition(long id) when id < 0 -> - new Repartition(indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, "")); - case Repartition(long id) -> { + case Rerank(long id) when id < 0 -> + new Rerank(indexOutbox.sendAsync(IndexMqEndpoints.INDEX_RERANK, "")); + case Rerank(long id) -> { var rsp = indexOutbox.waitResponse(id); if (rsp.state() != MqMessageState.OK) { yield new Error("Repartition failed"); @@ -166,12 +165,15 @@ public 
class ConvertAndLoadActor extends RecordActorPrototype { if (rsp.state() != MqMessageState.OK) yield new Error("Repartition failed"); else - yield new SwitchOver(); + yield new SwitchIndex(); } - case SwitchOver() -> { - indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_INDEX, ":^D"); - indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_LINKDB, ":-)"); + case SwitchIndex() -> { + indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_INDEX, "here"); + indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_LINKDB, "we"); + + // Defer repartitioning the domains until after the index has been switched + indexOutbox.sendNotice(IndexMqEndpoints.INDEX_REPARTITION, "go"); yield new End(); } @@ -207,7 +209,6 @@ public class ConvertAndLoadActor extends RecordActorPrototype { this.mqIndexConstructorOutbox = processOutboxes.getIndexConstructorOutbox(); this.storageService = storageService; this.backupService = backupService; - this.gson = gson; this.nodeConfigurationService = nodeConfigurationService; this.nodeId = serviceConfiguration.node(); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java index 44ce496e..1a25a9cb 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java @@ -29,7 +29,7 @@ public class RestoreBackupActor extends RecordActorPrototype { ExecutorActor.CONVERT_AND_LOAD.id(node), null, null, - ConvertAndLoadActor.REPARTITION, + ConvertAndLoadActor.RERANK, "", null); diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java index 325b132c..b5f5a7f8 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java 
+++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java @@ -97,6 +97,14 @@ public class IndexService extends Service { volatile boolean initialized = false; + @MqRequest(endpoint = IndexMqEndpoints.INDEX_RERANK) + public String rerank(String message) { + if (!opsService.rerank()) { + throw new IllegalStateException("Ops lock busy"); + } + return "ok"; + } + @MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION) public String repartition(String message) { if (!opsService.repartition()) { diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java index 4f1830c8..632621b0 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java @@ -30,9 +30,14 @@ public class IndexOpsService { return opsLock.isLocked(); } - public boolean repartition() { - return run(searchSetService::recalculateAll); + public boolean rerank() { + return run(searchSetService::recalculatePrimaryRank); } + + public boolean repartition() { + return run(searchSetService::recalculateSecondary); + } + public boolean switchIndex() throws Exception { return run(index::switchIndex).isPresent(); } @@ -40,7 +45,7 @@ public class IndexOpsService { public Object repartitionEndpoint(Request request, Response response) throws Exception { - if (!run(searchSetService::recalculateAll)) { + if (!run(searchSetService::recalculateSecondary)) { Spark.halt(503, "Operations busy"); } diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexSearchSetsService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexSearchSetsService.java index 0d95a087..a6defb3a 100644 --- 
a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexSearchSetsService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexSearchSetsService.java @@ -21,6 +21,7 @@ import nu.marginalia.ranking.DomainRankings; import nu.marginalia.index.db.DbUpdateRanks; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceHeartbeat; +import nu.marginalia.service.module.ServiceConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,12 +45,14 @@ public class IndexSearchSetsService { private final ConcurrentHashMap rankingSets = new ConcurrentHashMap<>(); // Below are binary indices that are used to constrain a search private final SearchSet anySet = new SearchSetAny(); + private final int nodeId; // The ranking value of the domains used in sorting the domains private volatile DomainRankings domainRankings = new DomainRankings(); @Inject public IndexSearchSetsService(DomainTypes domainTypes, + ServiceConfiguration serviceConfiguration, ServiceHeartbeat heartbeat, RankingDomainFetcher rankingDomains, RankingDomainFetcherForSimilarityData similarityDomains, @@ -57,6 +60,7 @@ public class IndexSearchSetsService { ServiceEventLog eventLog, DomainRankingSetsService domainRankingSetsService, DbUpdateRanks dbUpdateRanks) throws IOException { + this.nodeId = serviceConfiguration.node(); this.domainTypes = domainTypes; this.heartbeat = heartbeat; this.indexServicesFactory = indexServicesFactory; @@ -102,14 +106,25 @@ public class IndexSearchSetsService { return Objects.requireNonNull(rankingSets.get(searchSetIdentifier), "Unknown search set"); } - public void recalculateAll() { + /** Recalculates the primary ranking set. This gets baked into the identifiers in the index, effectively + * changing their sort order, so it's important to run this before reconstructing the indices. 
*/ + public void recalculatePrimaryRank() { + try { + domainRankingSetsService.get("RANK").ifPresent(this::updateDomainRankings); + eventLog.logEvent("RANKING-SET-RECALCULATED", "RANK"); + } catch (SQLException e) { + logger.warn("Failed to primary ranking set", e); + } + } + + public void recalculateSecondary() { for (var rankingSet : domainRankingSetsService.getAll()) { try { if (DomainRankingSetsService.DomainSetAlgorithm.SPECIAL.equals(rankingSet.algorithm())) { switch (rankingSet.name()) { case "BLOGS" -> recalculateBlogsSet(rankingSet); - case "RANK" -> updateDomainRankings(rankingSet); - case "NONE" -> {} + case "RANK" -> {} // Skipped, handled via recalculatePrimaryRank() + case "NONE" -> {} // No-op } } else { recalculateNornal(rankingSet); @@ -173,9 +188,13 @@ public class IndexSearchSetsService { domainRankings = new DomainRankings(ranks); } - // The EC_DOMAIN table has a field that reflects the rank, this needs to be set for search result ordering to - // make sense - dbUpdateRanks.execute(ranks); + domainRankings.save(indexServicesFactory.getSearchSetsBase()); + + if (nodeId == 1) { + // The EC_DOMAIN table has a field that reflects the rank, this needs to be set for search result ordering to + // make sense, but only do this on the primary node to avoid excessive db locks + dbUpdateRanks.execute(ranks); + } } }