Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git (synced 2025-02-22 20:48:59 +00:00)
(crawler) Smarter parquet->slop crawl data migration
commit 274941f6de
parent abec83582d
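
In summary: rather than eagerly converting every parquet crawl file to the new slop format in one bulk pass at crawler startup (the removed performMigration step, which also rewrote crawler.log as it went), migration now happens lazily. When a crawl task fetches its previous crawl data in getReference(), it looks for a slop file first, falls back to a parquet file, and converts only that one domain's data on demand via the new synchronized migrateParquetData().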
CrawlerMain.java:

@@ -28,13 +28,11 @@ import nu.marginalia.process.ProcessConfigurationModule;
 import nu.marginalia.process.ProcessMainClass;
 import nu.marginalia.process.control.ProcessHeartbeatImpl;
 import nu.marginalia.process.log.WorkLog;
-import nu.marginalia.process.log.WorkLogEntry;
 import nu.marginalia.service.module.DatabaseModule;
 import nu.marginalia.slop.SlopCrawlDataRecord;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorageId;
 import nu.marginalia.util.SimpleBlockingThreadPool;
-import org.apache.logging.log4j.util.Strings;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,11 +42,13 @@ import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
 import java.security.Security;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX;
 
@@ -182,8 +182,6 @@ public class CrawlerMain extends ProcessMainClass {
         // Assign any domains with node_affinity=0 to this node, and then fetch all domains assigned to this node
         // to be crawled.
 
-        performMigration(outputDir);
-
         try (var conn = dataSource.getConnection()) {
             try (var assignFreeDomains = conn.prepareStatement(
                     """
@@ -417,11 +415,22 @@ public class CrawlerMain extends ProcessMainClass {
 
         private CrawlDataReference getReference() {
             try {
-                return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id));
+                Path slopPath = CrawlerOutputFile.getSlopPath(outputDir, id, domain);
+                if (Files.exists(slopPath)) {
+                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                }
+
+                Path parquetPath = CrawlerOutputFile.getParquetPath(outputDir, id, domain);
+                if (Files.exists(parquetPath)) {
+                    slopPath = migrateParquetData(parquetPath, domain, outputDir);
+                    return new CrawlDataReference(CrawledDomainReader.createDataStream(slopPath));
+                }
+
             } catch (IOException e) {
                 logger.debug("Failed to read previous crawl data for {}", specification.domain());
-                return new CrawlDataReference();
             }
+
+            return new CrawlDataReference();
         }
 
     }
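
Note: the new getReference() is a read-through migration. A minimal standalone sketch of the pattern, with illustrative stand-in names (readSlop, convert are not Marginalia APIs):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Read-through migration: prefer the new format; convert legacy data once,
    // on first touch, rather than in a bulk pass over the whole data set.
    class LazyMigrationSketch {
        String openPreviousCrawl(Path slopPath, Path parquetPath) throws IOException {
            if (Files.exists(slopPath)) {
                return readSlop(slopPath);                       // fast path: already migrated
            }
            if (Files.exists(parquetPath)) {
                return readSlop(convert(parquetPath, slopPath)); // slow path: convert once
            }
            return "";                                           // no previous crawl data
        }

        String readSlop(Path p) throws IOException { return Files.readString(p); } // stand-in
        Path convert(Path in, Path out) throws IOException {                       // stand-in
            return Files.copy(in, out);    // the real code rewrites parquet as slop
        }
    }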
@@ -482,92 +491,19 @@ public class CrawlerMain extends ProcessMainClass {
         }
     }
 
-    // Data migration logic
-
-    private void performMigration(Path root) throws IOException {
-        Path crawlerLog = root.resolve("crawler.log");
-        Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
-
-
-        int finishedTasks = 0;
-        int totalTasks;
-        try (var oldLog = new WorkLog(crawlerLog)) {
-            totalTasks = oldLog.countFinishedJobs();
-        }
-
-        try (WorkLog workLog = new WorkLog(newCrawlerLog);
-             var migrationHeartbeat = heartbeat.createAdHocTaskHeartbeat("MIGRATING")) {
-
-            for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
-
-                var entry = item.getKey();
-                var path = item.getValue();
-
-                if (path.toFile().getName().endsWith(".parquet")) {
-                    logger.info("Converting {}", entry.id());
-
-                    String domain = entry.id();
-                    String id = Integer.toHexString(domain.hashCode());
-
-                    Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
-
-                    SlopCrawlDataRecord.convertFromParquet(path, outputFile);
-
-                    workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
-                }
-                else {
-                    workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
-                }
-
-                migrationHeartbeat.progress("Parquet To Slop", ++finishedTasks, totalTasks);
-            }
-        }
-
-        Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
-        Files.move(crawlerLog, oldCrawlerLog, StandardCopyOption.REPLACE_EXISTING);
-        Files.move(newCrawlerLog, crawlerLog);
-    }
-
-
-    private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Map.Entry<WorkLogEntry, Path>>> {
-
-        private final Path crawlRootDir;
-
-        CrawlDataLocator(Path crawlRootDir) {
-            this.crawlRootDir = crawlRootDir;
-        }
-
-        @Override
-        public Optional<Map.Entry<WorkLogEntry, Path>> apply(WorkLogEntry entry) {
-            var path = getCrawledFilePath(crawlRootDir, entry.path());
-
-            if (!Files.exists(path)) {
-                return Optional.empty();
-            }
-
-            try {
-                return Optional.of(Map.entry(entry, path));
-            }
-            catch (Exception ex) {
-                return Optional.empty();
-            }
-        }
-
-        private Path getCrawledFilePath(Path crawlDir, String fileName) {
-            int sp = fileName.lastIndexOf('/');
-
-            // Normalize the filename
-            if (sp >= 0 && sp + 1 < fileName.length())
-                fileName = fileName.substring(sp + 1);
-            if (fileName.length() < 4)
-                fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
-
-            String sp1 = fileName.substring(0, 2);
-            String sp2 = fileName.substring(2, 4);
-            return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
-        }
-    }
+    // Migrate from parquet to slop if necessary
+    //
+    // This must be synchronized as chewing through parquet files in parallel leads to enormous memory overhead
+    private synchronized Path migrateParquetData(Path inputPath, String domain, Path crawlDataRoot) throws IOException {
+        if (!inputPath.endsWith(".parquet")) {
+            return inputPath;
+        }
+
+        Path outputFile = CrawlerOutputFile.createSlopPath(crawlDataRoot, Integer.toHexString(domain.hashCode()), domain);
+
+        SlopCrawlDataRecord.convertFromParquet(inputPath, outputFile);
+
+        return outputFile;
+    }
 
 }
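
Note: per the comment above, conversions are deliberately serialized rather than run in parallel. A minimal sketch of that design choice, assuming nothing beyond plain Java synchronization:

    import java.io.IOException;
    import java.nio.file.Path;

    // Many crawl tasks may hit un-migrated domains at once; funneling conversions
    // through a single lock bounds peak memory to one in-flight conversion,
    // trading migration throughput for a predictable footprint.
    class SerializedConverter {
        synchronized Path convertToSlop(Path parquetFile) throws IOException {
            // decode parquet and rewrite as slop here; concurrent callers queue
            // instead of each holding a decoded file in memory simultaneously
            return parquetFile;
        }
    }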
CrawledDomainReader.java:

@@ -5,9 +5,7 @@ import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.nio.file.Path;
 
 public class CrawledDomainReader {
@@ -26,7 +24,8 @@ public class CrawledDomainReader {
                 return SerializableCrawlDataStream.empty();
             }
         }
-        else if (fileName.endsWith(".slop.zip")) {
+
+        if (fileName.endsWith(".slop.zip")) {
             try {
                 return new SlopSerializableCrawlDataStream(fullPath);
             } catch (Exception ex) {
@@ -34,22 +33,9 @@ public class CrawledDomainReader {
                 return SerializableCrawlDataStream.empty();
             }
         }
-        else {
-            logger.error("Unknown file type: {}", fullPath);
-            return SerializableCrawlDataStream.empty();
-        }
+        logger.error("Unknown file type: {}", fullPath);
+        return SerializableCrawlDataStream.empty();
     }
 
-    /** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
-    public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
-        Path parquetPath = CrawlerOutputFile.getParquetPath(basePath, id, domain);
-
-        if (Files.exists(parquetPath)) {
-            return createDataStream(parquetPath);
-        }
-        else {
-            throw new FileNotFoundException("No such file: " + parquetPath);
-        }
-    }
 
 }
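
Note: the deleted javadoc's warning ("must be closed otherwise it will leak off-heap memory") still applies to streams returned by the remaining createDataStream(Path). A usage sketch, assuming an AutoCloseable, iterator-like contract (hasNext/next) as that javadoc suggests; treat this as a sketch, not the verbatim API:

    import java.nio.file.Path;

    // Streams returned by createDataStream hold off-heap resources, so always
    // close them; try-with-resources makes that automatic.
    class ReaderUsageSketch {
        void printDomain(Path crawlFile) throws Exception {
            try (var stream = CrawledDomainReader.createDataStream(crawlFile)) {
                while (stream.hasNext()) {  // assumed iterator-like contract
                    System.out.println(stream.next());
                }
            }
        }
    }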
CrawlerOutputFile.java:

@@ -35,19 +35,6 @@ public class CrawlerOutputFile {
         return destDir.resolve(id + "-" + filesystemSafeName(domain) + "-" + version.suffix + ".warc.gz");
     }
 
-    public static Path createParquetPath(Path basePath, String id, String domain) throws IOException {
-        id = padId(id);
-
-        String first = id.substring(0, 2);
-        String second = id.substring(2, 4);
-
-        Path destDir = basePath.resolve(first).resolve(second);
-        if (!Files.exists(destDir)) {
-            Files.createDirectories(destDir);
-        }
-        return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet");
-    }
-
     public static Path createSlopPath(Path basePath, String id, String domain) throws IOException {
         id = padId(id);
 
@@ -71,16 +58,17 @@ public class CrawlerOutputFile {
         return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet");
     }
 
-    public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
+    public static Path getSlopPath(Path basePath, String id, String domain) {
         id = padId(id);
 
         String first = id.substring(0, 2);
         String second = id.substring(2, 4);
 
         Path destDir = basePath.resolve(first).resolve(second);
-        return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".warc" + version.suffix);
+        return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".slop.zip");
     }
 
+
     /**
      * Pads the given ID with leading zeros to ensure it has a length of 4 characters.
      */
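
Note: the path scheme above shards crawl data by the zero-padded 4-character id, split into two 2-character directory levels. A standalone sketch of how getSlopPath resolves, where safeName is a simplified stand-in for Marginalia's filesystemSafeName:

    import java.nio.file.Path;

    // Recreates the sharded layout: <base>/<id[0..2]>/<id[2..4]>/<id>-<domain>.slop.zip
    class SlopPathSketch {
        static Path slopPath(Path base, String id, String domain) {
            String padded = "0".repeat(Math.max(0, 4 - id.length())) + id;
            return base.resolve(padded.substring(0, 2))
                       .resolve(padded.substring(2, 4))
                       .resolve(padded + "-" + safeName(domain) + ".slop.zip");
        }

        static String safeName(String domain) {
            return domain.replaceAll("[^A-Za-z0-9._-]", "_"); // simplified stand-in
        }

        public static void main(String[] args) {
            // prints crawl-data/1a/2b/1a2b-www.example.com.slop.zip (separator varies by OS)
            System.out.println(slopPath(Path.of("crawl-data"), "1a2b", "www.example.com"));
        }
    }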