2023-03-04 12:19:01 +00:00
|
|
|
package nu.marginalia.converting;
|
2022-05-19 15:45:26 +00:00
|
|
|
|
2022-07-11 21:25:03 +00:00
|
|
|
import com.google.gson.Gson;
|
2022-05-19 15:45:26 +00:00
|
|
|
import com.google.inject.Guice;
|
|
|
|
import com.google.inject.Inject;
|
|
|
|
import com.google.inject.Injector;
|
2023-10-14 10:07:40 +00:00
|
|
|
import nu.marginalia.ProcessConfiguration;
|
2023-10-10 10:32:22 +00:00
|
|
|
import nu.marginalia.ProcessConfigurationModule;
|
2023-07-28 16:14:43 +00:00
|
|
|
import nu.marginalia.converting.sideload.SideloadSource;
|
|
|
|
import nu.marginalia.converting.sideload.SideloadSourceFactory;
|
2023-12-27 17:20:03 +00:00
|
|
|
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
|
2023-09-14 10:12:07 +00:00
|
|
|
import nu.marginalia.converting.writer.ConverterBatchWriter;
|
2023-09-13 14:13:41 +00:00
|
|
|
import nu.marginalia.converting.writer.ConverterWriter;
|
2024-04-22 10:34:28 +00:00
|
|
|
import nu.marginalia.crawling.io.CrawledDomainReader;
|
|
|
|
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
|
|
|
|
import nu.marginalia.process.log.WorkLog;
|
|
|
|
import nu.marginalia.process.log.WorkLogEntry;
|
2024-01-13 16:12:18 +00:00
|
|
|
import nu.marginalia.service.ProcessMainClass;
|
2023-10-14 10:07:40 +00:00
|
|
|
import nu.marginalia.storage.FileStorageService;
|
2023-07-14 15:08:10 +00:00
|
|
|
import nu.marginalia.mq.MessageQueueFactory;
|
|
|
|
import nu.marginalia.mq.MqMessage;
|
|
|
|
import nu.marginalia.mq.inbox.MqInboxResponse;
|
|
|
|
import nu.marginalia.mq.inbox.MqSingleShotInbox;
|
2023-07-11 12:46:21 +00:00
|
|
|
import nu.marginalia.process.control.ProcessHeartbeat;
|
2023-08-29 11:07:55 +00:00
|
|
|
import nu.marginalia.process.control.ProcessHeartbeatImpl;
|
2023-07-10 15:36:12 +00:00
|
|
|
import nu.marginalia.service.module.DatabaseModule;
|
2023-09-20 08:11:49 +00:00
|
|
|
import nu.marginalia.util.SimpleBlockingThreadPool;
|
2023-09-13 14:13:41 +00:00
|
|
|
import nu.marginalia.worklog.BatchingWorkLog;
|
|
|
|
import nu.marginalia.worklog.BatchingWorkLogImpl;
|
2024-04-22 10:34:28 +00:00
|
|
|
import org.apache.logging.log4j.util.Strings;
|
|
|
|
import nu.marginalia.converting.model.CrawlPlan;
|
2023-03-04 12:19:01 +00:00
|
|
|
import nu.marginalia.converting.processor.DomainProcessor;
|
2022-05-19 15:45:26 +00:00
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
2024-04-22 10:34:28 +00:00
|
|
|
import nu.marginalia.converting.model.WorkDir;
|
2022-05-19 15:45:26 +00:00
|
|
|
|
2024-04-22 10:34:28 +00:00
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.file.Files;
|
2023-07-28 16:14:43 +00:00
|
|
|
import java.nio.file.Path;
|
2023-07-17 10:27:27 +00:00
|
|
|
import java.sql.SQLException;
|
2023-09-17 12:35:06 +00:00
|
|
|
import java.util.Collection;
|
|
|
|
import java.util.List;
|
2023-07-17 10:27:27 +00:00
|
|
|
import java.util.Optional;
|
2023-07-14 15:08:10 +00:00
|
|
|
import java.util.UUID;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
2023-07-11 12:46:21 +00:00
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
2024-04-22 10:34:28 +00:00
|
|
|
import java.util.function.Function;
|
2022-05-19 15:45:26 +00:00
|
|
|
|
2023-07-17 11:57:32 +00:00
|
|
|
import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
|
2023-07-14 15:08:10 +00:00
|
|
|
|
2024-01-13 16:12:18 +00:00
|
|
|
public class ConverterMain extends ProcessMainClass {
|
2023-07-14 15:08:10 +00:00
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
|
|
|
|
private final DomainProcessor processor;
|
|
|
|
private final Gson gson;
|
|
|
|
private final ProcessHeartbeat heartbeat;
|
|
|
|
private final MessageQueueFactory messageQueueFactory;
|
|
|
|
private final FileStorageService fileStorageService;
|
2023-07-28 16:14:43 +00:00
|
|
|
private final SideloadSourceFactory sideloadSourceFactory;
|
2023-10-14 10:07:40 +00:00
|
|
|
private final int node;
|
|
|
|
|
2023-07-14 15:08:10 +00:00
|
|
|
public static void main(String... args) throws Exception {
|
2022-05-19 15:45:26 +00:00
|
|
|
|
2023-11-22 17:31:27 +00:00
|
|
|
try {
|
|
|
|
Injector injector = Guice.createInjector(
|
|
|
|
new ConverterModule(),
|
|
|
|
new ProcessConfigurationModule("converter"),
|
2024-01-11 11:40:03 +00:00
|
|
|
new DatabaseModule(false)
|
2023-11-22 17:31:27 +00:00
|
|
|
);
|
2023-07-14 15:08:10 +00:00
|
|
|
|
2023-11-22 17:31:27 +00:00
|
|
|
var converter = injector.getInstance(ConverterMain.class);
|
2023-07-14 15:08:10 +00:00
|
|
|
|
2023-11-22 17:31:27 +00:00
|
|
|
logger.info("Starting pipe");
|
2023-07-14 15:08:10 +00:00
|
|
|
|
2023-11-22 17:31:27 +00:00
|
|
|
converter
|
|
|
|
.fetchInstructions()
|
|
|
|
.execute(converter);
|
|
|
|
|
|
|
|
logger.info("Finished");
|
|
|
|
}
|
|
|
|
catch (Exception ex) {
|
|
|
|
logger.error("Uncaught Exception", ex);
|
|
|
|
}
|
2023-07-14 15:08:10 +00:00
|
|
|
|
|
|
|
System.exit(0);
|
2022-05-19 15:45:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
 * Injection constructor. Stores the collaborators, reads the node value
 * from the process configuration, and starts the heartbeat as its final step.
 */
@Inject
public ConverterMain(DomainProcessor processor,
                     Gson gson,
                     ProcessHeartbeatImpl heartbeat,
                     MessageQueueFactory messageQueueFactory,
                     FileStorageService fileStorageService,
                     SideloadSourceFactory sideloadSourceFactory,
                     ProcessConfiguration processConfiguration)
{
    // Process-level plumbing
    this.node = processConfiguration.node();
    this.heartbeat = heartbeat;
    this.messageQueueFactory = messageQueueFactory;
    this.fileStorageService = fileStorageService;

    // Conversion collaborators
    this.gson = gson;
    this.processor = processor;
    this.sideloadSourceFactory = sideloadSourceFactory;

    // Started only once every field has been assigned
    heartbeat.start();
}
|
2023-07-11 12:46:21 +00:00
|
|
|
|
2023-09-17 12:35:06 +00:00
|
|
|
public void convert(Collection<? extends SideloadSource> sideloadSources, Path writeDir) throws Exception {
|
2023-09-14 12:13:03 +00:00
|
|
|
try (var writer = new ConverterBatchWriter(writeDir, 0);
|
2023-09-21 10:48:33 +00:00
|
|
|
var taskHeartbeat = heartbeat.createAdHocTaskHeartbeat("Sideloading");
|
2023-09-14 12:13:03 +00:00
|
|
|
BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(writeDir.resolve("processor.log"))
|
|
|
|
) {
|
2023-09-21 10:48:33 +00:00
|
|
|
|
|
|
|
int i = 0;
|
2023-09-17 12:35:06 +00:00
|
|
|
for (var sideloadSource : sideloadSources) {
|
2023-09-25 16:27:04 +00:00
|
|
|
logger.info("Sideloading {}", sideloadSource.domainName());
|
2023-09-21 10:48:33 +00:00
|
|
|
|
2023-09-25 16:27:04 +00:00
|
|
|
taskHeartbeat.progress(sideloadSource.domainName(), i++, sideloadSources.size());
|
2023-09-21 10:48:33 +00:00
|
|
|
|
2023-12-27 17:20:03 +00:00
|
|
|
writer.writeSideloadSource(sideloadSource);
|
2023-09-17 12:35:06 +00:00
|
|
|
}
|
2023-09-21 10:48:33 +00:00
|
|
|
taskHeartbeat.progress("Finished", i, sideloadSources.size());
|
2023-09-14 12:13:03 +00:00
|
|
|
|
|
|
|
// We write an empty log with just a finish marker for the sideloading action
|
|
|
|
batchingWorkLog.logFinishedBatch();
|
2023-09-14 10:12:07 +00:00
|
|
|
}
|
2023-07-28 16:14:43 +00:00
|
|
|
}
|
2023-07-14 15:08:10 +00:00
|
|
|
|
2024-04-22 10:34:28 +00:00
|
|
|
public void convert(int totalDomains, WorkDir crawlDir, WorkDir processedDir) throws Exception {
|
|
|
|
|
2022-05-19 15:45:26 +00:00
|
|
|
|
2024-02-12 15:24:19 +00:00
|
|
|
final int defaultPoolSize = Boolean.getBoolean("system.conserveMemory")
|
|
|
|
? Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 1, 4) // <-- conserve memory
|
|
|
|
: Math.clamp(Runtime.getRuntime().availableProcessors() - 2, 1, 32); // <-- a more liberal pool size
|
|
|
|
|
|
|
|
final int maxPoolSize = Integer.getInteger("converter.poolSize", defaultPoolSize);
|
2023-07-25 18:41:43 +00:00
|
|
|
|
2024-04-22 10:34:28 +00:00
|
|
|
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(processedDir.getLogFile());
|
|
|
|
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, processedDir.getDir()))
|
2023-09-13 17:18:58 +00:00
|
|
|
{
|
2023-09-20 08:11:49 +00:00
|
|
|
var pool = new SimpleBlockingThreadPool("ConverterThread", maxPoolSize, 2);
|
2023-07-11 12:46:21 +00:00
|
|
|
|
|
|
|
AtomicInteger processedDomains = new AtomicInteger(0);
|
2023-10-19 11:22:52 +00:00
|
|
|
logger.info("Processing {} domains", totalDomains);
|
2023-07-11 12:46:21 +00:00
|
|
|
|
2023-07-25 18:41:43 +00:00
|
|
|
// Advance the progress bar to the current position if this is a resumption
|
2023-09-13 14:13:41 +00:00
|
|
|
processedDomains.set(batchingWorkLog.size());
|
2023-07-25 18:41:43 +00:00
|
|
|
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
|
2022-07-11 21:25:03 +00:00
|
|
|
|
2024-04-22 10:34:28 +00:00
|
|
|
for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
|
|
|
|
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
|
2023-07-25 18:41:43 +00:00
|
|
|
{
|
2023-07-29 17:19:09 +00:00
|
|
|
pool.submit(() -> {
|
2023-12-09 11:33:39 +00:00
|
|
|
try {
|
2023-12-27 17:20:03 +00:00
|
|
|
ConverterBatchWritableIf writable = processor.createWritable(domain);
|
|
|
|
converterWriter.accept(writable);
|
2023-12-09 11:33:39 +00:00
|
|
|
}
|
|
|
|
catch (Exception ex) {
|
|
|
|
logger.info("Error in processing", ex);
|
|
|
|
}
|
|
|
|
finally {
|
|
|
|
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
|
|
|
|
}
|
2023-07-25 18:41:43 +00:00
|
|
|
});
|
2023-07-12 15:47:36 +00:00
|
|
|
}
|
2022-05-19 15:45:26 +00:00
|
|
|
|
2023-10-19 11:22:52 +00:00
|
|
|
// Grace period in case we're loading like 1 item
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
2023-07-29 17:19:09 +00:00
|
|
|
pool.shutDown();
|
2023-07-25 18:41:43 +00:00
|
|
|
do {
|
|
|
|
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
|
|
|
|
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));
|
2022-08-10 15:03:58 +00:00
|
|
|
}
|
2023-07-14 15:08:10 +00:00
|
|
|
}
|
2022-05-19 15:45:26 +00:00
|
|
|
|
2024-04-22 10:34:28 +00:00
|
|
|
private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<SerializableCrawlDataStream>> {
|
|
|
|
|
|
|
|
private final Path crawlRootDir;
|
|
|
|
private final BatchingWorkLog batchingWorkLog;
|
|
|
|
|
|
|
|
CrawlDataLocator(Path crawlRootDir, BatchingWorkLog workLog) {
|
|
|
|
this.crawlRootDir = crawlRootDir;
|
|
|
|
this.batchingWorkLog = workLog;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Optional<SerializableCrawlDataStream> apply(WorkLogEntry entry) {
|
|
|
|
if (batchingWorkLog.isItemProcessed(entry.id())) {
|
|
|
|
return Optional.empty();
|
|
|
|
}
|
|
|
|
|
|
|
|
var path = getCrawledFilePath(crawlRootDir, entry.path());
|
|
|
|
|
|
|
|
if (!Files.exists(path)) {
|
|
|
|
logger.warn("File not found: {}", path);
|
|
|
|
return Optional.empty();
|
|
|
|
}
|
|
|
|
|
|
|
|
try {
|
|
|
|
return Optional.of(CrawledDomainReader.createDataStream(path));
|
|
|
|
}
|
|
|
|
catch (IOException ex) {
|
|
|
|
return Optional.empty();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private Path getCrawledFilePath(Path crawlDir, String fileName) {
|
|
|
|
int sp = fileName.lastIndexOf('/');
|
|
|
|
|
|
|
|
// Normalize the filename
|
|
|
|
if (sp >= 0 && sp + 1< fileName.length())
|
|
|
|
fileName = fileName.substring(sp + 1);
|
|
|
|
if (fileName.length() < 4)
|
|
|
|
fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
|
|
|
|
|
|
|
|
String sp1 = fileName.substring(0, 2);
|
|
|
|
String sp2 = fileName.substring(2, 4);
|
|
|
|
return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-28 16:14:43 +00:00
|
|
|
private abstract static class ConvertRequest {
|
2023-07-14 15:08:10 +00:00
|
|
|
private final MqMessage message;
|
|
|
|
private final MqSingleShotInbox inbox;
|
|
|
|
|
2023-07-28 16:14:43 +00:00
|
|
|
private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) {
|
2023-07-14 15:08:10 +00:00
|
|
|
this.message = message;
|
|
|
|
this.inbox = inbox;
|
|
|
|
}
|
|
|
|
|
2023-07-28 16:14:43 +00:00
|
|
|
public abstract void execute(ConverterMain converterMain) throws Exception;
|
2023-07-14 15:08:10 +00:00
|
|
|
|
|
|
|
public void ok() {
|
|
|
|
inbox.sendResponse(message, MqInboxResponse.ok());
|
|
|
|
}
|
|
|
|
public void err() {
|
|
|
|
inbox.sendResponse(message, MqInboxResponse.err());
|
|
|
|
}
|
2023-07-28 16:14:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
private static class SideloadAction extends ConvertRequest {
|
|
|
|
|
2023-09-17 12:35:06 +00:00
|
|
|
private final Collection<? extends SideloadSource> sideloadSources;
|
2023-07-28 16:14:43 +00:00
|
|
|
private final Path workDir;
|
|
|
|
|
|
|
|
SideloadAction(SideloadSource sideloadSource,
|
|
|
|
Path workDir,
|
|
|
|
MqMessage message, MqSingleShotInbox inbox) {
|
|
|
|
super(message, inbox);
|
2023-09-17 12:35:06 +00:00
|
|
|
this.sideloadSources = List.of(sideloadSource);
|
|
|
|
this.workDir = workDir;
|
|
|
|
}
|
2024-04-22 10:34:28 +00:00
|
|
|
|
2023-09-17 12:35:06 +00:00
|
|
|
SideloadAction(Collection<? extends SideloadSource> sideloadSources,
|
|
|
|
Path workDir,
|
|
|
|
MqMessage message, MqSingleShotInbox inbox) {
|
|
|
|
super(message, inbox);
|
|
|
|
this.sideloadSources = sideloadSources;
|
2023-07-28 16:14:43 +00:00
|
|
|
this.workDir = workDir;
|
|
|
|
}
|
|
|
|
@Override
|
|
|
|
public void execute(ConverterMain converterMain) throws Exception {
|
|
|
|
try {
|
2023-09-17 12:35:06 +00:00
|
|
|
converterMain.convert(sideloadSources, workDir);
|
2023-07-28 16:14:43 +00:00
|
|
|
ok();
|
|
|
|
}
|
|
|
|
catch (Exception ex) {
|
|
|
|
logger.error("Error sideloading", ex);
|
|
|
|
err();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private static class ConvertCrawlDataAction extends ConvertRequest {
|
|
|
|
private final CrawlPlan plan;
|
|
|
|
|
|
|
|
private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
|
|
|
|
super(message, inbox);
|
|
|
|
this.plan = plan;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void execute(ConverterMain converterMain) throws Exception {
|
|
|
|
try {
|
2024-04-22 10:34:28 +00:00
|
|
|
converterMain.convert(plan.countCrawledDomains(), plan.crawl(), plan.process());
|
2023-07-28 16:14:43 +00:00
|
|
|
ok();
|
|
|
|
}
|
|
|
|
catch (Exception ex) {
|
2024-04-30 16:24:35 +00:00
|
|
|
logger.error("Error converting", ex);
|
|
|
|
|
2023-07-28 16:14:43 +00:00
|
|
|
err();
|
|
|
|
}
|
|
|
|
}
|
2022-05-19 15:45:26 +00:00
|
|
|
}
|
|
|
|
|
2023-07-28 16:14:43 +00:00
|
|
|
|
2023-07-14 15:08:10 +00:00
|
|
|
private ConvertRequest fetchInstructions() throws Exception {
|
|
|
|
|
2023-10-14 10:07:40 +00:00
|
|
|
var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, node, UUID.randomUUID());
|
2023-07-14 15:08:10 +00:00
|
|
|
|
2023-07-17 11:57:32 +00:00
|
|
|
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName());
|
2023-07-17 10:27:27 +00:00
|
|
|
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
|
2023-07-14 15:08:10 +00:00
|
|
|
|
2024-01-22 12:01:09 +00:00
|
|
|
try {
|
|
|
|
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);
|
|
|
|
|
2024-02-14 10:11:23 +00:00
|
|
|
// will be null on ConvertCrawlData
|
|
|
|
final Path inputPath = request.getInputPath();
|
|
|
|
|
2024-01-22 12:01:09 +00:00
|
|
|
return switch (request.action) {
|
|
|
|
case ConvertCrawlData -> {
|
|
|
|
var crawlData = fileStorageService.getStorage(request.crawlStorage);
|
|
|
|
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
|
|
|
|
|
|
|
var plan = new CrawlPlan(null,
|
2024-04-30 16:24:35 +00:00
|
|
|
new WorkDir(crawlData.asPath().toString(), "crawler.log"),
|
|
|
|
new WorkDir(processData.asPath().toString(), "processor.log")
|
2024-04-22 10:34:28 +00:00
|
|
|
);
|
2024-01-22 12:01:09 +00:00
|
|
|
|
|
|
|
yield new ConvertCrawlDataAction(plan, msg, inbox);
|
|
|
|
}
|
|
|
|
case SideloadEncyclopedia -> {
|
|
|
|
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
|
|
|
|
2024-02-14 10:11:23 +00:00
|
|
|
yield new SideloadAction(
|
|
|
|
sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl),
|
2024-01-22 12:01:09 +00:00
|
|
|
processData.asPath(),
|
|
|
|
msg, inbox);
|
|
|
|
}
|
|
|
|
case SideloadDirtree -> {
|
|
|
|
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
|
|
|
|
|
|
|
yield new SideloadAction(
|
2024-02-14 10:11:23 +00:00
|
|
|
sideloadSourceFactory.sideloadDirtree(inputPath),
|
2024-01-22 12:01:09 +00:00
|
|
|
processData.asPath(),
|
|
|
|
msg, inbox);
|
|
|
|
}
|
|
|
|
case SideloadWarc -> {
|
|
|
|
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
|
|
|
|
|
|
|
yield new SideloadAction(
|
2024-02-14 10:11:23 +00:00
|
|
|
sideloadSourceFactory.sideloadWarc(inputPath),
|
|
|
|
processData.asPath(),
|
|
|
|
msg, inbox);
|
|
|
|
}
|
|
|
|
case SideloadReddit -> {
|
|
|
|
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
|
|
|
|
|
|
|
yield new SideloadAction(
|
|
|
|
sideloadSourceFactory.sideloadReddit(inputPath),
|
2024-01-22 12:01:09 +00:00
|
|
|
processData.asPath(),
|
|
|
|
msg, inbox);
|
|
|
|
}
|
|
|
|
case SideloadStackexchange -> {
|
|
|
|
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
|
|
|
|
2024-02-14 10:11:23 +00:00
|
|
|
yield new SideloadAction(
|
|
|
|
sideloadSourceFactory.sideloadStackexchange(inputPath),
|
2024-01-22 12:01:09 +00:00
|
|
|
processData.asPath(),
|
|
|
|
msg, inbox);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
catch (Exception ex) {
|
|
|
|
inbox.sendResponse(msg, MqInboxResponse.err(STR."\{ex.getClass().getSimpleName()}: \{ex.getMessage()}"));
|
2023-08-07 10:57:38 +00:00
|
|
|
|
2024-01-22 12:01:09 +00:00
|
|
|
throw ex;
|
|
|
|
}
|
2023-07-14 15:08:10 +00:00
|
|
|
}
|
|
|
|
|
2023-07-17 10:27:27 +00:00
|
|
|
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
|
|
|
|
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
|
|
|
|
if (opt.isPresent()) {
|
|
|
|
if (!opt.get().function().equals(expectedFunction)) {
|
|
|
|
throw new RuntimeException("Unexpected function: " + opt.get().function());
|
|
|
|
}
|
|
|
|
return opt;
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
|
|
|
|
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
|
|
|
|
return stolenMessage;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-19 15:45:26 +00:00
|
|
|
}
|