(warc) Clean up parquet conversion

This commit cleans up the WARC-to-Parquet conversion. Records with an HTTP status other than 200 are now included.

The commit also fixes a bug where the robots.txt parser would be fed the full HTTP response (and choke on it) instead of just the response body.

The DocumentBodyExtractor code has also been cleaned up, and now offers a way of getting just the byte[] representation for later processing, since converting to and from strings is a bit wasteful.
This commit is contained in:
Viktor Lofgren 2023-12-14 16:05:48 +01:00
parent 787a20cbaa
commit 1328bc4938
18 changed files with 278 additions and 132 deletions

View File

@ -11,20 +11,54 @@ import java.io.IOException;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
public class DocumentBodyExtractor { public class DocumentBodyExtractor {
private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
private static final Logger logger = LoggerFactory.getLogger(DocumentBodyExtractor.class); private static final Logger logger = LoggerFactory.getLogger(DocumentBodyExtractor.class);
public static DocumentBodyResult extractBody(HttpFetchResult result) { public static DocumentBodyResult<String> asString(HttpFetchResult result) {
if (result instanceof HttpFetchResult.ResultOk fetchOk) { if (result instanceof HttpFetchResult.ResultOk ok) {
return extractBody(fetchOk); return asString(ok);
} }
else { else if (result instanceof HttpFetchResult.ResultRetained retained) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, ""); return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body());
}
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok");
}
public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult result) {
if (result instanceof HttpFetchResult.ResultOk fetchOk) {
return asBytes(fetchOk);
}
else if (result instanceof HttpFetchResult.ResultRetained retained) {
return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body().getBytes());
}
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok");
}
public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult.ResultOk rsp) {
try {
var byteStream = rsp.getInputStream();
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-Type");
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
return new DocumentBodyResult.Ok<>(contentType.contentType(), data);
} catch (Exception ex) {
logger.error("Failed to extract body", ex);
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "");
} }
} }
public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) { public static DocumentBodyResult<String> asString(HttpFetchResult.ResultOk rsp) {
try { try {
var byteStream = rsp.getInputStream(); var byteStream = rsp.getInputStream();
@ -35,25 +69,25 @@ public class DocumentBodyExtractor {
var contentTypeHeader = rsp.header("Content-Type"); var contentTypeHeader = rsp.header("Content-Type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) { if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
} }
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data); var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) { if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
} }
if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) { if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, ""); return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CHARSET, "");
} }
return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data)); return new DocumentBodyResult.Ok<>(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data));
} }
catch (IOException ex) { catch (IOException ex) {
logger.error("Failed to extract body", ex); logger.error("Failed to extract body", ex);
return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, ""); return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "");
} }
} }

View File

@ -5,19 +5,35 @@ import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import java.util.Optional; import java.util.Optional;
import java.util.function.BiFunction; import java.util.function.BiFunction;
public sealed interface DocumentBodyResult { public sealed interface DocumentBodyResult<T> {
record Ok(String contentType, String body) implements DocumentBodyResult { record Ok<T>(String contentType, T body) implements DocumentBodyResult<T> {
@Override @Override
public <T> Optional<T> map(BiFunction<String, String, T> fun) { public <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper) {
return Optional.of(fun.apply(contentType, body)); return Optional.of(mapper.apply(contentType, body));
}
@Override
public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
consumer.accept(contentType, body);
} }
} }
record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { record Error<T>(CrawlerDocumentStatus status, String why) implements DocumentBodyResult<T> {
@Override @Override
public <T> Optional<T> map(BiFunction<String, String, T> fun) { public <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper) {
return Optional.empty(); return Optional.empty();
} }
@Override
public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
}
} }
<T> Optional<T> map(BiFunction<String, String, T> fun); <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper);
void ifPresent(ExConsumer<T,Exception> consumer) throws Exception;
interface ExConsumer<T,E extends Exception> {
void accept(String contentType, T t) throws E;
}
} }

View File

@ -4,12 +4,12 @@ import okhttp3.Headers;
import org.jsoup.Jsoup; import org.jsoup.Jsoup;
import org.netpreserve.jwarc.MessageHeaders; import org.netpreserve.jwarc.MessageHeaders;
import org.netpreserve.jwarc.WarcResponse; import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.WarcRevisit;
import org.jsoup.nodes.Document; import org.jsoup.nodes.Document;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -18,44 +18,39 @@ public sealed interface HttpFetchResult {
boolean isOk(); boolean isOk();
static ResultOk importWarc(WarcResponse response) throws IOException { static HttpFetchResult importWarc(WarcResponse response) {
var http = response.http(); try {
try (var body = http.body()) { var http = response.http();
byte[] bytes = body.stream().readAllBytes();
return new ResultOk( try (var body = http.body()) {
response.targetURI(), byte[] bytes = body.stream().readAllBytes();
http.status(),
http.headers(), String ipAddress = response
bytes, .ipAddress()
0, .map(InetAddress::getHostAddress)
bytes.length .orElse("");
);
return new ResultOk(
response.targetURI(),
http.status(),
http.headers(),
ipAddress,
bytes,
0,
bytes.length
);
}
}
catch (Exception ex) {
return new ResultException(ex);
} }
} }
static ResultOk importWarc(WarcRevisit revisit) throws IOException {
var http = revisit.http();
try (var body = http.body()) {
byte[] bytes = body.stream().readAllBytes();
return new ResultOk(
revisit.targetURI(),
http.status(),
http.headers(),
bytes,
0,
bytes.length
);
}
finally {
revisit.body().consume();
}
}
record ResultOk(URI uri, record ResultOk(URI uri,
int statusCode, int statusCode,
Headers headers, Headers headers,
String ipAddress,
byte[] bytesRaw, byte[] bytesRaw,
int bytesStart, int bytesStart,
int bytesLength int bytesLength
@ -68,10 +63,11 @@ public sealed interface HttpFetchResult {
public ResultOk(URI uri, public ResultOk(URI uri,
int statusCode, int statusCode,
MessageHeaders headers, MessageHeaders headers,
String ipAddress,
byte[] bytesRaw, byte[] bytesRaw,
int bytesStart, int bytesStart,
int bytesLength) { int bytesLength) {
this(uri, statusCode, convertHeaders(headers), bytesRaw, bytesStart, bytesLength); this(uri, statusCode, convertHeaders(headers), ipAddress, bytesRaw, bytesStart, bytesLength);
} }
private static Headers convertHeaders(MessageHeaders headers) { private static Headers convertHeaders(MessageHeaders headers) {
@ -89,8 +85,8 @@ public sealed interface HttpFetchResult {
} }
public Optional<Document> parseDocument() throws IOException { public Optional<Document> parseDocument() throws IOException {
return switch(DocumentBodyExtractor.extractBody(this)) { return switch(DocumentBodyExtractor.asString(this)) {
case DocumentBodyResult.Ok ok when "text/html".equalsIgnoreCase(ok.contentType()) case DocumentBodyResult.Ok<String> ok when "text/html".equalsIgnoreCase(ok.contentType())
-> Optional.of(Jsoup.parse(ok.body())); -> Optional.of(Jsoup.parse(ok.body()));
default -> Optional.empty(); default -> Optional.empty();
}; };
@ -105,7 +101,7 @@ public sealed interface HttpFetchResult {
}; };
record ResultRetained(String url, String body) implements HttpFetchResult { record ResultRetained(String url, String contentType, String body) implements HttpFetchResult {
public boolean isOk() { public boolean isOk() {
return true; return true;

View File

@ -10,9 +10,7 @@ public class CrawlerOutputFile {
/** Return the Path to a file for the given id and name */ /** Return the Path to a file for the given id and name */
public static Path getLegacyOutputFile(Path base, String id, String name) { public static Path getLegacyOutputFile(Path base, String id, String name) {
if (id.length() < 4) { id = padId(id);
id = Strings.repeat("0", 4 - id.length()) + id;
}
String first = id.substring(0, 2); String first = id.substring(0, 2);
String second = id.substring(2, 4); String second = id.substring(2, 4);
@ -24,9 +22,7 @@ public class CrawlerOutputFile {
/** Return the Path to a file for the given id and name, creating the prerequisite /** Return the Path to a file for the given id and name, creating the prerequisite
* directory structure as necessary. */ * directory structure as necessary. */
public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException { public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException {
if (id.length() < 4) { id = padId(id);
id = Strings.repeat("0", 4 - id.length()) + id;
}
String first = id.substring(0, 2); String first = id.substring(0, 2);
String second = id.substring(2, 4); String second = id.substring(2, 4);
@ -54,9 +50,7 @@ public class CrawlerOutputFile {
} }
public static Path createWarcPath(Path basePath, String id, String domain, WarcFileVersion version) throws IOException { public static Path createWarcPath(Path basePath, String id, String domain, WarcFileVersion version) throws IOException {
if (id.length() < 4) { id = padId(id);
id = Strings.repeat("0", 4 - id.length()) + id;
}
String first = id.substring(0, 2); String first = id.substring(0, 2);
String second = id.substring(2, 4); String second = id.substring(2, 4);
@ -68,10 +62,20 @@ public class CrawlerOutputFile {
return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}-\{version.suffix}.warc.gz"); return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}-\{version.suffix}.warc.gz");
} }
public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) { public static Path createParquetPath(Path basePath, String id, String domain) throws IOException {
if (id.length() < 4) { id = padId(id);
id = Strings.repeat("0", 4 - id.length()) + id;
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = basePath.resolve(first).resolve(second);
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
} }
return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.parquet");
}
public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
id = padId(id);
String first = id.substring(0, 2); String first = id.substring(0, 2);
String second = id.substring(2, 4); String second = id.substring(2, 4);
@ -80,6 +84,18 @@ public class CrawlerOutputFile {
return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.warc\{version.suffix}"); return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.warc\{version.suffix}");
} }
/**
* Pads the given ID with leading zeros to ensure it has a length of 4 characters.
*/
private static String padId(String id) {
if (id.length() < 4) {
id = Strings.repeat("0", 4 - id.length()) + id;
}
return id;
}
public enum WarcFileVersion { public enum WarcFileVersion {
LIVE("open"), LIVE("open"),
TEMP("tmp"), TEMP("tmp"),

View File

@ -88,10 +88,9 @@ public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, Se
if (http.status() != 200) { if (http.status() != 200) {
return; return;
} }
CrawledDocument document;
var parsedBody = DocumentBodyExtractor.extractBody(HttpFetchResult.importWarc(response)); var parsedBody = DocumentBodyExtractor.asString(HttpFetchResult.importWarc(response));
if (parsedBody instanceof DocumentBodyResult.Error error) { if (parsedBody instanceof DocumentBodyResult.Error<String> error) {
next = new CrawledDocument( next = new CrawledDocument(
"", "",
response.targetURI().toString(), response.targetURI().toString(),
@ -106,7 +105,7 @@ public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, Se
"", "",
"", "",
""); "");
} else if (parsedBody instanceof DocumentBodyResult.Ok ok) { } else if (parsedBody instanceof DocumentBodyResult.Ok<String> ok) {
next = new CrawledDocument( next = new CrawledDocument(
"", "",
response.targetURI().toString(), response.targetURI().toString(),

View File

@ -22,6 +22,7 @@ public class CrawledDocumentParquetRecord {
public String url; public String url;
public String ip; public String ip;
public boolean cookies; public boolean cookies;
public int httpStatus;
public String contentType; public String contentType;
public byte[] body; public byte[] body;
@ -39,6 +40,7 @@ public class CrawledDocumentParquetRecord {
Types.required(BINARY).as(stringType()).named("url"), Types.required(BINARY).as(stringType()).named("url"),
Types.required(BINARY).as(stringType()).named("ip"), Types.required(BINARY).as(stringType()).named("ip"),
Types.required(BOOLEAN).named("cookies"), Types.required(BOOLEAN).named("cookies"),
Types.required(INT32).named("httpStatus"),
Types.required(BINARY).as(stringType()).named("contentType"), Types.required(BINARY).as(stringType()).named("contentType"),
Types.required(BINARY).named("body") Types.required(BINARY).named("body")
); );
@ -49,6 +51,7 @@ public class CrawledDocumentParquetRecord {
case "domain" -> domain = (String) value; case "domain" -> domain = (String) value;
case "url" -> url = (String) value; case "url" -> url = (String) value;
case "ip" -> ip = (String) value; case "ip" -> ip = (String) value;
case "httpStatus" -> httpStatus = (Integer) value;
case "cookies" -> cookies = (Boolean) value; case "cookies" -> cookies = (Boolean) value;
case "contentType" -> contentType = (String) value; case "contentType" -> contentType = (String) value;
case "body" -> body = (byte[]) value; case "body" -> body = (byte[]) value;
@ -61,6 +64,7 @@ public class CrawledDocumentParquetRecord {
valueWriter.write("domain", domain); valueWriter.write("domain", domain);
valueWriter.write("url", url); valueWriter.write("url", url);
valueWriter.write("ip", ip); valueWriter.write("ip", ip);
valueWriter.write("httpStatus", httpStatus);
valueWriter.write("cookies", cookies); valueWriter.write("cookies", cookies);
valueWriter.write("contentType", contentType); valueWriter.write("contentType", contentType);
valueWriter.write("body", body); valueWriter.write("body", body);

View File

@ -1,12 +1,37 @@
package nu.marginalia.crawling.parquet; package nu.marginalia.crawling.parquet;
import blue.strategic.parquet.ParquetWriter; import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRecord;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.WarcXResponseReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable { public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
private final ParquetWriter<CrawledDocumentParquetRecord> writer; private final ParquetWriter<CrawledDocumentParquetRecord> writer;
private static final Logger logger = LoggerFactory.getLogger(CrawledDocumentParquetRecordFileWriter.class);
public static void convertWarc(String domain, Path warcInputFile, Path parquetOutputFile) throws IOException {
try (var warcReader = new WarcReader(warcInputFile);
var parquetWriter = new CrawledDocumentParquetRecordFileWriter(parquetOutputFile)
) {
WarcXResponseReference.register(warcReader);
for (var record : warcReader) {
parquetWriter.write(domain, record);
}
}
catch (Exception ex) {
logger.error("Failed to convert WARC file to Parquet", ex);
}
}
public CrawledDocumentParquetRecordFileWriter(Path file) throws IOException { public CrawledDocumentParquetRecordFileWriter(Path file) throws IOException {
writer = ParquetWriter.writeFile(CrawledDocumentParquetRecord.schema, writer = ParquetWriter.writeFile(CrawledDocumentParquetRecord.schema,
@ -17,6 +42,42 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
writer.write(domainData); writer.write(domainData);
} }
public void write(String domain, WarcRecord record) throws IOException {
if (!(record instanceof WarcResponse ref)) {
return;
}
HttpFetchResult result = HttpFetchResult.importWarc(ref);
if (!(result instanceof HttpFetchResult.ResultOk fetchOk)) {
return;
}
byte[] bodyBytes;
String contentType;
var body = DocumentBodyExtractor.asBytes(result);
if (body instanceof DocumentBodyResult.Ok<byte[]> bodyOk) {
bodyBytes = bodyOk.body();
contentType = bodyOk.contentType();
}
else {
bodyBytes = new byte[0];
contentType = "";
}
write(new CrawledDocumentParquetRecord(
domain,
ref.target(),
fetchOk.ipAddress(),
false, // FIXME
fetchOk.statusCode(),
contentType,
bodyBytes)
);
}
public void close() throws IOException { public void close() throws IOException {
writer.close(); writer.close();
} }

View File

@ -3,6 +3,7 @@ package nu.marginalia.crawling.parquet;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.netpreserve.jwarc.net.WarcRecorder;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -29,6 +30,7 @@ class CrawledDocumentParquetRecordFileWriterTest {
"https://www.marginalia.nu/", "https://www.marginalia.nu/",
"127.0.0.1", "127.0.0.1",
false, false,
200,
"text/html", "text/html",
"hello world".getBytes()); "hello world".getBytes());
@ -41,4 +43,5 @@ class CrawledDocumentParquetRecordFileWriterTest {
assertEquals(original, actual); assertEquals(original, actual);
} }
} }
} }

View File

@ -19,6 +19,7 @@ import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider; import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
import nu.marginalia.crawling.io.CrawledDomainReader; import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawlerOutputFile; import nu.marginalia.crawling.io.CrawlerOutputFile;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
import nu.marginalia.crawlspec.CrawlSpecFileNames; import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.model.crawlspec.CrawlSpecRecord; import nu.marginalia.model.crawlspec.CrawlSpecRecord;
@ -29,7 +30,6 @@ import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog; import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.util.SimpleBlockingThreadPool; import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool; import okhttp3.ConnectionPool;
@ -216,6 +216,7 @@ public class CrawlerMain {
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE); Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP); Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
if (Files.exists(newWarcFile)) { if (Files.exists(newWarcFile)) {
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING); Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
@ -245,6 +246,9 @@ public class CrawlerMain {
Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING); Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING);
CrawledDocumentParquetRecordFileWriter
.convertWarc(domain, finalWarcFile, parquetFile);
workLog.setJobToFinished(domain, finalWarcFile.toString(), size); workLog.setJobToFinished(domain, finalWarcFile.toString(), size);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks); heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);

View File

@ -251,7 +251,7 @@ public class CrawlerRetreiver implements AutoCloseable {
var doc = reference.doc(); var doc = reference.doc();
if (doc != null) { if (doc != null) {
warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody); warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody);
fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.documentBody); fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.contentType, doc.documentBody);
} }
} }

View File

@ -33,6 +33,8 @@ public class CrawlerWarcResynchronizer {
public void run(Path tempFile) { public void run(Path tempFile) {
// First pass, enqueue links // First pass, enqueue links
try (var reader = new WarcReader(tempFile)) { try (var reader = new WarcReader(tempFile)) {
WarcXResponseReference.register(reader);
for (var item : reader) { for (var item : reader) {
accept(item); accept(item);
} }
@ -54,8 +56,6 @@ public class CrawlerWarcResynchronizer {
try { try {
if (item instanceof WarcResponse rsp) { if (item instanceof WarcResponse rsp) {
response(rsp); response(rsp);
} else if (item instanceof WarcRevisit revisit) {
revisit(revisit);
} else if (item instanceof WarcRequest req) { } else if (item instanceof WarcRequest req) {
request(req); request(req);
} }
@ -76,35 +76,18 @@ public class CrawlerWarcResynchronizer {
try { try {
var response = HttpFetchResult.importWarc(rsp); var response = HttpFetchResult.importWarc(rsp);
if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) { DocumentBodyExtractor
var doc = Jsoup.parse(ok.body()); .asString(response)
.ifPresent((ct, body) ->
{
var doc = Jsoup.parse(body);
crawlFrontier.enqueueLinksFromDocument(url, doc); crawlFrontier.enqueueLinksFromDocument(url, doc);
} });
} }
catch (Exception e) { catch (Exception e) {
logger.info(STR."Failed to parse response body for \{url}", e); logger.info(STR."Failed to parse response body for \{url}", e);
} }
} }
private void revisit(WarcRevisit revisit) throws IOException {
if (!WarcRecorder.documentRevisitURN.equals(revisit.profile())) {
return;
}
var url = new EdgeUrl(revisit.targetURI());
crawlFrontier.addVisited(url);
try {
var response = HttpFetchResult.importWarc(revisit);
if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) {
var doc = Jsoup.parse(ok.body());
crawlFrontier.enqueueLinksFromDocument(url, doc);
}
}
catch (Exception e) {
logger.info(STR."Failed to parse response body for \{url}", e);
}
}
} }

View File

@ -11,6 +11,8 @@ import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeR
import nu.marginalia.crawl.retreival.fetcher.socket.FastTerminatingSocketFactory; import nu.marginalia.crawl.retreival.fetcher.socket.FastTerminatingSocketFactory;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.socket.NoSecuritySSL; import nu.marginalia.crawl.retreival.fetcher.socket.NoSecuritySSL;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.body.ContentTypeLogic; import nu.marginalia.crawling.body.ContentTypeLogic;
@ -263,24 +265,19 @@ public class HttpFetcherImpl implements HttpFetcher {
HttpFetchResult result = recorder.fetch(client, getBuilder.build()); HttpFetchResult result = recorder.fetch(client, getBuilder.build());
if (result instanceof HttpFetchResult.ResultOk ok) { return DocumentBodyExtractor.asBytes(result).mapOpt((contentType, body) ->
return Optional.of(parseRobotsTxt(ok)); robotsParser.parseContent(url.toString(),
} body,
else { contentType,
return Optional.empty(); userAgent)
} );
} }
catch (Exception ex) { catch (Exception ex) {
return Optional.empty(); return Optional.empty();
} }
} }
private SimpleRobotRules parseRobotsTxt(HttpFetchResult.ResultOk ok) {
return robotsParser.parseContent(ok.uri().toString(),
ok.bytesRaw(),
ok.header("Content-Type"),
userAgent);
}
} }

View File

@ -6,6 +6,8 @@ import okhttp3.Response;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.net.URI; import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.StringJoiner; import java.util.StringJoiner;
@ -18,7 +20,8 @@ public class WarcProtocolReconstructor {
static String getHttpRequestString(Request request, URI uri) { static String getHttpRequestString(Request request, URI uri) {
StringBuilder requestStringBuilder = new StringBuilder(); StringBuilder requestStringBuilder = new StringBuilder();
requestStringBuilder.append(request.method()).append(" ").append(uri.getPath()); requestStringBuilder.append(request.method()).append(" ").append(URLEncoder.encode(uri.getPath(), StandardCharsets.UTF_8));
if (uri.getQuery() != null) { if (uri.getQuery() != null) {
requestStringBuilder.append("?").append(uri.getQuery()); requestStringBuilder.append("?").append(uri.getQuery());
} }

View File

@ -29,8 +29,6 @@ import java.util.*;
* be reconstructed. * be reconstructed.
*/ */
public class WarcRecorder implements AutoCloseable { public class WarcRecorder implements AutoCloseable {
public static final URI documentRevisitURN = URI.create("urn:marginalia/data/doc/revisit");
public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped"); public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped");
public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe"); public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe");
public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe"); public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe");
@ -173,6 +171,7 @@ public class WarcRecorder implements AutoCloseable {
return new HttpFetchResult.ResultOk(uri, return new HttpFetchResult.ResultOk(uri,
response.code(), response.code(),
response.headers(), response.headers(),
ip,
responseDataBuffer.data, responseDataBuffer.data,
dataStart, dataStart,
responseDataBuffer.length() - dataStart); responseDataBuffer.length() - dataStart);

View File

@ -1,13 +1,11 @@
package nu.marginalia.crawl.retreival.revisit; package nu.marginalia.crawl.retreival.revisit;
import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.CrawlDataReference; import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.fetcher.ContentTags; import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.body.DocumentBodyExtractor; import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeUrl;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -40,9 +38,11 @@ public record DocumentWithReference(
if (doc.documentBody == null) if (doc.documentBody == null)
return false; return false;
return DocumentBodyExtractor.extractBody(resultOk) if (!(DocumentBodyExtractor.asString(resultOk) instanceof DocumentBodyResult.Ok<String> bodyOk)) {
.map((contentType, body) -> reference.isContentBodySame(doc.documentBody, body)) return false;
.orElse(false); }
return reference.isContentBodySame(doc.documentBody, bodyOk.body());
} }
public ContentTags getContentTags() { public ContentTags getContentTags() {

View File

@ -2,6 +2,8 @@ package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileReader;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
@ -20,10 +22,10 @@ import java.util.Map;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
class WarcRecorderTest { class WarcRecorderTest {
Path fileName; Path fileNameWarc;
Path fileNameParquet;
WarcRecorder client; WarcRecorder client;
OkHttpClient httpClient; OkHttpClient httpClient;
@BeforeEach @BeforeEach
@ -32,14 +34,16 @@ class WarcRecorderTest {
.addNetworkInterceptor(new IpInterceptingNetworkInterceptor()) .addNetworkInterceptor(new IpInterceptingNetworkInterceptor())
.build(); .build();
fileName = Files.createTempFile("test", ".warc"); fileNameWarc = Files.createTempFile("test", ".warc");
client = new WarcRecorder(fileName); fileNameParquet = Files.createTempFile("test", ".parquet");
client = new WarcRecorder(fileNameWarc);
} }
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
client.close(); client.close();
Files.delete(fileName); Files.delete(fileNameWarc);
} }
@Test @Test
@ -49,10 +53,10 @@ class WarcRecorderTest {
.addHeader("Accept-Encoding", "gzip") .addHeader("Accept-Encoding", "gzip")
.get().build()); .get().build());
new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out); new GZIPInputStream(Files.newInputStream(fileNameWarc)).transferTo(System.out);
Map<String, String> sampleData = new HashMap<>(); Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileName)) { try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> { warcReader.forEach(record -> {
if (record instanceof WarcRequest req) { if (record instanceof WarcRequest req) {
sampleData.put(record.type(), req.target()); sampleData.put(record.type(), req.target());
@ -70,14 +74,14 @@ class WarcRecorderTest {
@Test @Test
public void flagAsSkipped() throws IOException, URISyntaxException { public void flagAsSkipped() throws IOException, URISyntaxException {
try (var recorder = new WarcRecorder(fileName)) { try (var recorder = new WarcRecorder(fileNameWarc)) {
recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"), recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"),
"text/html", "text/html",
200, 200,
"<?doctype html><html><body>test</body></html>"); "<?doctype html><html><body>test</body></html>");
} }
try (var reader = new WarcReader(fileName)) { try (var reader = new WarcReader(fileNameWarc)) {
for (var record : reader) { for (var record : reader) {
if (record instanceof WarcResponse rsp) { if (record instanceof WarcResponse rsp) {
assertEquals("https://www.marginalia.nu/", rsp.target()); assertEquals("https://www.marginalia.nu/", rsp.target());
@ -88,19 +92,19 @@ class WarcRecorderTest {
} }
} }
new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out); new GZIPInputStream(Files.newInputStream(fileNameWarc)).transferTo(System.out);
} }
@Test @Test
public void testSaveImport() throws URISyntaxException, IOException { public void testSaveImport() throws URISyntaxException, IOException {
try (var recorder = new WarcRecorder(fileName)) { try (var recorder = new WarcRecorder(fileNameWarc)) {
recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"), recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"),
"text/html", "text/html",
200, 200,
"<?doctype html><html><body>test</body></html>"); "<?doctype html><html><body>test</body></html>");
} }
try (var reader = new WarcReader(fileName)) { try (var reader = new WarcReader(fileNameWarc)) {
WarcXResponseReference.register(reader); WarcXResponseReference.register(reader);
for (var record : reader) { for (var record : reader) {
@ -114,4 +118,30 @@ class WarcRecorderTest {
} }
@Test
public void testConvertToParquet() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/")
.addHeader("User-agent", "test.marginalia.nu")
.addHeader("Accept-Encoding", "gzip")
.get().build());
client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/log/")
.addHeader("User-agent", "test.marginalia.nu")
.addHeader("Accept-Encoding", "gzip")
.get().build());
client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/sanic.png")
.addHeader("User-agent", "test.marginalia.nu")
.addHeader("Accept-Encoding", "gzip")
.get().build());
client.close();
CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu", fileNameWarc, fileNameParquet);
var urls = CrawledDocumentParquetRecordFileReader.stream(fileNameParquet).map(doc -> doc.url).toList();
assertEquals(3, urls.size());
assertEquals("https://www.marginalia.nu/", urls.get(0));
assertEquals("https://www.marginalia.nu/log/", urls.get(1));
assertEquals("https://www.marginalia.nu/sanic.png", urls.get(2));
}
} }

View File

@ -36,7 +36,7 @@ class HttpFetcherTest {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler"); var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
try (var recorder = new WarcRecorder()) { try (var recorder = new WarcRecorder()) {
var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty()); var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty());
if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) { if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok bodyOk) {
System.out.println(bodyOk.contentType()); System.out.println(bodyOk.contentType());
} }
} }
@ -48,7 +48,7 @@ class HttpFetcherTest {
try (var recorder = new WarcRecorder()) { try (var recorder = new WarcRecorder()) {
var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty()); var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty());
if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) { if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok bodyOk) {
System.out.println(bodyOk.contentType()); System.out.println(bodyOk.contentType());
} }
} }

View File

@ -127,6 +127,7 @@ public class CrawlerMockFetcherTest {
url.asURI(), url.asURI(),
200, 200,
new Headers.Builder().build(), new Headers.Builder().build(),
"127.0.0.1",
bodyBytes, bodyBytes,
0, 0,
bodyBytes.length bodyBytes.length