diff --git a/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java b/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java index 9def0480..f0f23956 100644 --- a/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java +++ b/code/common/model/src/main/java/nu/marginalia/model/EdgeUrl.java @@ -232,4 +232,11 @@ public class EdgeUrl implements Serializable { return new URL(this.proto, this.domain.toString(), port, this.path); } + + public URI asURI() throws URISyntaxException { + if (port == null) + return new URI(this.proto, null, this.domain.toString(), this.path, this.param); + else + return new URI(this.proto, null, this.domain.toString(), this.port, this.path, this.param, null); + } } diff --git a/code/processes/crawling-process/build.gradle b/code/processes/crawling-process/build.gradle index dbac9b66..baa02906 100644 --- a/code/processes/crawling-process/build.gradle +++ b/code/processes/crawling-process/build.gradle @@ -49,6 +49,7 @@ dependencies { implementation libs.guice implementation libs.gson implementation libs.zstd + implementation libs.jwarc implementation libs.crawlercommons implementation libs.okhttp3 implementation libs.jsoup diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java index 872e00f3..57e73c44 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java @@ -8,9 +8,9 @@ import lombok.SneakyThrows; import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.crawl.retreival.Cookies; import nu.marginalia.crawl.retreival.RateLimitException; +import nu.marginalia.crawl.retreival.fetcher.socket.*; import nu.marginalia.crawling.model.CrawledDocument; import 
nu.marginalia.crawling.model.CrawlerDocumentStatus; -import nu.marginalia.contenttype.ContentType; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; @@ -26,10 +26,7 @@ import javax.net.ssl.X509TrustManager; import java.io.EOFException; import java.io.IOException; import java.net.*; -import java.nio.charset.Charset; import java.nio.charset.IllegalCharsetNameException; -import java.nio.charset.StandardCharsets; -import java.nio.charset.UnsupportedCharsetException; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.TimeUnit; @@ -65,6 +62,7 @@ public class HttpFetcherImpl implements HttpFetcher { return builder.sslSocketFactory(NoSecuritySSL.buildSocketFactory(), (X509TrustManager) NoSecuritySSL.trustAllCerts[0]) .socketFactory(ftSocketFactory) .hostnameVerifier(NoSecuritySSL.buildHostnameVerifyer()) + .addNetworkInterceptor(new IpInterceptingNetworkInterceptor()) .connectionPool(pool) .cookieJar(cookies.getJar()) .followRedirects(true) @@ -141,8 +139,8 @@ public class HttpFetcherImpl implements HttpFetcher { var headBuilder = new Request.Builder().head() .addHeader("User-agent", userAgent) - .url(url.toString()) - .addHeader("Accept-Encoding", "gzip"); + .addHeader("Accept-Encoding", "gzip") + .url(url.toString()); var head = headBuilder.build(); var call = client.newCall(head); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/FastTerminatingSocketFactory.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/FastTerminatingSocketFactory.java similarity index 96% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/FastTerminatingSocketFactory.java rename to code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/FastTerminatingSocketFactory.java index add64e29..ffb29b33 100644 --- 
a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/FastTerminatingSocketFactory.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/FastTerminatingSocketFactory.java @@ -1,4 +1,4 @@ -package nu.marginalia.crawl.retreival.fetcher; +package nu.marginalia.crawl.retreival.fetcher.socket; import javax.net.SocketFactory; import java.io.IOException; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/IpInterceptingNetworkInterceptor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/IpInterceptingNetworkInterceptor.java new file mode 100644 index 00000000..c5eb76ac --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/IpInterceptingNetworkInterceptor.java @@ -0,0 +1,24 @@ +package nu.marginalia.crawl.retreival.fetcher.socket; + +import okhttp3.Interceptor; +import okhttp3.Response; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; + +public class IpInterceptingNetworkInterceptor implements Interceptor { + @NotNull + @Override + public Response intercept(@NotNull Interceptor.Chain chain) throws IOException { + String IP = chain.connection().socket().getInetAddress().getHostAddress(); + + return chain.proceed(chain.request()) + .newBuilder() + .addHeader("X-Remote-IP", IP) + .build(); + } + + public static String getIpFromResponse(Response response) { + return response.header("X-Remote-IP"); + } +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/NoSecuritySSL.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/NoSecuritySSL.java similarity index 89% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/NoSecuritySSL.java rename to 
code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/NoSecuritySSL.java index f86d2c48..45dc431c 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/NoSecuritySSL.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/socket/NoSecuritySSL.java @@ -1,4 +1,4 @@ -package nu.marginalia.crawl.retreival.fetcher; +package nu.marginalia.crawl.retreival.fetcher.socket; import lombok.SneakyThrows; @@ -8,6 +8,8 @@ import java.security.cert.X509Certificate; public class NoSecuritySSL { // Create a trust manager that does not validate certificate chains + // We want to accept e.g. self-signed certificates and certificates + // that are not signed by a CA that is generally trusted by the system. public static final TrustManager[] trustAllCerts = new TrustManager[]{ new X509TrustManager() { @Override diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcDigestBuilder.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcDigestBuilder.java new file mode 100644 index 00000000..88052a7e --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcDigestBuilder.java @@ -0,0 +1,29 @@ +package nu.marginalia.crawl.retreival.fetcher.warc; + +import org.netpreserve.jwarc.WarcDigest; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +class WarcDigestBuilder { + private final MessageDigest digest; + + private static final String digestAlgorithm = "SHA-1"; + + public WarcDigestBuilder() throws NoSuchAlgorithmException { + this.digest = MessageDigest.getInstance(digestAlgorithm); + } + + public void update(String s) { + byte[] bytes = s.getBytes(); + update(bytes, bytes.length); + } + + public void update(byte[] buffer, int n) { + digest.update(buffer, 0, n); + } + + public WarcDigest 
build() { + return new WarcDigest(digest); + } +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java new file mode 100644 index 00000000..a583bcc9 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java @@ -0,0 +1,127 @@ +package nu.marginalia.crawl.retreival.fetcher.warc; + +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import org.apache.commons.lang3.StringUtils; + +import java.net.URI; +import java.util.Arrays; +import java.util.Map; +import java.util.StringJoiner; +import java.util.stream.Collectors; + +/** We don't have access to the raw HTTP request and response, so we need to reconstruct them + * as best is possible from the data we have available. + */ +public class WarcProtocolReconstructor { + + static String getHttpRequestString(Request request, URI uri) { + StringBuilder requestStringBuilder = new StringBuilder(); + requestStringBuilder.append(request.method()).append(" ").append(uri.getPath()); + if (uri.getQuery() != null) { + requestStringBuilder.append("?").append(uri.getQuery()); + } + requestStringBuilder.append(" HTTP/1.1\r\n"); + requestStringBuilder.append("Host: ").append(uri.getHost()).append("\r\n"); + + request.headers().toMultimap().forEach((k, values) -> { + for (var value : values) { + requestStringBuilder.append(capitalizeHeader(k)).append(": ").append(value).append("\r\n"); + } + }); + + return requestStringBuilder.toString(); + } + + static String getResponseHeader(Response response) { + String version = response.protocol() == Protocol.HTTP_1_1 ? 
"1.1" : "2.0"; + + String statusCode = String.valueOf(response.code()); + String statusMessage = STATUS_CODE_MAP.getOrDefault(response.code(), "Unknown"); + + String headerString = getHeadersAsString(response); + + return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n"; + } + + private static final Map STATUS_CODE_MAP = Map.ofEntries( + Map.entry(200, "OK"), + Map.entry(201, "Created"), + Map.entry(202, "Accepted"), + Map.entry(203, "Non-Authoritative Information"), + Map.entry(204, "No Content"), + Map.entry(205, "Reset Content"), + Map.entry(206, "Partial Content"), + Map.entry(207, "Multi-Status"), + Map.entry(208, "Already Reported"), + Map.entry(226, "IM Used"), + Map.entry(300, "Multiple Choices"), + Map.entry(301, "Moved Permanently"), + Map.entry(302, "Found"), + Map.entry(303, "See Other"), + Map.entry(304, "Not Modified"), + Map.entry(307, "Temporary Redirect"), + Map.entry(308, "Permanent Redirect"), + Map.entry(400, "Bad Request"), + Map.entry(401, "Unauthorized"), + Map.entry(403, "Forbidden"), + Map.entry(404, "Not Found"), + Map.entry(405, "Method Not Allowed"), + Map.entry(406, "Not Acceptable"), + Map.entry(408, "Request Timeout"), + Map.entry(409, "Conflict"), + Map.entry(410, "Gone"), + Map.entry(411, "Length Required"), + Map.entry(412, "Precondition Failed"), + Map.entry(413, "Payload Too Large"), + Map.entry(414, "URI Too Long"), + Map.entry(415, "Unsupported Media Type"), + Map.entry(416, "Range Not Satisfiable"), + Map.entry(417, "Expectation Failed"), + Map.entry(418, "I'm a teapot"), + Map.entry(421, "Misdirected Request"), + Map.entry(426, "Upgrade Required"), + Map.entry(428, "Precondition Required"), + Map.entry(429, "Too Many Requests"), + Map.entry(431, "Request Header Fields Too Large"), + Map.entry(451, "Unavailable For Legal Reasons"), + Map.entry(500, "Internal Server Error"), + Map.entry(501, "Not Implemented"), + Map.entry(502, "Bad Gateway"), + Map.entry(503, "Service Unavailable"), + 
Map.entry(504, "Gateway Timeout"), + Map.entry(505, "HTTP Version Not Supported"), + Map.entry(506, "Variant Also Negotiates"), + Map.entry(507, "Insufficient Storage"), + Map.entry(508, "Loop Detected"), + Map.entry(510, "Not Extended"), + Map.entry(511, "Network Authentication Required") + ); + + + static private String getHeadersAsString(Response response) { + StringJoiner joiner = new StringJoiner("\r\n"); + + response.headers().toMultimap().forEach((k, values) -> { + String headerCapitalized = capitalizeHeader(k); + + if (headerCapitalized.startsWith("X")) + return; + + for (var value : values) { + joiner.add(headerCapitalized + ": " + value); + } + }); + return joiner.toString(); + } + + // okhttp gives us flattened headers, so we need to reconstruct Camel-Kebab-Case style + // for the WARC parser's sake... + static private String capitalizeHeader(String k) { + return Arrays.stream(StringUtils.split(k, '-')) + .map(StringUtils::capitalize) + .collect(Collectors.joining("-")); + } + +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecordingFetcherClient.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecordingFetcherClient.java new file mode 100644 index 00000000..a3440c5a --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecordingFetcherClient.java @@ -0,0 +1,175 @@ +package nu.marginalia.crawl.retreival.fetcher.warc; + +import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.model.EdgeDomain; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import org.netpreserve.jwarc.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import 
java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.security.NoSuchAlgorithmException; +import java.time.Instant; +import java.util.Optional; + +/** Based on JWarc's fetch method, APL 2.0 license + *

+ * This class wraps OkHttp's OkHttpClient and records the HTTP request and response in a WARC file, + * as best is possible given not all the data is available at the same time and needs to + * be reconstructed. + */ +public class WarcRecordingFetcherClient implements AutoCloseable { + private static final int MAX_TIME = 30_000; + private static final int MAX_SIZE = 1024 * 1024 * 10; + private final WarcWriter writer; + + private final EdgeDomain domain; + private static final Logger logger = LoggerFactory.getLogger(WarcRecordingFetcherClient.class); + + + public WarcRecordingFetcherClient(Path warcFile, EdgeDomain domain) throws IOException { + this.writer = new WarcWriter(warcFile); + this.domain = domain; + } + + public Optional fetch(OkHttpClient client, Request request) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { + URI uri = request.url().uri(); + + WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder(); + WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder(); + + String ip; + Instant date = Instant.now(); + long startMillis = date.toEpochMilli(); + + Path tempFileName = Files.createTempFile(domain.toString(), ".data"); + + var call = client.newCall(request); + + int totalLength = 0; + + WarcTruncationReason truncationReason = null; + + + + try (FileChannel tempFile = + (FileChannel) Files.newByteChannel(tempFileName, StandardOpenOption.READ, StandardOpenOption.WRITE); + var response = call.execute() + ) { + var body = response.body(); + InputStream inputStream; + + if (body == null) { + inputStream = null; + truncationReason = WarcTruncationReason.DISCONNECT; + } + else { + inputStream = body.byteStream(); + } + + byte[] buf = new byte[8192]; + + ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response); + + String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response); + tempFile.write(ByteBuffer.wrap(responseHeaders.getBytes())); + 
responseDigestBuilder.update(responseHeaders); + + while (inputStream != null) { + int remainingLength; + + if (MAX_SIZE > 0 && MAX_SIZE - totalLength < buf.length) { + remainingLength = (MAX_SIZE - totalLength); + } else { + remainingLength = buf.length; + } + + int n = inputStream.read(buf, 0, remainingLength); + if (n < 0) + break; + + totalLength += n; + + for (int i = 0; i < n; ) { + int written = tempFile.write(ByteBuffer.wrap(buf, i, n - i)); + i += written; + } + + responseDigestBuilder.update(buf, n); + payloadDigestBuilder.update(buf, n); + + if (MAX_TIME > 0 && System.currentTimeMillis() - startMillis > MAX_TIME) { + truncationReason = WarcTruncationReason.TIME; + break; + } + if (MAX_SIZE > 0 && totalLength >= MAX_SIZE) { + truncationReason = WarcTruncationReason.LENGTH; + break; + } + } + + tempFile.position(0); + WarcResponse.Builder responseBuilder = new WarcResponse.Builder(uri) + .blockDigest(responseDigestBuilder.build()) + .date(date) + .body(MediaType.HTTP_RESPONSE, tempFile, tempFile.size()); + + if (ip != null) responseBuilder.ipAddress(InetAddress.getByName(ip)); + + responseBuilder.payloadDigest(payloadDigestBuilder.build()); + + if (truncationReason != null) + responseBuilder.truncated(truncationReason); + + // Build and write the response + + var warcResponse = responseBuilder.build(); + warcResponse.http(); // force HTTP header to be parsed before body is consumed so that caller can use it + writer.write(warcResponse); + + // Build and write the request + + WarcDigestBuilder requestDigestBuilder = new WarcDigestBuilder(); + + String httpRequestString = WarcProtocolReconstructor.getHttpRequestString(response.request(), uri); + + requestDigestBuilder.update(httpRequestString); + + WarcRequest warcRequest = new WarcRequest.Builder(uri) + .blockDigest(requestDigestBuilder.build()) + .date(date) + .body(MediaType.HTTP_REQUEST, httpRequestString.getBytes()) + .concurrentTo(warcResponse.id()) + .build(); + warcRequest.http(); // force HTTP 
header to be parsed before body is consumed so that caller can use it + writer.write(warcRequest); + + return Optional.of(warcResponse); + } + catch (Exception ex) { + logger.warn("Failed to fetch URL {}", uri, ex); + return Optional.empty(); + } + finally { + Files.deleteIfExists(tempFileName); + } + } + + public void close() { + try { + writer.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecordingFetcherClientTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecordingFetcherClientTest.java new file mode 100644 index 00000000..c1129e86 --- /dev/null +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecordingFetcherClientTest.java @@ -0,0 +1,70 @@ +package nu.marginalia.crawl.retreival.fetcher; + +import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecordingFetcherClient; +import nu.marginalia.model.EdgeDomain; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.netpreserve.jwarc.WarcReader; +import org.netpreserve.jwarc.WarcRequest; +import org.netpreserve.jwarc.WarcResponse; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import java.util.zip.GZIPInputStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class WarcRecordingFetcherClientTest { + Path fileName; + WarcRecordingFetcherClient client; + OkHttpClient httpClient; + @BeforeEach + public void setUp() throws Exception { + httpClient = new OkHttpClient.Builder() + 
.addNetworkInterceptor(new IpInterceptingNetworkInterceptor()) + .build(); + + fileName = Files.createTempFile("test", ".warc.gz"); + client = new WarcRecordingFetcherClient(fileName, new EdgeDomain("www.marginalia.nu")); + } + + @AfterEach + public void tearDown() throws Exception { + client.close(); + Files.delete(fileName); + } + + @Test + void fetch() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException { + client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/") + .addHeader("User-agent", "test.marginalia.nu") + .addHeader("Accept-Encoding", "gzip") + .get().build()); + + new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out); + + Map sampleData = new HashMap<>(); + try (var warcReader = new WarcReader(fileName)) { + warcReader.forEach(record -> { + if (record instanceof WarcRequest req) { + sampleData.put(record.type(), req.target()); + } + if (record instanceof WarcResponse rsp) { + sampleData.put(record.type(), rsp.target()); + } + }); + } + + assertEquals("https://www.marginalia.nu/", sampleData.get("request")); + assertEquals("https://www.marginalia.nu/", sampleData.get("response")); + } +} \ No newline at end of file