MarginaliaSearch/code/processes/loading-process/java/nu/marginalia/loading/LoaderMain.java

134 lines
4.9 KiB
Java
Raw Normal View History

2023-03-04 12:19:01 +00:00
package nu.marginalia.loading;
2022-05-19 15:45:26 +00:00
2023-07-14 15:08:10 +00:00
import com.google.gson.Gson;
2022-05-19 15:45:26 +00:00
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.linkdb.docs.DocumentDbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.loading.links.DomainLinksLoaderService;
2023-07-14 15:08:10 +00:00
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.process.ProcessConfiguration;
import nu.marginalia.process.ProcessConfigurationModule;
import nu.marginalia.process.ProcessMainClass;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
2023-03-04 12:19:01 +00:00
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.storage.FileStorageService;
2022-05-19 15:45:26 +00:00
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
2023-07-14 15:08:10 +00:00
2023-07-17 11:57:32 +00:00
import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
2022-05-19 15:45:26 +00:00
public class LoaderMain extends ProcessMainClass {
2022-07-18 15:22:22 +00:00
private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class);
private final ProcessHeartbeatImpl heartbeat;
2023-07-14 15:08:10 +00:00
private final FileStorageService fileStorageService;
private final DocumentDbWriter documentDbWriter;
private final DomainLoaderService domainService;
private final DomainLinksLoaderService linksService;
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
2022-05-19 15:45:26 +00:00
public static void main(String... args) {
try {
new org.mariadb.jdbc.Driver();
2023-03-04 12:19:01 +00:00
Injector injector = Guice.createInjector(
new ProcessConfigurationModule("loader"),
new LoaderModule(),
new DatabaseModule(false)
);
var instance = injector.getInstance(LoaderMain.class);
2022-05-19 15:45:26 +00:00
var instructions = instance.fetchInstructions(LoadRequest.class);
logger.info("Instructions received");
instance.run(instructions);
}
catch (Throwable ex) {
logger.error("Error running loader", ex);
}
2022-05-19 15:45:26 +00:00
}
@Inject
public LoaderMain(ProcessHeartbeatImpl heartbeat,
2023-07-14 15:08:10 +00:00
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService,
DocumentDbWriter documentDbWriter,
DomainLoaderService domainService,
DomainLinksLoaderService linksService,
KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService,
ProcessConfiguration processConfiguration,
2023-07-14 15:08:10 +00:00
Gson gson
) {
super(messageQueueFactory, processConfiguration, gson, LOADER_INBOX);
this.heartbeat = heartbeat;
2023-07-14 15:08:10 +00:00
this.fileStorageService = fileStorageService;
this.documentDbWriter = documentDbWriter;
this.domainService = domainService;
this.linksService = linksService;
this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService;
heartbeat.start();
2022-05-19 15:45:26 +00:00
}
void run(Instructions<LoadRequest> instructions) throws Throwable {
List<Path> inputSources = new ArrayList<>();
for (var storageId : instructions.value().inputProcessDataStorageIds) {
inputSources.add(fileStorageService.getStorage(storageId).asPath());
}
var inputData = new LoaderInputData(inputSources);
DomainIdRegistry domainIdRegistry = domainService.getOrCreateDomainIds(heartbeat, inputData);
try {
var results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputData),
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputData),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputData),
() -> domainService.loadDomainMetadata(domainIdRegistry, heartbeat, inputData)
)
);
for (var result : results) {
if (result.state() == Future.State.FAILED) {
throw result.exceptionNow();
}
}
2023-07-14 15:08:10 +00:00
instructions.ok();
}
catch (Exception ex) {
instructions.err();
logger.error("Error", ex);
}
finally {
keywordLoaderService.close();
documentDbWriter.close();
heartbeat.shutDown();
}
System.exit(0);
2022-05-19 15:45:26 +00:00
}
2022-05-19 15:45:26 +00:00
}