(cleanup) Remove vestigial support for WARC crawl data streams

This commit is contained in:
Viktor Lofgren 2023-12-20 15:46:21 +01:00
parent bfae478251
commit a5bc29245b
5 changed files with 51 additions and 192 deletions

View File

@ -3,7 +3,6 @@ package nu.marginalia.crawling.io;
import com.google.gson.Gson;
import nu.marginalia.crawling.io.format.LegacySerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.WarcSerializableCrawlDataStream;
import nu.marginalia.model.gson.GsonFactory;
import java.io.*;
@ -22,9 +21,6 @@ public class CrawledDomainReader {
if (fileName.endsWith(".zstd")) {
return new LegacySerializableCrawlDataStream(gson, fullPath.toFile());
}
else if (fileName.endsWith(".warc") || fileName.endsWith(".warc.gz")) {
return new WarcSerializableCrawlDataStream(fullPath);
}
else if (fileName.endsWith(".parquet")) {
return new ParquetSerializableCrawlDataStream(fullPath);
}
@ -36,14 +32,10 @@ public class CrawledDomainReader {
/** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
Path parquetPath = CrawlerOutputFile.getParquetPath(basePath, id, domain);
Path warcPath = CrawlerOutputFile.getWarcPath(basePath, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
if (Files.exists(parquetPath)) {
return createDataStream(parquetPath);
}
if (Files.exists(warcPath)) {
return createDataStream(warcPath);
}
else {
return createDataStream(CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain));
}

View File

@ -107,8 +107,7 @@ public class CrawlerOutputFile {
public enum WarcFileVersion {
LIVE("open"),
TEMP("tmp"),
FINAL("final");
TEMP("tmp");
public final String suffix;

View File

@ -1,157 +0,0 @@
package nu.marginalia.crawling.io.format;
import lombok.SneakyThrows;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import org.netpreserve.jwarc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.*;
public class WarcSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private static final Logger logger = LoggerFactory.getLogger(WarcSerializableCrawlDataStream.class);
private final WarcReader reader;
private final Iterator<WarcRecord> backingIterator;
private SerializableCrawlData next = null;
private final Path path;
public WarcSerializableCrawlDataStream(Path file) throws IOException {
path = file;
reader = new WarcReader(file);
WarcXResponseReference.register(reader);
WarcXEntityRefused.register(reader);
backingIterator = reader.iterator();
}
@Override
public Path path() {
return path;
}
@Override
@SneakyThrows
public boolean hasNext() {
while (backingIterator.hasNext() && next == null) {
var nextRecord = backingIterator.next();
if (nextRecord instanceof WarcResponse response) { // this also includes WarcXResponseReference
convertResponse(response);
}
else if (nextRecord instanceof Warcinfo warcinfo) {
convertWarcinfo(warcinfo);
}
}
return next != null;
}
private void convertWarcinfo(Warcinfo warcinfo) throws IOException {
var headers = warcinfo.fields();
String probeStatus = headers.first("X-WARC-Probe-Status").orElse("");
String[] parts = probeStatus.split(" ", 2);
String domain = headers.first("domain").orElseThrow(() -> new IllegalStateException("Missing domain header"));
String status = parts[0];
String statusReason = parts.length > 1 ? parts[1] : "";
String ip = headers.first("ip").orElse("");
String redirectDomain = null;
if ("REDIRECT".equalsIgnoreCase(status)) {
redirectDomain = statusReason;
}
next = new CrawledDomain(domain, redirectDomain, status, statusReason, ip,
new ArrayList<>(),
new ArrayList<>()
);
}
private void convertResponse(WarcResponse response) throws IOException {
var http = response.http();
if (http.status() != 200) {
return;
}
var httpHeaders = http.headers();
var parsedBody = DocumentBodyExtractor.asString(HttpFetchResult.importWarc(response));
if (parsedBody instanceof DocumentBodyResult.Error<String> error) {
next = new CrawledDocument(
"",
response.targetURI().toString(),
http.contentType().raw(),
response.date().toString(),
http.status(),
error.status().toString(),
error.why(),
headers(http.headers()),
null,
response.payloadDigest().map(WarcDigest::base64).orElse(""),
"",
"",
"",
WarcXCookieInformationHeader.hasCookies(response),
null,
null
);
} else if (parsedBody instanceof DocumentBodyResult.Ok<String> ok) {
next = new CrawledDocument(
"",
response.targetURI().toString(),
ok.contentType().toString(),
response.date().toString(),
http.status(),
"OK",
"",
headers(http.headers()),
ok.body(),
response.payloadDigest().map(WarcDigest::base64).orElse(""),
"",
"",
"",
WarcXCookieInformationHeader.hasCookies(response),
httpHeaders.first("Last-Modified").orElse(""),
httpHeaders.first("ETag").orElse(""));
} else {
// unreachable
throw new IllegalStateException("Unknown body type: " + parsedBody);
}
}
public String headers(MessageHeaders headers) {
StringJoiner ret = new StringJoiner("\n");
for (var header : headers.map().entrySet()) {
for (var value : header.getValue()) {
ret.add(STR."\{header.getKey()}: \{value}");
}
}
return ret.toString();
}
public void close() throws IOException {
reader.close();
}
@Override
public SerializableCrawlData next() throws IOException {
if (!hasNext())
throw new NoSuchElementException();
try {
return next;
}
finally {
next = null;
}
}
}

View File

@ -217,7 +217,6 @@ public class CrawlerMain {
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
if (Files.exists(newWarcFile)) {

View File

@ -1,6 +1,7 @@
package nu.marginalia.crawling.retreival;
import lombok.SneakyThrows;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.crawl.retreival.CrawlDataReference;
@ -10,10 +11,11 @@ import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileWriter;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.*;
@ -23,7 +25,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -32,11 +33,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class CrawlerRetreiverTest {
private HttpFetcher httpFetcher;
Path tempFile;
Path tempFile2;
Path tempFileWarc1;
Path tempFileParquet1;
Path tempFileWarc2;
Path tempFileParquet2;
@BeforeEach
public void setUp() {
public void setUp() throws IOException {
httpFetcher = new HttpFetcherImpl("search.marginalia.nu; testing a bit :D");
tempFileParquet1 = Files.createTempFile("crawling-process", ".parquet");
tempFileParquet2 = Files.createTempFile("crawling-process", ".parquet");
}
@SneakyThrows
@ -48,11 +54,17 @@ class CrawlerRetreiverTest {
@AfterEach
public void tearDown() throws IOException {
if (tempFile != null) {
Files.deleteIfExists(tempFile);
if (tempFileWarc1 != null) {
Files.deleteIfExists(tempFileWarc1);
}
if (tempFile2 != null) {
Files.deleteIfExists(tempFile2);
if (tempFileParquet1 != null) {
Files.deleteIfExists(tempFileParquet1);
}
if (tempFileWarc2 != null) {
Files.deleteIfExists(tempFileWarc2);
}
if (tempFileParquet2 != null) {
Files.deleteIfExists(tempFileParquet2);
}
}
@Test
@ -111,17 +123,19 @@ class CrawlerRetreiverTest {
List<SerializableCrawlData> data = new ArrayList<>();
tempFile = Files.createTempFile("crawling-process", ".warc");
tempFileWarc1 = Files.createTempFile("crawling-process", ".warc");
try (var recorder = new WarcRecorder(tempFile)) {
try (var recorder = new WarcRecorder(tempFileWarc1)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
new UserAgent("test"), tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@ -161,17 +175,20 @@ class CrawlerRetreiverTest {
List<SerializableCrawlData> data = new ArrayList<>();
tempFile = Files.createTempFile("crawling-process", ".warc");
tempFileWarc1 = Files.createTempFile("crawling-process", ".warc");
try (var recorder = new WarcRecorder(tempFile)) {
try (var recorder = new WarcRecorder(tempFileWarc1)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
new UserAgent("test"), tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
@ -212,19 +229,22 @@ class CrawlerRetreiverTest {
.build();
tempFile = Files.createTempFile("crawling-process", ".warc.gz");
tempFile2 = Files.createTempFile("crawling-process", ".warc.gz");
tempFileWarc1 = Files.createTempFile("crawling-process", ".warc.gz");
tempFileWarc2 = Files.createTempFile("crawling-process", ".warc.gz");
Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
try (var recorder = new WarcRecorder(tempFile)) {
try (var recorder = new WarcRecorder(tempFileWarc1)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
new UserAgent("test"), tempFileWarc1, tempFileParquet1);
try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
while (stream.hasNext()) {
var doc = stream.next();
data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@ -232,13 +252,15 @@ class CrawlerRetreiverTest {
} catch (Exception e) {
throw new RuntimeException(e);
}
var stream = CrawledDomainReader.createDataStream(tempFile);
var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
System.out.println("---");
CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0);
domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList());
try (var recorder = new WarcRecorder(tempFile2)) {
try (var recorder = new WarcRecorder(tempFileWarc2)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(new DomainLinks(),
new CrawlDataReference(stream));
}
@ -246,7 +268,12 @@ class CrawlerRetreiverTest {
Assertions.fail(ex);
}
try (var reader = new WarcReader(tempFile2)) {
CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
new UserAgent("test"), tempFileWarc2, tempFileParquet2);
try (var reader = new WarcReader(tempFileWarc2)) {
WarcXResponseReference.register(reader);
reader.forEach(record -> {
@ -263,7 +290,7 @@ class CrawlerRetreiverTest {
});
}
try (var ds = CrawledDomainReader.createDataStream(tempFile2)) {
try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {
@ -275,7 +302,6 @@ class CrawlerRetreiverTest {
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}