(crawler/converter) Remove legacy junk from parquet migration

This commit is contained in:
Viktor Lofgren 2024-04-22 12:34:28 +02:00
parent ad2ac8eee3
commit 8a891c2159
32 changed files with 175 additions and 703 deletions

View File

@ -61,7 +61,7 @@ public class AtagExporter implements ExporterIf {
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.FAST, crawlDataPath)) {
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
exportLinks(tagWriter, stream);
}
catch (Exception ex) {

View File

@ -58,7 +58,7 @@ public class FeedExporter implements ExporterIf {
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.COMPATIBLE, crawlDataPath)) {
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
exportFeeds(tagWriter, stream);
}
catch (Exception ex) {

View File

@ -99,7 +99,7 @@ public class TermFrequencyExporter implements ExporterIf {
private void processFile(Path crawlDataPath, TLongIntHashMap counts, AtomicInteger docCount, SentenceExtractor se) {
TLongHashSet words = new TLongHashSet(10_000);
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.FAST, crawlDataPath)) {
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
while (stream.hasNext()) {
if (Thread.interrupted())
return;

View File

@ -10,7 +10,7 @@ import java.util.regex.Pattern;
public class ContentTypeLogic {
private static final Predicate<String> probableHtmlPattern = Pattern.compile("^.*\\.(htm|html|php|txt)$").asMatchPredicate();
private static final Predicate<String> probableHtmlPattern = Pattern.compile("^.*\\.(htm|html|php|txt|md)$").asMatchPredicate();
private static final Predicate<String> probableBinaryPattern = Pattern.compile("^.*\\.[a-z]+$").asMatchPredicate();
private static final Set<String> blockedContentTypes = Set.of("text/css", "text/javascript");
private static final List<String> acceptedContentTypePrefixes = List.of(
@ -29,6 +29,7 @@ public class ContentTypeLogic {
this.allowAllContentTypes = allowAllContentTypes;
}
/** Returns true if the URL is likely to be a binary file, based on the URL path. */
public boolean isUrlLikeBinary(EdgeUrl url) {
String pathLowerCase = url.path.toLowerCase();
@ -41,6 +42,7 @@ public class ContentTypeLogic {
public boolean isAllowableContentType(ContentType contentType) {
return isAllowableContentType(contentType.contentType());
}
public boolean isAllowableContentType(String contentType) {
if (allowAllContentTypes)
return true;

View File

@ -1,41 +1,18 @@
package nu.marginalia.crawling.io;
import com.google.gson.Gson;
import nu.marginalia.crawling.io.format.CompatibleLegacySerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.FastLegacySerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.model.gson.GsonFactory;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
public class CrawledDomainReader {
private static final Gson gson = GsonFactory.get();
public CrawledDomainReader() {
}
public enum CompatibilityLevel {
/** Data order emulates the ordering of the new format. This is slower */
COMPATIBLE,
/** Data order is not compatible with the new format, but the data itself is */
FAST,
/** Alias for FAST */
ANY
}
/** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
public static SerializableCrawlDataStream createDataStream(CompatibilityLevel compatibilityLevel,
Path fullPath) throws IOException
public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException
{
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".zstd")) {
if (compatibilityLevel == CompatibilityLevel.COMPATIBLE)
return new CompatibleLegacySerializableCrawlDataStream(gson, fullPath.toFile());
else // if (compatibilityLevel == CompatibilityLevel.FAST or ANY)
return new FastLegacySerializableCrawlDataStream(gson, fullPath.toFile());
}
else if (fileName.endsWith(".parquet")) {
if (fileName.endsWith(".parquet")) {
return new ParquetSerializableCrawlDataStream(fullPath);
}
else {
@ -44,14 +21,14 @@ public class CrawledDomainReader {
}
/** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
public static SerializableCrawlDataStream createDataStream(CompatibilityLevel level, Path basePath, String domain, String id) throws IOException {
public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
Path parquetPath = CrawlerOutputFile.getParquetPath(basePath, id, domain);
if (Files.exists(parquetPath)) {
return createDataStream(level, parquetPath);
return createDataStream(parquetPath);
}
else {
return createDataStream(level, CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain));
throw new FileNotFoundException("No such file: " + parquetPath);
}
}

View File

@ -1,66 +0,0 @@
package nu.marginalia.crawling.io;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdOutputStream;
import com.google.gson.Gson;
import lombok.SneakyThrows;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.model.gson.GsonFactory;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
public class CrawledDomainWriter implements AutoCloseable {
private final Path outputDir;
private final Gson gson = GsonFactory.get();
private final Writer writer;
private final Path tmpFile;
private final Path actualFile;
public CrawledDomainWriter(Path outputDir, String domain, String id) throws IOException {
this.outputDir = outputDir;
if (!Files.isDirectory(outputDir)) {
throw new IllegalArgumentException("Output dir " + outputDir + " does not exist");
}
// Do the actual writing to a temporary file first, then move it to the actual file when close() is invoked
// this lets us read the old file and compare its contents while writing the new file. It also guards against
// half-written files if the process is killed.
tmpFile = getOutputFile(id, domain + "_tmp");
actualFile = getOutputFile(id, domain);
writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(tmpFile,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)),
RecyclingBufferPool.INSTANCE));
}
public Path getOutputFile() {
return actualFile;
}
@SneakyThrows
public void accept(SerializableCrawlData data) {
writer.write(data.getSerialIdentifier());
writer.write('\n');
gson.toJson(data, writer);
writer.write('\n');
}
private Path getOutputFile(String id, String name) throws IOException {
return CrawlerOutputFile.createLegacyOutputPath(outputDir, id, name);
}
@Override
public void close() throws IOException {
Files.move(tmpFile, actualFile, StandardCopyOption.REPLACE_EXISTING);
writer.close();
}
}

View File

@ -8,33 +8,6 @@ import java.nio.file.Path;
public class CrawlerOutputFile {
/** Return the Path to a file for the given id and name */
public static Path getLegacyOutputFile(Path base, String id, String name) {
id = padId(id);
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = base.resolve(first).resolve(second);
return destDir.resolve(STR."\{id}-\{filesystemSafeName(name)}.zstd");
}
/** Return the Path to a file for the given id and name, creating the prerequisite
* directory structure as necessary. */
public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException {
id = padId(id);
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = base.resolve(first).resolve(second);
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
}
return destDir.resolve(STR."\{id}-\{filesystemSafeName(name)}.zstd");
}
private static String filesystemSafeName(String name) {
StringBuilder nameSaneBuilder = new StringBuilder();

View File

@ -1,113 +0,0 @@
package nu.marginalia.crawling.io.format;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import java.io.*;
import java.nio.file.Path;
import static java.util.Objects.*;
/** This class is used to read the old format of crawl data, which was zstd-compressed JSON
* with type delimiters between records. It does its best to preserve the semantics of the
* new format. This is slow.
*/
public class CompatibleLegacySerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private final Gson gson;
private final BufferedReader bufferedReader;
private CrawledDomain domain;
private SerializableCrawlData next;
private final Path path;
private int sizeHint;
public CompatibleLegacySerializableCrawlDataStream(Gson gson, File file) throws IOException {
this.gson = gson;
path = file.toPath();
domain = findDomain(file);
bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
}
@Override
public int sizeHint() {
return sizeHint;
}
/** Scan through the file and find the domain record */
private CrawledDomain findDomain(File file) throws IOException {
try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)))) {
for (;;sizeHint++) {
String identifierLine =
requireNonNull(br.readLine(), "No identifier line found");
String dataLine =
requireNonNull(br.readLine(), "No data line found");
if (identifierLine.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
return gson.fromJson(dataLine, CrawledDomain.class);
}
}
}
}
@Override
public Path path() {
return path;
}
@Override
public SerializableCrawlData next() throws IOException {
if (hasNext()) {
if (domain != null) {
var ret = domain;
domain = null;
return ret;
}
else {
var ret = next;
next = null;
return ret;
}
}
throw new IllegalStateException("No more data");
}
@Override
public boolean hasNext() throws IOException {
if (domain != null || next != null) {
return true;
}
String identifier = bufferedReader.readLine();
if (identifier == null) {
bufferedReader.close();
return false;
}
String data = bufferedReader.readLine();
if (data == null) {
bufferedReader.close();
return false;
}
if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
next = null;
return false; // last record is expected to be the domain, so we're done
} else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) {
next = gson.fromJson(data, CrawledDocument.class);
} else {
throw new IllegalStateException("Unknown identifier: " + identifier);
}
return true;
}
@Override
public void close() throws Exception {
bufferedReader.close();
}
}

View File

@ -1,74 +0,0 @@
package nu.marginalia.crawling.io.format;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import java.io.*;
import java.nio.file.Path;
/** This class is used to read the old format of crawl data, which was zstd-compressed JSON
* with type delimiters between records. It does not preserve the semantics of the new format,
* but it is faster.
*/
public class FastLegacySerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private final Gson gson;
private final BufferedReader bufferedReader;
private SerializableCrawlData next = null;
private final Path path;
public FastLegacySerializableCrawlDataStream(Gson gson, File file) throws IOException {
this.gson = gson;
bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
path = file.toPath();
}
@Override
public Path path() {
return path;
}
@Override
public SerializableCrawlData next() throws IOException {
if (hasNext()) {
var ret = next;
next = null;
return ret;
}
throw new IllegalStateException("No more data");
}
@Override
public boolean hasNext() throws IOException {
if (next != null)
return true;
String identifier = bufferedReader.readLine();
if (identifier == null) {
bufferedReader.close();
return false;
}
String data = bufferedReader.readLine();
if (data == null) {
bufferedReader.close();
return false;
}
if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
next = gson.fromJson(data, CrawledDomain.class);
} else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) {
next = gson.fromJson(data, CrawledDocument.class);
} else {
throw new IllegalStateException("Unknown identifier: " + identifier);
}
return true;
}
@Override
public void close() throws Exception {
bufferedReader.close();
}
}

View File

@ -29,7 +29,6 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
public ParquetSerializableCrawlDataStream(Path file) throws IOException {
path = file;
backingIterator = CrawledDocumentParquetRecordFileReader.stream(file).iterator();
}
@ -79,6 +78,10 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
String statusReason = "";
String redirectDomain = null;
// The advisory content types are used to signal various states of the crawl
// that are not actual crawled documents.
if (parquetRecord.contentType.equals("x-marginalia/advisory;state=redirect")) {
EdgeUrl crawledUrl = new EdgeUrl(parquetRecord.url);
redirectDomain = crawledUrl.getDomain().toString();
@ -103,8 +106,6 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
));
}
private CrawledDocumentParquetRecord previousRecord = null;
private void createDocumentRecord(CrawledDocumentParquetRecord nextRecord) {
String bodyString = "";
CrawlerDocumentStatus status = CrawlerDocumentStatus.OK;
@ -115,7 +116,8 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
else if (nextRecord.contentType.startsWith("x-marginalia/advisory;state=robots-txt-skipped")) {
status = CrawlerDocumentStatus.ROBOTS_TXT;
}
else if (nextRecord.contentType.startsWith("x-marginalia/advisory")) { // other advisory stuff we don't want
else if (nextRecord.contentType.startsWith("x-marginalia/advisory")) {
// we don't care about the other advisory content types here
return;
}
else if (nextRecord.body != null) {
@ -135,21 +137,6 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
String etag = nextRecord.etagHeader;
String lastModified = nextRecord.lastModifiedHeader;
// If we have a previous record, and it was a 304, and this one is a 200, we'll use the ETag and Last-Modified
// from the previous record, as it's not guaranteed the reference copy will have the same headers due to a bug
// in the crawler. The bug is fixed, but we still need to support old crawls.
//
// This was added in 2024-01-18, so we can remove it in a few months.
if (previousRecord != null
&& previousRecord.url.equals(nextRecord.url)
&& previousRecord.httpStatus == 304
&& nextRecord.httpStatus == 200)
{
etag = previousRecord.etagHeader;
lastModified = previousRecord.lastModifiedHeader;
}
nextQ.add(new CrawledDocument("",
nextRecord.url,
nextRecord.contentType,
@ -166,13 +153,9 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
nextRecord.cookies,
lastModified,
etag));
previousRecord = nextRecord;
}
public void close() throws IOException {
previousRecord = null;
}
public void close() throws IOException {}
@Override
public SerializableCrawlData next() throws IOException {

View File

@ -87,12 +87,6 @@ public class CrawledDocument implements SerializableCrawlData {
return getHeader("Last-Modified");
}
public static final String SERIAL_IDENTIFIER = "// DOCUMENT";
@Override
public String getSerialIdentifier() {
return SERIAL_IDENTIFIER;
}
@Override
public String getDomain() {
if (url == null)

View File

@ -27,13 +27,4 @@ public class CrawledDomain implements SerializableCrawlData {
return doc.size();
}
public boolean hasCookies() {
return cookies != null && !cookies.isEmpty();
}
public static final String SERIAL_IDENTIFIER = "// DOMAIN";
@Override
public String getSerialIdentifier() {
return SERIAL_IDENTIFIER;
}
}

View File

@ -1,6 +1,5 @@
package nu.marginalia.crawling.model;
public interface SerializableCrawlData {
String getSerialIdentifier();
String getDomain();
}

View File

@ -35,7 +35,6 @@ public class CrawledDocumentParquetRecordFileReader {
public Integer finish(Integer target) { return target; }
}),
List.of("statusCode"))
.mapToInt(Integer::valueOf)
.count();
}
}

View File

@ -134,8 +134,6 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
return;
}
byte[] bodyBytes;
String contentType;

View File

@ -1,105 +0,0 @@
package plan;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.process.log.WorkLog;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Predicate;
import java.util.Optional;
@AllArgsConstructor @NoArgsConstructor @ToString
public class CrawlPlan {
private final Logger logger = LoggerFactory.getLogger(getClass());
public String jobSpec;
public WorkDir crawl;
public WorkDir process;
private final static String rootDirRewrite = System.getProperty("crawl.rootDirRewrite");
public Path getJobSpec() {
return Path.of(rewrite(jobSpec));
}
@AllArgsConstructor @NoArgsConstructor @ToString
public static class WorkDir {
public String dir;
public String logName;
public Path getDir() {
return Path.of(rewrite(dir));
}
public Path getLogFile() {
return Path.of(rewrite(dir)).resolve(logName);
}
}
private static String rewrite(String dir) {
if (rootDirRewrite == null) {
return dir;
}
String[] parts = rootDirRewrite.split(":");
return dir.replaceFirst(parts[0], parts[1]);
}
public Path getCrawledFilePath(String fileName) {
int sp = fileName.lastIndexOf('/');
// Normalize the filename
if (sp >= 0 && sp + 1< fileName.length())
fileName = fileName.substring(sp + 1);
if (fileName.length() < 4)
fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
String sp1 = fileName.substring(0, 2);
String sp2 = fileName.substring(2, 4);
return crawl.getDir().resolve(sp1).resolve(sp2).resolve(fileName);
}
public int countCrawledDomains() {
int count = 0;
for (var ignored : WorkLog.iterable(crawl.getLogFile())) {
count++;
}
return count;
}
@Deprecated
public Iterable<CrawledDomain> domainsIterable() {
// This is no longer supported
throw new UnsupportedOperationException();
}
public Iterable<SerializableCrawlDataStream> crawlDataIterable(Predicate<String> idPredicate) {
return WorkLog.iterableMap(crawl.getLogFile(),
entry -> {
if (!idPredicate.test(entry.id())) {
return Optional.empty();
}
var path = getCrawledFilePath(entry.path());
if (!Files.exists(path)) {
logger.warn("File not found: {}", path);
return Optional.empty();
}
try {
return Optional.of(CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.COMPATIBLE, path));
}
catch (IOException ex) {
return Optional.empty();
}
});
}
}

View File

@ -1,25 +0,0 @@
package plan;
import org.yaml.snakeyaml.Yaml;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Path;
public class CrawlPlanLoader {
private final Yaml yaml;
public CrawlPlanLoader() {
yaml = new Yaml();
}
public CrawlPlan load(Path yamlFile) throws IOException {
try (var reader = new FileReader(yamlFile.toFile())) {
return yaml.loadAs(reader, CrawlPlan.class);
}
catch (IOException ex) {
throw new IOException("Failed to load crawl plan " + yamlFile, ex);
}
}
}

View File

@ -31,6 +31,7 @@ class CrawledDocumentParquetRecordFileWriterTest {
@Test
void testWriteRead() throws IOException {
// Create a record
var original = new CrawledDocumentParquetRecord("www.marginalia.nu",
"https://www.marginalia.nu/",
"127.0.0.1",
@ -41,22 +42,26 @@ class CrawledDocumentParquetRecordFileWriterTest {
"hello world".getBytes(),
null, null);
// Write the record to a file
try (var writer = new CrawledDocumentParquetRecordFileWriter(tempFile)) {
writer.write(original);
}
// Read the file back
var items = new ArrayList<SerializableCrawlData>();
try (var stream = new ParquetSerializableCrawlDataStream(tempFile)) {
while (stream.hasNext()) {
items.add(stream.next());
}
}
// Verify the contents, we should have a domain and a document
assertEquals(2, items.size());
// Verify the domain
var firstItem = items.get(0);
assertInstanceOf(CrawledDomain.class, firstItem);
var domain = (CrawledDomain) firstItem;
assertEquals("www.marginalia.nu", domain.domain);
assertNull(domain.redirectDomain);
@ -65,6 +70,7 @@ class CrawledDocumentParquetRecordFileWriterTest {
assertEquals(new ArrayList<>(), domain.doc);
assertEquals(new ArrayList<>(), domain.cookies);
// Verify the document
var secondItem = items.get(1);
assertInstanceOf(CrawledDocument.class, secondItem);
@ -75,5 +81,31 @@ class CrawledDocumentParquetRecordFileWriterTest {
assertEquals(200, document.httpStatus);
}
// This is an inspection hatch test that reads a file from the odduck.neocities.org domain that didn't load properly,
// leaving as-is in case we need to look into other files in the future
@Test
public void testOdduck() {
Path testPath = Path.of("/home/vlofgren/Exports/22efad51-oddduck.neocities.org.parquet");
// Skip if the file doesn't exist
if (!Files.exists(testPath)) {
return;
}
// Read the file
try (var stream = new ParquetSerializableCrawlDataStream(testPath)) {
while (stream.hasNext()) {
var item = stream.next();
if (item instanceof CrawledDocument doc) {
System.out.println(doc.url);
System.out.println(doc.contentType);
System.out.println(doc.httpStatus);
System.out.println(doc.documentBody.length());
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

View File

@ -11,6 +11,10 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.process.log.WorkLogEntry;
import nu.marginalia.service.ProcessMainClass;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
@ -23,11 +27,15 @@ import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.util.SimpleBlockingThreadPool;
import nu.marginalia.worklog.BatchingWorkLog;
import nu.marginalia.worklog.BatchingWorkLogImpl;
import plan.CrawlPlan;
import org.apache.logging.log4j.util.Strings;
import nu.marginalia.converting.model.CrawlPlan;
import nu.marginalia.converting.processor.DomainProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import nu.marginalia.converting.model.WorkDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
@ -36,6 +44,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
@ -118,7 +127,8 @@ public class ConverterMain extends ProcessMainClass {
}
}
public void convert(CrawlPlan plan) throws Exception {
public void convert(int totalDomains, WorkDir crawlDir, WorkDir processedDir) throws Exception {
final int defaultPoolSize = Boolean.getBoolean("system.conserveMemory")
? Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 1, 4) // <-- conserve memory
@ -126,12 +136,11 @@ public class ConverterMain extends ProcessMainClass {
final int maxPoolSize = Integer.getInteger("converter.poolSize", defaultPoolSize);
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile());
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir()))
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(processedDir.getLogFile());
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, processedDir.getDir()))
{
var pool = new SimpleBlockingThreadPool("ConverterThread", maxPoolSize, 2);
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);
logger.info("Processing {} domains", totalDomains);
@ -139,7 +148,8 @@ public class ConverterMain extends ProcessMainClass {
processedDomains.set(batchingWorkLog.size());
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
for (var domain : plan.crawlDataIterable(id -> !batchingWorkLog.isItemProcessed(id)))
for (var domain : WorkLog.iterableMap(crawlDir.getLogFile(),
new CrawlDataLocator(crawlDir.getDir(), batchingWorkLog)))
{
pool.submit(() -> {
try {
@ -165,6 +175,52 @@ public class ConverterMain extends ProcessMainClass {
}
}
private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<SerializableCrawlDataStream>> {
private final Path crawlRootDir;
private final BatchingWorkLog batchingWorkLog;
CrawlDataLocator(Path crawlRootDir, BatchingWorkLog workLog) {
this.crawlRootDir = crawlRootDir;
this.batchingWorkLog = workLog;
}
@Override
public Optional<SerializableCrawlDataStream> apply(WorkLogEntry entry) {
if (batchingWorkLog.isItemProcessed(entry.id())) {
return Optional.empty();
}
var path = getCrawledFilePath(crawlRootDir, entry.path());
if (!Files.exists(path)) {
logger.warn("File not found: {}", path);
return Optional.empty();
}
try {
return Optional.of(CrawledDomainReader.createDataStream(path));
}
catch (IOException ex) {
return Optional.empty();
}
}
private Path getCrawledFilePath(Path crawlDir, String fileName) {
int sp = fileName.lastIndexOf('/');
// Normalize the filename
if (sp >= 0 && sp + 1< fileName.length())
fileName = fileName.substring(sp + 1);
if (fileName.length() < 4)
fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
String sp1 = fileName.substring(0, 2);
String sp2 = fileName.substring(2, 4);
return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
}
}
private abstract static class ConvertRequest {
private final MqMessage message;
private final MqSingleShotInbox inbox;
@ -196,6 +252,7 @@ public class ConverterMain extends ProcessMainClass {
this.sideloadSources = List.of(sideloadSource);
this.workDir = workDir;
}
SideloadAction(Collection<? extends SideloadSource> sideloadSources,
Path workDir,
MqMessage message, MqSingleShotInbox inbox) {
@ -227,7 +284,7 @@ public class ConverterMain extends ProcessMainClass {
@Override
public void execute(ConverterMain converterMain) throws Exception {
try {
converterMain.convert(plan);
converterMain.convert(plan.countCrawledDomains(), plan.crawl(), plan.process());
ok();
}
catch (Exception ex) {
@ -256,8 +313,9 @@ public class ConverterMain extends ProcessMainClass {
var processData = fileStorageService.getStorage(request.processedDataStorage);
var plan = new CrawlPlan(null,
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
new CrawlPlan.WorkDir(processData.path(), "processor.log"));
new WorkDir(crawlData.path(), "crawler.log"),
new WorkDir(processData.path(), "processor.log")
);
yield new ConvertCrawlDataAction(plan, msg, inbox);
}

View File

@ -0,0 +1,15 @@
package nu.marginalia.converting.model;
import nu.marginalia.process.log.WorkLog;
public record CrawlPlan(String jobSpec, WorkDir crawl, WorkDir process) {
public int countCrawledDomains() {
int count = 0;
for (var ignored : WorkLog.iterable(crawl.getLogFile())) {
count++;
}
return count;
}
}

View File

@ -0,0 +1,13 @@
package nu.marginalia.converting.model;
import java.nio.file.Path;
public record WorkDir(String dir, String logName) {
public Path getDir() {
return Path.of(dir);
}
public Path getLogFile() {
return Path.of(dir).resolve(logName);
}
}

View File

@ -284,7 +284,7 @@ public class CrawlerMain extends ProcessMainClass {
private CrawlDataReference getReference() {
try {
return new CrawlDataReference(CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, outputDir, domain, id));
return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id));
} catch (IOException e) {
logger.debug("Failed to read previous crawl data for {}", specification.domain);
return new CrawlDataReference();

View File

@ -272,6 +272,7 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
// Parse the document and enqueue links
try {
if (fetchedDoc instanceof HttpFetchResult.ResultOk ok) {
var docOpt = ok.parseDocument();

View File

@ -143,7 +143,6 @@ public class HttpFetcherImpl implements HttpFetcher {
public HttpFetchResult fetchContent(EdgeUrl url,
WarcRecorder warcRecorder,
ContentTags contentTags)
throws RateLimitException
{
// We don't want to waste time and resources on URLs that are not HTML, so if the file ending

View File

@ -1,5 +1,6 @@
package nu.marginalia.crawl.retreival.revisit;
import com.google.common.base.Strings;
import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
@ -48,23 +49,32 @@ public class CrawlerRevisitor {
continue;
var url = urlMaybe.get();
// If we've previously 404:d on this URL, we'll refrain from trying to fetch it again
// If we've previously 404:d on this URL, we'll refrain from trying to fetch it again,
// since it's likely to 404 again. It will be forgotten by the next crawl though, so
// we'll eventually try again.
if (doc.httpStatus == 404) {
crawlFrontier.addVisited(url);
continue;
}
// If the reference document is empty or the HTTP status is not 200, we'll skip it since it's
// unlikely to produce anything meaningful for us.
if (doc.httpStatus != 200)
continue;
if (Strings.isNullOrEmpty(doc.documentBody))
continue;
if (!crawlFrontier.filterLink(url))
continue;
if (!crawlFrontier.addVisited(url))
continue;
if (!robotsRules.isAllowed(url.toString())) {
warcRecorder.flagAsRobotsTxtError(url);
continue;
}
if (!crawlFrontier.filterLink(url))
continue;
if (!crawlFrontier.addVisited(url))
continue;
if (recrawled > 5
@ -79,10 +89,7 @@ public class CrawlerRevisitor {
crawlFrontier.addVisited(url);
// Hoover up any links from the document
if (doc.httpStatus == 200 && doc.documentBody != null) {
var parsedDoc = Jsoup.parse(doc.documentBody);
crawlFrontier.enqueueLinksFromDocument(url, parsedDoc);
}
crawlFrontier.enqueueLinksFromDocument(url, Jsoup.parse(doc.documentBody));
// Add a WARC record so we don't repeat this
warcRecorder.writeReferenceCopy(url,
@ -97,7 +104,8 @@ public class CrawlerRevisitor {
// providing etag and last-modified headers, so we can recycle the
// document if it hasn't changed without actually downloading it
var reference = new DocumentWithReference(doc, oldCrawlData);
DocumentWithReference reference = new DocumentWithReference(doc, oldCrawlData);
var result = crawlerRetreiver.fetchWriteAndSleep(url, delayTimer, reference);
if (reference.isSame(result)) {

View File

@ -1,51 +0,0 @@
package nu.marginalia.crawling;
import plan.CrawlPlanLoader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import static org.junit.jupiter.api.Assertions.assertEquals;
class CrawlPlanLoaderTest {
Path tempFile;
@BeforeEach
public void setUp() throws IOException {
tempFile = Files.createTempFile(getClass().getSimpleName(), ".yaml");
}
@AfterEach
public void tearDown() throws IOException {
Files.delete(tempFile);
}
@Test
void load() throws IOException {
Files.writeString(tempFile, """
jobSpec: "job.spec"
crawl:
dir: "/foo"
logName: "foo.log"
process:
dir: "/bar"
logName: "bar.log"
""");
var loader = new CrawlPlanLoader();
var ret = loader.load(tempFile);
assertEquals(Path.of("job.spec"), ret.getJobSpec());
assertEquals(Path.of("/foo"), ret.crawl.getDir());
assertEquals(Path.of("/foo/foo.log"), ret.crawl.getLogFile());
assertEquals(Path.of("/bar"), ret.process.getDir());
assertEquals(Path.of("/bar/bar.log"), ret.process.getLogFile());
System.out.println(ret);
}
}

View File

@ -183,7 +183,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)) {
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@ -236,7 +236,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)) {
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@ -284,7 +284,7 @@ class CrawlerRetreiverTest {
doCrawl(tempFileWarc1, specs);
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)) {
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@ -331,7 +331,7 @@ class CrawlerRetreiverTest {
doCrawl(tempFileWarc1, specs);
convertToParquet(tempFileWarc1, tempFileParquet1);
doCrawlWithReferenceStream(specs,
CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)
CrawledDomainReader.createDataStream(tempFileParquet1)
);
convertToParquet(tempFileWarc2, tempFileParquet2);
@ -352,7 +352,7 @@ class CrawlerRetreiverTest {
});
}
try (var ds = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet2)) {
try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {
@ -395,7 +395,7 @@ class CrawlerRetreiverTest {
convertToParquet(tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)) {
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
var doc = stream.next();
data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@ -404,7 +404,7 @@ class CrawlerRetreiverTest {
throw new RuntimeException(e);
}
var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1);
var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
System.out.println("---");
@ -444,7 +444,7 @@ class CrawlerRetreiverTest {
});
}
try (var ds = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet2)) {
try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {

View File

@ -1,57 +0,0 @@
plugins {
id 'java'
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(22))
}
}
application {
mainClass = 'nu.marginalia.tools.CrawlDataUnfcker'
applicationName = 'crawl-data-unfcker'
}
tasks.distZip.enabled = false
apply from: "$rootProject.projectDir/srcsets.gradle"
dependencies {
implementation project(':third-party:rdrpostagger')
implementation project(':third-party:porterstemmer')
implementation project(':third-party:monkey-patch-opennlp')
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:term-frequency-dict')
implementation project(':code:libraries:big-string')
implementation project(':code:processes:converting-process')
implementation project(':code:process-models:crawling-model')
implementation project(':code:features-convert:adblock')
implementation project(':code:features-convert:topic-detection')
implementation project(':code:features-convert:keyword-extraction')
implementation libs.bundles.slf4j
implementation libs.notnull
implementation libs.guice
implementation libs.jsoup
implementation libs.trove
implementation libs.fastutil
implementation libs.bundles.nlp
implementation libs.commons.lang3
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

View File

@ -1,75 +0,0 @@
package nu.marginalia.tools;
import nu.marginalia.crawling.io.CrawlerOutputFile;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.crawling.io.CrawledDomainReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
public class CrawlDataUnfcker {
public static void main(String... args) {
if (args.length != 2) {
System.out.println("Usage: crawl-data-unfcker input output");
return;
}
Path input = Path.of(args[0]);
Path output = Path.of(args[1]);
if (!Files.isDirectory(input)) {
System.err.println("Input directory is not valid");
return;
}
if (!Files.isDirectory(output)) {
System.err.println("Output directory is not valid");
return;
}
try (var wl = new WorkLog(output.resolve("crawler.log"))) {
for (var inputItem : WorkLog.iterable(input.resolve("crawler.log"))) {
Path inputPath = input.resolve(inputItem.relPath());
var domainMaybe = readDomain(inputPath).map(CrawledDomain::getDomain);
if (domainMaybe.isEmpty())
continue;
var domain = domainMaybe.get();
// Generate conformant ID
String newId = Integer.toHexString(domain.hashCode());
var outputPath = CrawlerOutputFile.createLegacyOutputPath(output, newId, domain);
var outputFileName = outputPath.toFile().getName();
System.out.println(inputPath + " -> " + outputPath);
Files.move(inputPath, outputPath);
wl.setJobToFinished(domain, outputFileName, inputItem.cnt());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static Optional<CrawledDomain> readDomain(Path file) {
if (!Files.exists(file)) {
System.out.println("Missing file " + file);
return Optional.empty();
}
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.FAST, file)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDomain domain) {
return Optional.of(domain);
}
}
}
catch (Exception ex) {
ex.printStackTrace();
}
return Optional.empty();
}
}

View File

@ -1,3 +0,0 @@
# Crawl Data Unfcker
This is a migration tool that patches the generated ID of crawl data.

View File

@ -48,7 +48,7 @@ public class ExperimentRunnerMain {
Path basePath = Path.of(args[0]);
for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
Path crawlDataPath = basePath.resolve(item.relPath());
try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.FAST, crawlDataPath)) {
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
experiment.process(stream);
}
catch (Exception ex) {

View File

@ -94,7 +94,6 @@ include 'code:process-models:processed-data'
include 'code:tools:experiment-runner'
include 'code:tools:screenshot-capture-tool'
include 'code:tools:load-test'
include 'code:tools:crawl-data-unfcker'
include 'third-party:porterstemmer'
include 'third-party:symspell'