(control, WIP) Run reconvert-load from converter :D

This commit is contained in:
Viktor Lofgren 2023-07-11 18:05:37 +02:00
parent 77261a38cd
commit 88b9ec70c6
16 changed files with 135 additions and 7 deletions

View File

@ -3,6 +3,8 @@ package nu.marginalia.index.client;
public class IndexMqEndpoints { public class IndexMqEndpoints {
public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED"; public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED";
public static final String INDEX_REPARTITION = "INDEX-REPARTITION"; public static final String INDEX_REPARTITION = "INDEX-REPARTITION";
public static final String INDEX_RELOAD_LEXICON = "INDEX-RELOAD-LEXICON";
public static final String INDEX_REINDEX = "INDEX-REINDEX"; public static final String INDEX_REINDEX = "INDEX-REINDEX";
} }

View File

@ -14,6 +14,7 @@ java {
dependencies { dependencies {
implementation project(':code:common:model') implementation project(':code:common:model')
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:message-queue')
implementation project(':code:common:service-discovery') implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')

View File

@ -5,6 +5,8 @@ import com.google.inject.Singleton;
import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observable;
import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.search.client.model.ApiSearchResults; import nu.marginalia.search.client.model.ApiSearchResults;
import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId; import nu.marginalia.service.id.ServiceId;
@ -16,14 +18,30 @@ import org.slf4j.LoggerFactory;
import javax.annotation.CheckReturnValue; import javax.annotation.CheckReturnValue;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Singleton @Singleton
public class SearchClient extends AbstractDynamicClient { public class SearchClient extends AbstractDynamicClient {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final MqOutbox outbox;
@Inject @Inject
public SearchClient(ServiceDescriptors descriptors) { public SearchClient(ServiceDescriptors descriptors,
MqPersistence persistence) {
super(descriptors.forId(ServiceId.Search), WmsaHome.getHostsFile(), GsonFactory::get); super(descriptors.forId(ServiceId.Search), WmsaHome.getHostsFile(), GsonFactory::get);
String inboxName = ServiceId.Search.name + ":" + "0";
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
outbox = new MqOutbox(persistence, inboxName, outboxName, UUID.randomUUID());
}
public MqOutbox outbox() {
return outbox;
} }
@CheckReturnValue @CheckReturnValue

View File

@ -0,0 +1,6 @@
package nu.marginalia.search.client;
/**
 * Names of the message queue endpoints exposed by the search service.
 *
 * <p>This class only holds constants; it is final and has a private constructor
 * so that it can neither be instantiated nor extended.
 */
public final class SearchMqEndpoints {
    /** Flushes the URL caches, run if significant changes have occurred in the URLs database */
    public static final String FLUSH_CACHES = "FLUSH_CACHES";

    private SearchMqEndpoints() {
        // Constants holder, not meant to be instantiated.
    }
}

View File

@ -9,7 +9,6 @@ public class DictionaryData {
public DictionaryData(int bankSize) { public DictionaryData(int bankSize) {
this.bankSize = bankSize; this.bankSize = bankSize;
banks.add(new DictionaryDataBank(0, bankSize)); banks.add(new DictionaryDataBank(0, bankSize));
} }
@ -36,4 +35,8 @@ public class DictionaryData {
return banks.get(offset/ bankSize).keyEquals(offset, otherKey); return banks.get(offset/ bankSize).keyEquals(offset, otherKey);
} }
/**
 * Discards every bank and installs a single fresh bank, created with the same
 * arguments the constructor uses for its first bank.
 */
public void clear() {
    banks.clear();

    final DictionaryDataBank initialBank = new DictionaryDataBank(0, bankSize);
    banks.add(initialBank);
}
} }

View File

@ -7,6 +7,8 @@ public interface DictionaryMap {
return new OnHeapDictionaryMap(); return new OnHeapDictionaryMap();
} }
void clear();
int size(); int size();
int put(long key); int put(long key);

View File

@ -58,6 +58,13 @@ public class OffHeapDictionaryHashMap implements DictionaryMap {
} }
} }
/**
 * Resets this map: clears the backing dictionary data, re-runs
 * {@code initializeBuffers()} and sets the size counter back to zero.
 */
// NOTE(review): whether initializeBuffers() releases previously allocated
// buffers is not visible from here — confirm before calling this repeatedly.
// NOTE(review): no locking is visible in this method; confirm that callers
// do not read or write the map while it is being cleared.
@Override
public void clear() {
dictionaryData.clear();
initializeBuffers();
sz.set(0);
}
@Override @Override
public int size() { public int size() {
return sz.get(); return sz.get();

View File

@ -6,6 +6,11 @@ public class OnHeapDictionaryMap implements DictionaryMap {
private static final int DEFAULT_SIZE = Integer.getInteger("lexiconSizeHint", 100_000); private static final int DEFAULT_SIZE = Integer.getInteger("lexiconSizeHint", 100_000);
private final Long2IntOpenHashMap entries = new Long2IntOpenHashMap(DEFAULT_SIZE, 0.75f); private final Long2IntOpenHashMap entries = new Long2IntOpenHashMap(DEFAULT_SIZE, 0.75f);
/** Removes every entry from the backing on-heap hash map. */
@Override
public void clear() {
    this.entries.clear();
}
@Override @Override
public int size() { public int size() {
return entries.size(); return entries.size();

View File

@ -9,6 +9,7 @@ import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -46,6 +47,12 @@ public class KeywordLexicon implements AutoCloseable {
logger.info("Done creating dictionary writer"); logger.info("Done creating dictionary writer");
} }
/**
 * Reads the journal again and puts the hash of every journal entry into the
 * reverse index.
 *
 * @throws IOException if loading the journal file fails
 */
public void reload() throws IOException {
    logger.info("Reloading dictionary writer");

    journal.loadFile(entryBytes -> {
        final long hashedKey = hashFunction.hashBytes(entryBytes).padToLong();
        reverseIndex.put(hashedKey);
    });

    logger.info("Done reloading dictionary writer");
}
public int getOrInsert(String macroWord) { public int getOrInsert(String macroWord) {
return getOrInsert(macroWord.getBytes(StandardCharsets.UTF_8)); return getOrInsert(macroWord.getBytes(StandardCharsets.UTF_8));
} }

View File

@ -4,6 +4,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class KeywordLexiconReadOnlyView { public class KeywordLexiconReadOnlyView {
@ -21,4 +22,8 @@ public class KeywordLexiconReadOnlyView {
return cache.get(word, () -> writer.getReadOnly(word)); return cache.get(word, () -> writer.getReadOnly(word));
} }
/**
 * Reloads the underlying lexicon and drops all cached lookup results.
 *
 * @return always {@code true}
 * @throws IOException if reloading the underlying lexicon fails
 */
public boolean reload() throws IOException {
    writer.reload();

    // Lookups are memoised in the cache; without invalidation, results cached
    // before the reload would keep being served even though the lexicon may
    // now answer differently.
    cache.invalidateAll();

    return true;
}
} }

View File

@ -64,6 +64,7 @@ public class KeywordLexiconJournal {
} }
public void loadFile(Consumer<byte[]> loadJournalEntry) throws IOException { public void loadFile(Consumer<byte[]> loadJournalEntry) throws IOException {
journalFile.rewind();
journalFile.loadFile(loadJournalEntry); journalFile.loadFile(loadJournalEntry);
} }
} }

View File

@ -27,6 +27,10 @@ public class KeywordLexiconJournalFile implements AutoCloseable {
this.journalFile = journalFile; this.journalFile = journalFile;
} }
/**
 * Moves the position of the journal file handle back to offset 0.
 *
 * @throws IOException if seeking fails
 */
public void rewind() throws IOException {
    final long startOfFile = 0L;
    journalFileRAF.seek(startOfFile);
}
public void loadFile(Consumer<byte[]> acceptEntry) throws IOException { public void loadFile(Consumer<byte[]> acceptEntry) throws IOException {
if (!journalFile.exists()) { if (!journalFile.exists()) {
logger.info("File {} does not exist, can't load", journalFile); logger.info("File {} does not exist, can't load", journalFile);

View File

@ -74,6 +74,17 @@ public class IndexService extends Service {
volatile boolean initialized = false; volatile boolean initialized = false;
/**
 * Message queue handler that asks the ops service to reload the lexicon.
 *
 * @param message request payload; not used
 * @return {@code "ok"} when the ops service reports the reload as run
 * @throws IllegalStateException if the ops service reports that it did not run the reload
 */
@MqRequest(endpoint = IndexMqEndpoints.INDEX_RELOAD_LEXICON)
public String reloadLexicon(String message) throws Exception {
    final boolean reloaded = opsService.reloadLexicon();

    if (reloaded) {
        return "ok";
    }

    throw new IllegalStateException("Ops lock busy");
}
@MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION) @MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION)
public String repartition(String message) { public String repartition(String message) {
if (!opsService.repartition()) { if (!opsService.repartition()) {

View File

@ -3,11 +3,13 @@ package nu.marginalia.index.svc;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.index.index.SearchIndex; import nu.marginalia.index.index.SearchIndex;
import nu.marginalia.lexicon.KeywordLexiconReadOnlyView;
import spark.Request; import spark.Request;
import spark.Response; import spark.Response;
import spark.Spark; import spark.Spark;
import javax.annotation.CheckReturnValue; import javax.annotation.CheckReturnValue;
import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -18,12 +20,15 @@ public class IndexOpsService {
private final SearchIndex index; private final SearchIndex index;
private final IndexSearchSetsService searchSetService; private final IndexSearchSetsService searchSetService;
private final KeywordLexiconReadOnlyView lexicon;
@Inject @Inject
public IndexOpsService(SearchIndex index, public IndexOpsService(SearchIndex index,
IndexSearchSetsService searchSetService) { IndexSearchSetsService searchSetService,
KeywordLexiconReadOnlyView lexicon) {
this.index = index; this.index = index;
this.searchSetService = searchSetService; this.searchSetService = searchSetService;
this.lexicon = lexicon;
} }
public boolean isBusy() { public boolean isBusy() {
@ -36,6 +41,9 @@ public class IndexOpsService {
public boolean reindex() throws Exception { public boolean reindex() throws Exception {
return run(index::switchIndex).isPresent(); return run(index::switchIndex).isPresent();
} }
/**
 * Submits a lexicon reload through {@code run(...)}.
 *
 * @return {@code true} if {@code run(...)} produced a result, {@code false} otherwise
 */
public boolean reloadLexicon() throws Exception {
    var outcome = run(lexicon::reload);
    return outcome.isPresent();
}
public Object repartitionEndpoint(Request request, Response response) throws Exception { public Object repartitionEndpoint(Request request, Response response) throws Exception {
@ -80,5 +88,6 @@ public class IndexOpsService {
} }
} }
} }

View File

@ -6,9 +6,12 @@ import lombok.SneakyThrows;
import nu.marginalia.WebsiteUrl; import nu.marginalia.WebsiteUrl;
import nu.marginalia.client.Context; import nu.marginalia.client.Context;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.search.client.SearchMqEndpoints;
import nu.marginalia.search.db.DbUrlDetailsQuery;
import nu.marginalia.search.svc.SearchFrontPageService; import nu.marginalia.search.svc.SearchFrontPageService;
import nu.marginalia.search.svc.*; import nu.marginalia.search.svc.*;
import nu.marginalia.service.server.*; import nu.marginalia.service.server.*;
import nu.marginalia.service.server.mq.MqRequest;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import spark.Request; import spark.Request;
@ -21,6 +24,7 @@ import java.nio.charset.StandardCharsets;
public class SearchService extends Service { public class SearchService extends Service {
private final WebsiteUrl websiteUrl; private final WebsiteUrl websiteUrl;
private final DbUrlDetailsQuery dbUrlDetailsQuery;
private final StaticResources staticResources; private final StaticResources staticResources;
private static final Logger logger = LoggerFactory.getLogger(SearchService.class); private static final Logger logger = LoggerFactory.getLogger(SearchService.class);
@ -29,6 +33,7 @@ public class SearchService extends Service {
@Inject @Inject
public SearchService(BaseServiceParams params, public SearchService(BaseServiceParams params,
WebsiteUrl websiteUrl, WebsiteUrl websiteUrl,
DbUrlDetailsQuery dbUrlDetailsQuery,
StaticResources staticResources, StaticResources staticResources,
SearchFrontPageService frontPageService, SearchFrontPageService frontPageService,
SearchErrorPageService errorPageService, SearchErrorPageService errorPageService,
@ -40,6 +45,7 @@ public class SearchService extends Service {
super(params); super(params);
this.websiteUrl = websiteUrl; this.websiteUrl = websiteUrl;
this.dbUrlDetailsQuery = dbUrlDetailsQuery;
this.staticResources = staticResources; this.staticResources = staticResources;
Spark.staticFiles.expireTime(600); Spark.staticFiles.expireTime(600);
@ -70,6 +76,13 @@ public class SearchService extends Service {
Spark.awaitInitialization(); Spark.awaitInitialization();
} }
/**
 * Message queue handler that clears the caches held by the URL details query.
 *
 * @param unusedArg request payload; not used
 * @return the fixed reply {@code "OK"}
 */
@MqRequest(endpoint = SearchMqEndpoints.FLUSH_CACHES)
public String flushCaches(String unusedArg) {
    logger.info("Flushing caches");

    dbUrlDetailsQuery.clearCaches();

    final String reply = "OK";
    return reply;
}
private Object serveStatic(Request request, Response response) { private Object serveStatic(Request request, Response response) {
String resource = request.params("resource"); String resource = request.params("resource");
staticResources.serveStatic("search", resource, request, response); staticResources.serveStatic("search", resource, request, response);

View File

@ -11,6 +11,8 @@ import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState; import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior; import nu.marginalia.mqsm.graph.ResumeBehavior;
import nu.marginalia.search.client.SearchClient;
import nu.marginalia.search.client.SearchMqEndpoints;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -25,14 +27,25 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph {
private static final String RECONVERT = "RECONVERT"; private static final String RECONVERT = "RECONVERT";
private static final String LOAD = "LOAD"; private static final String LOAD = "LOAD";
private static final String MOVE_INDEX_FILES = "MOVE_INDEX_FILES"; private static final String MOVE_INDEX_FILES = "MOVE_INDEX_FILES";
private static final String RELOAD_LEXICON = "RELOAD_LEXICON";
private static final String RELOAD_LEXICON_WAIT = "RELOAD_LEXICON_WAIT";
private static final String FLUSH_CACHES = "FLUSH_CACHES";
private static final String END = "END"; private static final String END = "END";
private final ProcessService processService; private final ProcessService processService;
private final MqOutbox mqIndexOutbox;
private final MqOutbox mqSearchOutbox;
@Inject @Inject
public ReconvertAndLoadProcess(StateFactory stateFactory, ProcessService processService) { public ReconvertAndLoadProcess(StateFactory stateFactory,
ProcessService processService,
IndexClient indexClient,
SearchClient searchClient
) {
super(stateFactory); super(stateFactory);
this.processService = processService; this.processService = processService;
this.mqIndexOutbox = indexClient.outbox();
this.mqSearchOutbox = searchClient.outbox();
} }
@GraphState(name = INITIAL, next = RECONVERT) @GraphState(name = INITIAL, next = RECONVERT)
@ -62,8 +75,8 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph {
error(); error();
} }
@GraphState(name = MOVE_INDEX_FILES, next = END, resume = ResumeBehavior.ERROR) @GraphState(name = MOVE_INDEX_FILES, next = RELOAD_LEXICON, resume = ResumeBehavior.ERROR)
public String moveIndexFiles(String crawlJob) throws Exception { public void moveIndexFiles(String crawlJob) throws Exception {
Path indexData = Path.of("/vol/index.dat"); Path indexData = Path.of("/vol/index.dat");
Path indexDest = Path.of("/vol/iw/0/page-index.dat"); Path indexDest = Path.of("/vol/iw/0/page-index.dat");
@ -71,7 +84,28 @@ public class ReconvertAndLoadProcess extends AbstractStateGraph {
error("Index data not found"); error("Index data not found");
Files.move(indexData, indexDest, StandardCopyOption.REPLACE_EXISTING); Files.move(indexData, indexDest, StandardCopyOption.REPLACE_EXISTING);
}
return crawlJob; @GraphState(name = RELOAD_LEXICON, next = RELOAD_LEXICON_WAIT, resume = ResumeBehavior.ERROR)
public long reloadLexicon() throws Exception {
    // Send the reload request to the index service without waiting for the reply;
    // the returned value is presumably the message id consumed by the wait state.
    final long messageId = mqIndexOutbox.sendAsync(IndexMqEndpoints.INDEX_RELOAD_LEXICON, "");
    return messageId;
}
/**
 * Waits for the index service's response to the lexicon reload request and
 * reports an error unless the response state is OK.
 *
 * @param id identifier passed to {@code waitResponse}
 */
@GraphState(name = RELOAD_LEXICON_WAIT, next = FLUSH_CACHES, resume = ResumeBehavior.RETRY)
public void reloadLexiconWait(long id) throws Exception {
    final var response = mqIndexOutbox.waitResponse(id);

    final boolean succeeded = response.state() == MqMessageState.OK;
    if (!succeeded) {
        error("RELOAD_LEXICON failed");
    }
}
@GraphState(name = FLUSH_CACHES, next = END, resume = ResumeBehavior.RETRY)
public void flushCaches() throws Exception {
var rsp = mqSearchOutbox.send(SearchMqEndpoints.FLUSH_CACHES, "");
if (rsp.state() != MqMessageState.OK) {
error("FLUSH_CACHES failed");
}
} }
} }