diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java
index eb7ffd75..bb79dcf0 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java
@@ -3,7 +3,6 @@ package nu.marginalia.crawling.io;
 import com.google.gson.Gson;
 import nu.marginalia.crawling.io.format.LegacySerializableCrawlDataStream;
 import nu.marginalia.crawling.io.format.ParquetSerializableCrawlDataStream;
-import nu.marginalia.crawling.io.format.WarcSerializableCrawlDataStream;
 import nu.marginalia.model.gson.GsonFactory;
 
 import java.io.*;
@@ -22,9 +21,6 @@ public class CrawledDomainReader {
         if (fileName.endsWith(".zstd")) {
             return new LegacySerializableCrawlDataStream(gson, fullPath.toFile());
         }
-        else if (fileName.endsWith(".warc") || fileName.endsWith(".warc.gz")) {
-            return new WarcSerializableCrawlDataStream(fullPath);
-        }
         else if (fileName.endsWith(".parquet")) {
             return new ParquetSerializableCrawlDataStream(fullPath);
         }
@@ -36,14 +32,10 @@ public class CrawledDomainReader {
     /** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
     public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
         Path parquetPath = CrawlerOutputFile.getParquetPath(basePath, id, domain);
-        Path warcPath = CrawlerOutputFile.getWarcPath(basePath, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
 
         if (Files.exists(parquetPath)) {
             return createDataStream(parquetPath);
         }
-        if (Files.exists(warcPath)) {
-            return createDataStream(warcPath);
-        }
         else {
             return createDataStream(CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain));
         }
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
index ad6b4358..25673f13 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
@@ -107,8 +107,7 @@ public class CrawlerOutputFile {
 
     public enum WarcFileVersion {
         LIVE("open"),
-        TEMP("tmp"),
-        FINAL("final");
+        TEMP("tmp");
 
         public final String suffix;
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcSerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcSerializableCrawlDataStream.java
deleted file mode 100644
index b848d7ad..00000000
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcSerializableCrawlDataStream.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package nu.marginalia.crawling.io.format;
-
-import lombok.SneakyThrows;
-import nu.marginalia.crawling.body.DocumentBodyExtractor;
-import nu.marginalia.crawling.body.DocumentBodyResult;
-import nu.marginalia.crawling.body.HttpFetchResult;
-import nu.marginalia.crawling.io.SerializableCrawlDataStream;
-import nu.marginalia.crawling.model.CrawledDocument;
-import nu.marginalia.crawling.model.CrawledDomain;
-import nu.marginalia.crawling.model.SerializableCrawlData;
-import org.netpreserve.jwarc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.*;
-
-public class WarcSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
-    private static final Logger logger = LoggerFactory.getLogger(WarcSerializableCrawlDataStream.class);
-
-    private final WarcReader reader;
-    private final Iterator<WarcRecord> backingIterator;
-    private SerializableCrawlData next = null;
-    private final Path path;
-
-    public WarcSerializableCrawlDataStream(Path file) throws IOException {
-        path = file;
-        reader = new WarcReader(file);
-        WarcXResponseReference.register(reader);
-        WarcXEntityRefused.register(reader);
-
-        backingIterator = reader.iterator();
-    }
-
-    @Override
-    public Path path() {
-        return path;
-    }
-
-    @Override
-    @SneakyThrows
-    public boolean hasNext() {
-        while (backingIterator.hasNext() && next == null) {
-            var nextRecord = backingIterator.next();
-            if (nextRecord instanceof WarcResponse response) { // this also includes WarcXResponseReference
-                convertResponse(response);
-            }
-            else if (nextRecord instanceof Warcinfo warcinfo) {
-                convertWarcinfo(warcinfo);
-            }
-        }
-        return next != null;
-    }
-
-    private void convertWarcinfo(Warcinfo warcinfo) throws IOException {
-        var headers = warcinfo.fields();
-        String probeStatus = headers.first("X-WARC-Probe-Status").orElse("");
-        String[] parts = probeStatus.split(" ", 2);
-
-
-        String domain = headers.first("domain").orElseThrow(() -> new IllegalStateException("Missing domain header"));
-        String status = parts[0];
-        String statusReason = parts.length > 1 ? parts[1] : "";
-        String ip = headers.first("ip").orElse("");
-
-        String redirectDomain = null;
-        if ("REDIRECT".equalsIgnoreCase(status)) {
-            redirectDomain = statusReason;
-        }
-
-        next = new CrawledDomain(domain, redirectDomain, status, statusReason, ip,
-                new ArrayList<>(),
-                new ArrayList<>()
-        );
-    }
-
-    private void convertResponse(WarcResponse response) throws IOException {
-        var http = response.http();
-
-        if (http.status() != 200) {
-            return;
-        }
-
-        var httpHeaders = http.headers();
-
-        var parsedBody = DocumentBodyExtractor.asString(HttpFetchResult.importWarc(response));
-        if (parsedBody instanceof DocumentBodyResult.Error<String> error) {
-            next = new CrawledDocument(
-                    "",
-                    response.targetURI().toString(),
-                    http.contentType().raw(),
-                    response.date().toString(),
-                    http.status(),
-                    error.status().toString(),
-                    error.why(),
-                    headers(http.headers()),
-                    null,
-                    response.payloadDigest().map(WarcDigest::base64).orElse(""),
-                    "",
-                    "",
-                    "",
-                    WarcXCookieInformationHeader.hasCookies(response),
-                    null,
-                    null
-            );
-        } else if (parsedBody instanceof DocumentBodyResult.Ok<String> ok) {
-            next = new CrawledDocument(
-                    "",
-                    response.targetURI().toString(),
-                    ok.contentType().toString(),
-                    response.date().toString(),
-                    http.status(),
-                    "OK",
-                    "",
-                    headers(http.headers()),
-                    ok.body(),
-                    response.payloadDigest().map(WarcDigest::base64).orElse(""),
-                    "",
-                    "",
-                    "",
-                    WarcXCookieInformationHeader.hasCookies(response),
-                    httpHeaders.first("Last-Modified").orElse(""),
-                    httpHeaders.first("ETag").orElse(""));
-        } else {
-            // unreachable
-            throw new IllegalStateException("Unknown body type: " + parsedBody);
-        }
-    }
-
-    public String headers(MessageHeaders headers) {
-        StringJoiner ret = new StringJoiner("\n");
-        for (var header : headers.map().entrySet()) {
-            for (var value : header.getValue()) {
-                ret.add(STR."\{header.getKey()}: \{value}");
-            }
-        }
-        return ret.toString();
-    }
-
-    public void close() throws IOException {
-        reader.close();
-    }
-
-    @Override
-    public SerializableCrawlData next() throws IOException {
-        if (!hasNext())
-            throw new NoSuchElementException();
-        try {
-            return next;
-        }
-        finally {
-            next = null;
-        }
-    }
-
-}
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
index c3864868..f4b5b1e9 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
@@ -217,7 +217,6 @@ public class CrawlerMain {
 
             Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
             Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
-            Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
             Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
 
             if (Files.exists(newWarcFile)) {
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
index bfcc1617..c2dc70d4 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java
@@ -1,6 +1,7 @@
 package nu.marginalia.crawling.retreival;
 
 import lombok.SneakyThrows;
+import nu.marginalia.UserAgent;
 import nu.marginalia.WmsaHome;
 import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawl.retreival.CrawlDataReference;
@@ -10,10 +11,11 @@ import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
 import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
 import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawling.io.CrawledDomainReader;
-import nu.marginalia.crawling.io.CrawledDomainWriter;
 import nu.marginalia.crawling.model.CrawledDocument;
 import nu.marginalia.crawling.model.CrawledDomain;
 import nu.marginalia.crawling.model.SerializableCrawlData;
+import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
+import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileWriter;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
 import org.junit.jupiter.api.*;
 import org.netpreserve.jwarc.*;
@@ -23,7 +25,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -32,11 +33,16 @@
 class CrawlerRetreiverTest {
     private HttpFetcher httpFetcher;
 
-    Path tempFile;
-    Path tempFile2;
+    Path tempFileWarc1;
+    Path tempFileParquet1;
+    Path tempFileWarc2;
+    Path tempFileParquet2;
     @BeforeEach
-    public void setUp() {
+    public void setUp() throws IOException {
         httpFetcher = new HttpFetcherImpl("search.marginalia.nu; testing a bit :D");
+        tempFileParquet1 = Files.createTempFile("crawling-process", ".parquet");
+        tempFileParquet2 = Files.createTempFile("crawling-process", ".parquet");
+
     }
 
     @SneakyThrows
@@ -48,11 +54,17 @@ class CrawlerRetreiverTest {
 
     @AfterEach
    public void tearDown() throws IOException {
-        if (tempFile != null) {
-            Files.deleteIfExists(tempFile);
+        if (tempFileWarc1 != null) {
+            Files.deleteIfExists(tempFileWarc1);
         }
-        if (tempFile2 != null) {
-            Files.deleteIfExists(tempFile2);
+        if (tempFileParquet1 != null) {
+            Files.deleteIfExists(tempFileParquet1);
+        }
+        if (tempFileWarc2 != null) {
+            Files.deleteIfExists(tempFileWarc2);
+        }
+        if (tempFileParquet2 != null) {
+            Files.deleteIfExists(tempFileParquet2);
         }
     }
     @Test
@@ -111,17 +123,19 @@
 
         List<SerializableCrawlData> data = new ArrayList<>();
 
-        tempFile = Files.createTempFile("crawling-process", ".warc");
+        tempFileWarc1 = Files.createTempFile("crawling-process", ".warc");
 
-        try (var recorder = new WarcRecorder(tempFile)) {
+        try (var recorder = new WarcRecorder(tempFileWarc1)) {
             new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder)
                     .fetch();
         }
         catch (IOException ex) {
             Assertions.fail(ex);
         }
+        CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
+                new UserAgent("test"), tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
+        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -161,17 +175,20 @@
 
         List<SerializableCrawlData> data = new ArrayList<>();
 
-        tempFile = Files.createTempFile("crawling-process", ".warc");
+        tempFileWarc1 = Files.createTempFile("crawling-process", ".warc");
 
-        try (var recorder = new WarcRecorder(tempFile)) {
+        try (var recorder = new WarcRecorder(tempFileWarc1)) {
             new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder)
                     .fetch();
         }
         catch (IOException ex) {
             Assertions.fail(ex);
         }
+        CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
+                new UserAgent("test"), tempFileWarc1, tempFileParquet1);
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
+
+        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 if (stream.next() instanceof CrawledDocument doc) {
                     data.add(doc);
@@ -212,19 +229,22 @@
                 .build();
 
-        tempFile = Files.createTempFile("crawling-process", ".warc.gz");
-        tempFile2 = Files.createTempFile("crawling-process", ".warc.gz");
+        tempFileWarc1 = Files.createTempFile("crawling-process", ".warc.gz");
+        tempFileWarc2 = Files.createTempFile("crawling-process", ".warc.gz");
 
         Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
 
-        try (var recorder = new WarcRecorder(tempFile)) {
+        try (var recorder = new WarcRecorder(tempFileWarc1)) {
             new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder)
                     .fetch();
         }
         catch (IOException ex) {
             Assertions.fail(ex);
         }
 
-        try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
+        CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
+                new UserAgent("test"), tempFileWarc1, tempFileParquet1);
+
+        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
             while (stream.hasNext()) {
                 var doc = stream.next();
                 data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@@ -232,13 +252,15 @@
         }
         catch (Exception e) {
             throw new RuntimeException(e);
         }
-        var stream = CrawledDomainReader.createDataStream(tempFile);
+
+
+        var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
 
         System.out.println("---");
 
         CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0);
         domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList());
 
-        try (var recorder = new WarcRecorder(tempFile2)) {
+        try (var recorder = new WarcRecorder(tempFileWarc2)) {
             new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(new DomainLinks(), new CrawlDataReference(stream));
@@ -246,7 +268,12 @@
         catch (IOException ex) {
             Assertions.fail(ex);
         }
-        try (var reader = new WarcReader(tempFile2)) {
+
+        CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu",
+                new UserAgent("test"), tempFileWarc2, tempFileParquet2);
+
+
+        try (var reader = new WarcReader(tempFileWarc2)) {
             WarcXResponseReference.register(reader);
 
             reader.forEach(record -> {
@@ -263,7 +290,7 @@
             });
         }
 
-        try (var ds = CrawledDomainReader.createDataStream(tempFile2)) {
+        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
             while (ds.hasNext()) {
                 var doc = ds.next();
                 if (doc instanceof CrawledDomain dr) {
@@ -275,7 +302,6 @@
             }
         }
         catch (Exception e) {
             throw new RuntimeException(e);
-        }
     }
 }
\ No newline at end of file