diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java index 9d2476f8..f8349eb7 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexMqEndpoints.java @@ -3,6 +3,8 @@ package nu.marginalia.index.client; public class IndexMqEndpoints { public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED"; public static final String INDEX_REPARTITION = "INDEX-REPARTITION"; + + public static final String INDEX_RELOAD_LEXICON = "INDEX-RELOAD-LEXICON"; public static final String INDEX_REINDEX = "INDEX-REINDEX"; } diff --git a/code/api/search-api/build.gradle b/code/api/search-api/build.gradle index 8c38b5f3..ba00a702 100644 --- a/code/api/search-api/build.gradle +++ b/code/api/search-api/build.gradle @@ -14,6 +14,7 @@ java { dependencies { implementation project(':code:common:model') implementation project(':code:common:config') + implementation project(':code:common:message-queue') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') diff --git a/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java b/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java index 393fa285..69e011bd 100644 --- a/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java +++ b/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchClient.java @@ -5,6 +5,8 @@ import com.google.inject.Singleton; import io.reactivex.rxjava3.core.Observable; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.search.client.model.ApiSearchResults; import 
nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; @@ -16,14 +18,30 @@ import org.slf4j.LoggerFactory; import javax.annotation.CheckReturnValue; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.UUID; @Singleton public class SearchClient extends AbstractDynamicClient { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final MqOutbox outbox; + @Inject - public SearchClient(ServiceDescriptors descriptors) { + public SearchClient(ServiceDescriptors descriptors, + MqPersistence persistence) { + super(descriptors.forId(ServiceId.Search), WmsaHome.getHostsFile(), GsonFactory::get); + + String inboxName = ServiceId.Search.name + ":" + "0"; + String outboxName = System.getProperty("service-name", UUID.randomUUID().toString()); + + outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID()); + + } + + + public MqOutbox outbox() { + return outbox; } @CheckReturnValue diff --git a/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchMqEndpoints.java b/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchMqEndpoints.java new file mode 100644 index 00000000..1c546b3e --- /dev/null +++ b/code/api/search-api/src/main/java/nu/marginalia/search/client/SearchMqEndpoints.java @@ -0,0 +1,6 @@ +package nu.marginalia.search.client; + +public class SearchMqEndpoints { + /** Flushes the URL caches, run if significant changes have occurred in the URLs database */ + public static final String FLUSH_CACHES = "FLUSH_CACHES"; +} diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryData.java b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryData.java index 830ed4a7..ea291052 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryData.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryData.java @@ -9,7 +9,6 @@ public class 
DictionaryData { public DictionaryData(int bankSize) { this.bankSize = bankSize; - banks.add(new DictionaryDataBank(0, bankSize)); } @@ -36,4 +35,8 @@ public class DictionaryData { return banks.get(offset/ bankSize).keyEquals(offset, otherKey); } + public void clear() { + banks.clear(); + banks.add(new DictionaryDataBank(0, bankSize)); + } } diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java index dc904441..1f9525a2 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/DictionaryMap.java @@ -7,6 +7,8 @@ public interface DictionaryMap { return new OnHeapDictionaryMap(); } + void clear(); + int size(); int put(long key); diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OffHeapDictionaryHashMap.java b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OffHeapDictionaryHashMap.java index e17c9c19..6a7aa07f 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OffHeapDictionaryHashMap.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OffHeapDictionaryHashMap.java @@ -58,6 +58,13 @@ public class OffHeapDictionaryHashMap implements DictionaryMap { } } + @Override + public void clear() { + dictionaryData.clear(); + initializeBuffers(); + sz.set(0); + } + @Override public int size() { return sz.get(); diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OnHeapDictionaryMap.java b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OnHeapDictionaryMap.java index 3b70e7e4..96dd5d13 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OnHeapDictionaryMap.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/dict/OnHeapDictionaryMap.java @@ -6,6 +6,11 @@ public class OnHeapDictionaryMap implements DictionaryMap { 
private static final int DEFAULT_SIZE = Integer.getInteger("lexiconSizeHint", 100_000); private final Long2IntOpenHashMap entries = new Long2IntOpenHashMap(DEFAULT_SIZE, 0.75f); + @Override + public void clear() { + entries.clear(); + } + @Override public int size() { return entries.size(); diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java index 40f9d73b..bd88efc8 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexicon.java @@ -9,6 +9,7 @@ import nu.marginalia.lexicon.journal.KeywordLexiconJournal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -46,6 +47,12 @@ public class KeywordLexicon implements AutoCloseable { logger.info("Done creating dictionary writer"); } + public void reload() throws IOException { + logger.info("Reloading dictionary writer"); + journal.loadFile(bytes -> reverseIndex.put(hashFunction.hashBytes(bytes).padToLong())); + logger.info("Done reloading dictionary writer"); + } + public int getOrInsert(String macroWord) { return getOrInsert(macroWord.getBytes(StandardCharsets.UTF_8)); } diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java index 9cdef151..ba7983a5 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/KeywordLexiconReadOnlyView.java @@ -4,6 +4,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import 
lombok.SneakyThrows; +import java.io.IOException; import java.util.concurrent.TimeUnit; public class KeywordLexiconReadOnlyView { @@ -21,4 +22,10 @@ public class KeywordLexiconReadOnlyView { return cache.get(word, () -> writer.getReadOnly(word)); } + /** Delegates to writer.reload(), then drops every cached lookup so that results cached before the reload are resolved again on next use. Always returns true. */ + public boolean reload() throws IOException { + writer.reload(); + cache.invalidateAll(); + return true; + } } diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java index 84a23247..013f2c49 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournal.java @@ -64,6 +64,7 @@ public class KeywordLexiconJournal { } public void loadFile(Consumer loadJournalEntry) throws IOException { + journalFile.rewind(); journalFile.loadFile(loadJournalEntry); } } diff --git a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java index 7473e4df..f7404296 100644 --- a/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java +++ b/code/features-index/lexicon/src/main/java/nu/marginalia/lexicon/journal/KeywordLexiconJournalFile.java @@ -27,6 +27,10 @@ public class KeywordLexiconJournalFile implements AutoCloseable { this.journalFile = journalFile; } + public void rewind() throws IOException { + journalFileRAF.seek(0); + } + public void loadFile(Consumer acceptEntry) throws IOException { if (!journalFile.exists()) { logger.info("File {} does not exist, can't load", journalFile); diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java index
82ed2617..a0ff5582 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java @@ -74,6 +74,17 @@ public class IndexService extends Service { volatile boolean initialized = false; + @MqRequest(endpoint = IndexMqEndpoints.INDEX_RELOAD_LEXICON) + public String reloadLexicon(String message) throws Exception { + + if (!opsService.reloadLexicon()) { + throw new IllegalStateException("Ops lock busy"); + } + + return "ok"; + } + + @MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION) public String repartition(String message) { if (!opsService.repartition()) { diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java index 36377c7c..31192d37 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java @@ -3,11 +3,13 @@ package nu.marginalia.index.svc; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.index.index.SearchIndex; +import nu.marginalia.lexicon.KeywordLexiconReadOnlyView; import spark.Request; import spark.Response; import spark.Spark; import javax.annotation.CheckReturnValue; +import java.io.IOException; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantLock; @@ -18,12 +20,15 @@ public class IndexOpsService { private final SearchIndex index; private final IndexSearchSetsService searchSetService; + private final KeywordLexiconReadOnlyView lexicon; @Inject public IndexOpsService(SearchIndex index, - IndexSearchSetsService searchSetService) { + IndexSearchSetsService searchSetService, + KeywordLexiconReadOnlyView lexicon) { this.index = index; 
this.searchSetService = searchSetService; + this.lexicon = lexicon; } public boolean isBusy() { @@ -36,6 +41,9 @@ public class IndexOpsService { public boolean reindex() throws Exception { return run(index::switchIndex).isPresent(); } + public boolean reloadLexicon() throws Exception { + return run(lexicon::reload).isPresent(); + } public Object repartitionEndpoint(Request request, Response response) throws Exception { @@ -80,5 +88,6 @@ public class IndexOpsService { } } + } diff --git a/code/services-core/search-service/src/main/java/nu/marginalia/search/SearchService.java b/code/services-core/search-service/src/main/java/nu/marginalia/search/SearchService.java index 61ff69c3..b6e7a5d2 100644 --- a/code/services-core/search-service/src/main/java/nu/marginalia/search/SearchService.java +++ b/code/services-core/search-service/src/main/java/nu/marginalia/search/SearchService.java @@ -6,9 +6,12 @@ import lombok.SneakyThrows; import nu.marginalia.WebsiteUrl; import nu.marginalia.client.Context; import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.search.client.SearchMqEndpoints; +import nu.marginalia.search.db.DbUrlDetailsQuery; import nu.marginalia.search.svc.SearchFrontPageService; import nu.marginalia.search.svc.*; import nu.marginalia.service.server.*; +import nu.marginalia.service.server.mq.MqRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import spark.Request; @@ -21,6 +24,7 @@ import java.nio.charset.StandardCharsets; public class SearchService extends Service { private final WebsiteUrl websiteUrl; + private final DbUrlDetailsQuery dbUrlDetailsQuery; private final StaticResources staticResources; private static final Logger logger = LoggerFactory.getLogger(SearchService.class); @@ -29,6 +33,7 @@ public class SearchService extends Service { @Inject public SearchService(BaseServiceParams params, WebsiteUrl websiteUrl, + DbUrlDetailsQuery dbUrlDetailsQuery, StaticResources staticResources, SearchFrontPageService frontPageService, 
SearchErrorPageService errorPageService, @@ -40,6 +45,7 @@ public class SearchService extends Service { super(params); this.websiteUrl = websiteUrl; + this.dbUrlDetailsQuery = dbUrlDetailsQuery; this.staticResources = staticResources; Spark.staticFiles.expireTime(600); @@ -70,6 +76,13 @@ public class SearchService extends Service { Spark.awaitInitialization(); } + @MqRequest(endpoint = SearchMqEndpoints.FLUSH_CACHES) + public String flushCaches(String unusedArg) { + logger.info("Flushing caches"); + dbUrlDetailsQuery.clearCaches(); + return "OK"; + } + private Object serveStatic(Request request, Response response) { String resource = request.params("resource"); staticResources.serveStatic("search", resource, request, response); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java index 1b329b97..be4b22ca 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java @@ -11,6 +11,8 @@ import nu.marginalia.mqsm.StateFactory; import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.mqsm.graph.ResumeBehavior; +import nu.marginalia.search.client.SearchClient; +import nu.marginalia.search.client.SearchMqEndpoints; import java.nio.file.Files; import java.nio.file.Path; @@ -25,14 +27,25 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph { private static final String RECONVERT = "RECONVERT"; private static final String LOAD = "LOAD"; private static final String MOVE_INDEX_FILES = "MOVE_INDEX_FILES"; + private static final String RELOAD_LEXICON = "RELOAD_LEXICON"; + private static final String RELOAD_LEXICON_WAIT = "RELOAD_LEXICON_WAIT"; 
+ private static final String FLUSH_CACHES = "FLUSH_CACHES"; private static final String END = "END"; private final ProcessService processService; + private final MqOutbox mqIndexOutbox; + private final MqOutbox mqSearchOutbox; @Inject - public ReconvertAndLoadProcess(StateFactory stateFactory, ProcessService processService) { + public ReconvertAndLoadProcess(StateFactory stateFactory, + ProcessService processService, + IndexClient indexClient, + SearchClient searchClient + ) { super(stateFactory); this.processService = processService; + this.mqIndexOutbox = indexClient.outbox(); + this.mqSearchOutbox = searchClient.outbox(); } @GraphState(name = INITIAL, next = RECONVERT) @@ -62,8 +75,8 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph { error(); } - @GraphState(name = MOVE_INDEX_FILES, next = END, resume = ResumeBehavior.ERROR) - public String moveIndexFiles(String crawlJob) throws Exception { + @GraphState(name = MOVE_INDEX_FILES, next = RELOAD_LEXICON, resume = ResumeBehavior.ERROR) + public void moveIndexFiles(String crawlJob) throws Exception { Path indexData = Path.of("/vol/index.dat"); Path indexDest = Path.of("/vol/iw/0/page-index.dat"); @@ -71,7 +84,28 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph { error("Index data not found"); Files.move(indexData, indexDest, StandardCopyOption.REPLACE_EXISTING); + } - return crawlJob; + @GraphState(name = RELOAD_LEXICON, next = RELOAD_LEXICON_WAIT, resume = ResumeBehavior.ERROR) + public long reloadLexicon() throws Exception { + return mqIndexOutbox.sendAsync(IndexMqEndpoints.INDEX_RELOAD_LEXICON, ""); + } + + @GraphState(name = RELOAD_LEXICON_WAIT, next = FLUSH_CACHES, resume = ResumeBehavior.RETRY) + public void reloadLexiconWait(long id) throws Exception { + var rsp = mqIndexOutbox.waitResponse(id); + + if (rsp.state() != MqMessageState.OK) { + error("RELOAD_LEXICON failed"); + } + } + + @GraphState(name = FLUSH_CACHES, next = END, resume = ResumeBehavior.RETRY) + public void 
flushCaches() throws Exception { + var rsp = mqSearchOutbox.send(SearchMqEndpoints.FLUSH_CACHES, ""); + + if (rsp.state() != MqMessageState.OK) { + error("FLUSH_CACHES failed"); + } } }