MarginaliaSearch/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java

205 lines
7.6 KiB
Java
Raw Normal View History

2023-03-04 12:19:01 +00:00
package nu.marginalia.loading;
2022-05-19 15:45:26 +00:00
2023-07-14 15:08:10 +00:00
import com.google.gson.Gson;
2022-05-19 15:45:26 +00:00
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import lombok.Getter;
2022-05-19 15:45:26 +00:00
import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.loading.links.DomainLinksLoaderService;
2023-07-14 15:08:10 +00:00
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
2023-07-14 15:08:10 +00:00
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
2023-03-04 12:19:01 +00:00
import nu.marginalia.service.module.DatabaseModule;
2022-05-19 15:45:26 +00:00
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
2023-03-04 12:19:01 +00:00
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
2023-07-14 15:08:10 +00:00
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
2022-05-19 15:45:26 +00:00
import java.util.concurrent.TimeUnit;
2023-07-14 15:08:10 +00:00
2023-07-17 11:57:32 +00:00
import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
2022-05-19 15:45:26 +00:00
/**
 * Entry point of the loader process.
 *
 * <p>An instance fetches a single load request from the loader inbox on the
 * message queue, runs the link, keyword, document and domain metadata loaders
 * against the input data named by that request, and reports the outcome back
 * on the inbox the request arrived on.
 */
public class LoaderMain extends ProcessMainClass {
2022-07-18 15:22:22 +00:00
private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class);
// Started in the constructor, handed to each loader task, and shut down at the end of run().
private final ProcessHeartbeatImpl heartbeat;
2023-07-14 15:08:10 +00:00
// Used to create the single-shot inbox the load request is read from.
private final MessageQueueFactory messageQueueFactory;
// Resolves the storage ids named in the load request into paths.
private final FileStorageService fileStorageService;
// Not written to directly in this class; closed at the end of run().
private final DocumentDbWriter documentDbWriter;
// Provides the domain id registry and loads domain metadata.
private final DomainLoaderService domainService;
private final DomainLinksLoaderService linksService;
// Closed at the end of run().
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
// Node number taken from the process configuration; passed along when creating the inbox.
private final int node;
2023-07-14 15:08:10 +00:00
// Deserializes the payload of the load request message.
private final Gson gson;
2022-05-19 15:45:26 +00:00
public static void main(String... args) {
try {
new org.mariadb.jdbc.Driver();
2023-03-04 12:19:01 +00:00
Injector injector = Guice.createInjector(
new ProcessConfigurationModule("loader"),
new LoaderModule(),
new DatabaseModule(false)
);
var instance = injector.getInstance(LoaderMain.class);
2022-05-19 15:45:26 +00:00
var instructions = instance.fetchInstructions();
logger.info("Instructions received");
instance.run(instructions);
}
catch (Exception ex) {
logger.error("Error running loader", ex);
}
2022-05-19 15:45:26 +00:00
}
/**
 * Creates the loader with all of its collaborators injected and starts the
 * process heartbeat.
 *
 * @param heartbeat             heartbeat for this process; started by this constructor
 * @param messageQueueFactory   factory for the inbox the load request is read from
 * @param fileStorageService    resolves storage ids into paths
 * @param documentDbWriter      document database writer, closed when the run finishes
 * @param domainService         domain id and domain metadata loader
 * @param linksService          domain link loader
 * @param keywordLoaderService  keyword loader, closed when the run finishes
 * @param documentLoaderService document loader
 * @param processConfiguration  supplies the node number of this process
 * @param gson                  JSON deserializer for the request payload
 */
@Inject
public LoaderMain(ProcessHeartbeatImpl heartbeat,
                  MessageQueueFactory messageQueueFactory,
                  FileStorageService fileStorageService,
                  DocumentDbWriter documentDbWriter,
                  DomainLoaderService domainService,
                  DomainLinksLoaderService linksService,
                  KeywordLoaderService keywordLoaderService,
                  DocumentLoaderService documentLoaderService,
                  ProcessConfiguration processConfiguration,
                  Gson gson
) {
    // Message queue plumbing
    this.messageQueueFactory = messageQueueFactory;
    this.gson = gson;
    this.node = processConfiguration.node();

    // Storage
    this.fileStorageService = fileStorageService;
    this.documentDbWriter = documentDbWriter;

    // Loader services
    this.domainService = domainService;
    this.linksService = linksService;
    this.keywordLoaderService = keywordLoaderService;
    this.documentLoaderService = documentLoaderService;

    // Process liveness reporting begins as soon as the instance exists
    this.heartbeat = heartbeat;
    heartbeat.start();
}
2023-03-04 12:19:01 +00:00
@SneakyThrows
void run(LoadRequest instructions) {
LoaderInputData inputData = instructions.getInputData();
DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(inputData);
try {
var results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
)
);
for (var result : results) {
if (result.state() == Future.State.FAILED) {
throw result.exceptionNow();
}
}
2023-07-14 15:08:10 +00:00
instructions.ok();
}
catch (Exception ex) {
instructions.err();
logger.error("Error", ex);
}
finally {
keywordLoaderService.close();
documentDbWriter.close();
heartbeat.shutDown();
}
System.exit(0);
2022-05-19 15:45:26 +00:00
}
2023-07-14 15:08:10 +00:00
private static class LoadRequest {
@Getter
private final LoaderInputData inputData;
2023-07-14 15:08:10 +00:00
private final MqMessage message;
private final MqSingleShotInbox inbox;
LoadRequest(LoaderInputData inputData, MqMessage message, MqSingleShotInbox inbox) {
this.inputData = inputData;
2023-07-14 15:08:10 +00:00
this.message = message;
this.inbox = inbox;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private LoadRequest fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, node, UUID.randomUUID());
2023-07-14 15:08:10 +00:00
2023-07-17 11:57:32 +00:00
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName());
2023-07-14 15:08:10 +00:00
if (msgOpt.isEmpty())
throw new RuntimeException("No instruction received in inbox");
var msg = msgOpt.get();
2023-07-17 11:57:32 +00:00
if (!nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName().equals(msg.function())) {
2023-07-14 15:08:10 +00:00
throw new RuntimeException("Unexpected message in inbox: " + msg);
}
try {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.loading.LoadRequest.class);
2023-07-14 15:08:10 +00:00
List<Path> inputSources = new ArrayList<>();
for (var storageId : request.inputProcessDataStorageIds) {
inputSources.add(fileStorageService.getStorage(storageId).asPath());
}
2023-07-14 15:08:10 +00:00
return new LoadRequest(new LoaderInputData(inputSources), msg, inbox);
}
catch (Exception ex) {
inbox.sendResponse(msg, new MqInboxResponse("FAILED", MqMessageState.ERR));
throw ex;
}
2023-07-14 15:08:10 +00:00
}
2022-05-19 15:45:26 +00:00
/**
 * Retrieves a message for this process from the given inbox.
 *
 * <p>Waits up to 30 seconds for a message. If none arrives, attempts to
 * steal a message whose function equals {@code expectedFunction} and logs
 * the stolen message if there is one.
 *
 * @param inbox            the inbox to read from
 * @param expectedFunction the function name the message must carry
 * @return the received or stolen message, or empty if neither was available
 * @throws RuntimeException if a message was received but carries a different function
 */
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
    var received = inbox.waitForMessage(30, TimeUnit.SECONDS);

    // Nothing arrived within the timeout: fall back to stealing a matching message
    if (received.isEmpty()) {
        var stolen = inbox.stealMessage(candidate -> candidate.function().equals(expectedFunction));
        if (stolen.isPresent()) {
            logger.info("Stole message {}", stolen.get());
        }
        return stolen;
    }

    var actualFunction = received.get().function();
    if (actualFunction.equals(expectedFunction)) {
        return received;
    }

    throw new RuntimeException("Unexpected function: " + actualFunction);
}
2022-05-19 15:45:26 +00:00
}