(crawler) Refactor boundary between CrawlerRetreiver and HttpFetcherImpl

This code is still a bit too complex, but it's slowly getting better.
Viktor Lofgren 2024-09-24 15:08:15 +02:00
parent 10d8fc4fe7
commit 40512511af
11 changed files with 327 additions and 424 deletions
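
For orientation, the practical effect of the refactor is that call sites can now program against the sealed result types on the HttpFetcher interface, rather than reaching into HttpFetcherImpl. A minimal sketch of the consumer side, assuming a Java 21 pattern-matching switch; startCrawl, deferToDomain and abort are hypothetical stand-ins, not part of this commit:

    import nu.marginalia.crawl.fetcher.HttpFetcher;
    import nu.marginalia.model.EdgeUrl;

    void probeAndDispatch(HttpFetcher fetcher, EdgeUrl rootUrl) {
        // DomainProbeResult is sealed, so the switch is exhaustive without a default branch
        switch (fetcher.probeDomain(rootUrl)) {
            case HttpFetcher.DomainProbeResult.Ok ok -> startCrawl(ok.probedUrl());
            case HttpFetcher.DomainProbeResult.Redirect redirect -> deferToDomain(redirect.domain());
            case HttpFetcher.DomainProbeResult.Error error -> abort(error.status(), error.desc());
        }
    }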

HttpFetcher.java

@@ -6,6 +6,7 @@ import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.body.HttpFetchResult;
import nu.marginalia.model.crawldata.CrawlerDomainStatus;
import java.util.List;
@@ -16,7 +17,12 @@ public interface HttpFetcher {
List<String> getCookies();
void clearCookies();
HttpFetcherImpl.ProbeResult probeDomain(EdgeUrl url);
DomainProbeResult probeDomain(EdgeUrl url);
ContentTypeProbeResult probeContentType(
EdgeUrl url,
WarcRecorder recorder,
ContentTags tags) throws HttpFetcherImpl.RateLimitException;
HttpFetchResult fetchContent(EdgeUrl url,
WarcRecorder recorder,
@@ -30,6 +36,26 @@ public interface HttpFetcher {
enum ProbeType {
DISABLED,
FULL,
IF_MODIFIED_SINCE
}
sealed interface DomainProbeResult {
record Error(CrawlerDomainStatus status, String desc) implements DomainProbeResult {}
/** This domain redirects to another domain */
record Redirect(EdgeDomain domain) implements DomainProbeResult {}
/** If the retrieval of the probed url was successful, return the url as it was fetched
* (which may be different from the url we probed, if we attempted another URL scheme).
*
* @param probedUrl The url we successfully probed
*/
record Ok(EdgeUrl probedUrl) implements DomainProbeResult {}
}
sealed interface ContentTypeProbeResult {
record Ok(EdgeUrl resolvedUrl) implements ContentTypeProbeResult { }
record BadContentType(String contentType, int statusCode) implements ContentTypeProbeResult { }
record Timeout(java.lang.Exception ex) implements ContentTypeProbeResult { }
record Exception(java.lang.Exception ex) implements ContentTypeProbeResult { }
}
}
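
The ContentTypeProbeResult variants admit the same treatment. A hedged sketch of collapsing a probe outcome into an HttpFetchResult, mirroring the instanceof chain CrawlerRetreiver grows further down; fetchResolved is a hypothetical helper:

    import nu.marginalia.crawl.fetcher.HttpFetcher;
    import nu.marginalia.model.body.HttpFetchResult;

    HttpFetchResult dispatch(HttpFetcher.ContentTypeProbeResult probe) {
        return switch (probe) {
            // Probe passed: fetch the resolved URL (it may differ from the probed one after redirects)
            case HttpFetcher.ContentTypeProbeResult.Ok ok -> fetchResolved(ok.resolvedUrl());
            // Disallowed content type, e.g. a binary: skip the document entirely
            case HttpFetcher.ContentTypeProbeResult.BadContentType bad -> new HttpFetchResult.ResultNone();
            case HttpFetcher.ContentTypeProbeResult.Timeout timeout -> new HttpFetchResult.ResultException(timeout.ex());
            case HttpFetcher.ContentTypeProbeResult.Exception ex -> new HttpFetchResult.ResultException(ex.ex());
        };
    }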

HttpFetcherImpl.java

@@ -9,9 +9,6 @@ import nu.marginalia.crawl.fetcher.socket.FastTerminatingSocketFactory;
import nu.marginalia.crawl.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.fetcher.socket.NoSecuritySSL;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.logic.ContentTypeProber;
import nu.marginalia.crawl.logic.ContentTypeProber.ContentTypeProbeResult;
import nu.marginalia.crawl.logic.SoftIfModifiedSinceProber;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.body.ContentTypeLogic;
@@ -26,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.X509TrustManager;
import java.io.InterruptedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
@@ -41,10 +39,7 @@ public class HttpFetcherImpl implements HttpFetcher {
private final Cookies cookies = new Cookies();
private static final SimpleRobotRulesParser robotsParser = new SimpleRobotRulesParser();
private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
private final ContentTypeProber contentTypeProber;
private final SoftIfModifiedSinceProber softIfModifiedSinceProber;
@Override
public void setAllowAllContentTypes(boolean allowAllContentTypes) {
@@ -95,18 +90,19 @@ public class HttpFetcherImpl implements HttpFetcher {
this.client = createClient(dispatcher, connectionPool);
this.userAgentString = userAgent.uaString();
this.userAgentIdentifier = userAgent.uaIdentifier();
this.contentTypeProber = new ContentTypeProber(userAgentString, client);
this.softIfModifiedSinceProber = new SoftIfModifiedSinceProber(userAgentString, client);
}
public HttpFetcherImpl(String userAgent) {
this.client = createClient(null, new ConnectionPool());
this.userAgentString = userAgent;
this.userAgentIdentifier = userAgent;
this.contentTypeProber = new ContentTypeProber(userAgent, client);
this.softIfModifiedSinceProber = new SoftIfModifiedSinceProber(userAgent, client);
}
// Not necessary in prod, but useful in test
public void close() {
client.dispatcher().executorService().shutdown();
client.connectionPool().evictAll();
}
/**
* Probe the domain to see if it is reachable, attempting to identify which scheme to use,
* and if there are any redirects. This is done by one or more HEAD requests.
@@ -116,7 +112,7 @@ public class HttpFetcherImpl implements HttpFetcher {
*/
@Override
@SneakyThrows
public ProbeResult probeDomain(EdgeUrl url) {
public DomainProbeResult probeDomain(EdgeUrl url) {
var head = new Request.Builder().head().addHeader("User-agent", userAgentString)
.url(url.toString())
.build();
@@ -127,22 +123,89 @@ public class HttpFetcherImpl implements HttpFetcher {
EdgeUrl requestUrl = new EdgeUrl(rsp.request().url().toString());
if (!Objects.equals(requestUrl.domain, url.domain)) {
return new ProbeResultRedirect(requestUrl.domain);
return new DomainProbeResult.Redirect(requestUrl.domain);
}
return new ProbeResultOk(requestUrl);
return new DomainProbeResult.Ok(requestUrl);
}
catch (Exception ex) {
if (url.proto.equalsIgnoreCase("http") && "/".equals(url.path)) {
return probeDomain(new EdgeUrl("https", url.domain, url.port, url.path, url.param));
}
logger.info("Error during fetching {}", ex.getMessage());
return new ProbeResultError(CrawlerDomainStatus.ERROR, ex.getMessage());
return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
}
}
/** Perform a HEAD request to fetch the content type of a URL.
* If the content type is not allowed, flag the URL as a failed
* content type probe.
* <p></p>
* The outcome of the probe is returned, and the result is also
* recorded in the WARC file on failure.
*/
public ContentTypeProbeResult probeContentType(EdgeUrl url,
WarcRecorder warcRecorder,
ContentTags tags) throws RateLimitException {
if (tags.isEmpty()) {
var headBuilder = new Request.Builder().head()
.addHeader("User-agent", userAgentString)
.addHeader("Accept-Encoding", "gzip")
.url(url.toString());
var head = headBuilder.build();
var call = client.newCall(head);
try (var rsp = call.execute()) {
var contentTypeHeader = rsp.header("Content-type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
warcRecorder.flagAsFailedContentTypeProbe(url, contentTypeHeader, rsp.code());
return new ContentTypeProbeResult.BadContentType(contentTypeHeader, rsp.code());
}
// Update the URL to the final URL of the HEAD request, otherwise we might end up doing
// HEAD 301 url1 -> url2
// HEAD 200 url2
// GET 301 url1 -> url2
// GET 200 url2
// which is not what we want. We want to make as few requests as possible, so as not to raise
// eyebrows when someone reads the target server's logs; ideally the traffic should look
// deliberate rather than like the work of a broken bot.
var redirectUrl = new EdgeUrl(rsp.request().url().toString());
EdgeUrl ret;
if (Objects.equals(redirectUrl.domain, url.domain)) ret = redirectUrl;
else ret = url;
// Intercept rate limiting
if (rsp.code() == 429) {
throw new HttpFetcherImpl.RateLimitException(Objects.requireNonNullElse(rsp.header("Retry-After"), "1"));
}
return new ContentTypeProbeResult.Ok(ret);
}
catch (RateLimitException ex) {
throw ex;
}
catch (InterruptedIOException ex) {
warcRecorder.flagAsTimeout(url);
return new ContentTypeProbeResult.Timeout(ex);
} catch (Exception ex) {
logger.error("Error during fetching {}[{}]", ex.getClass().getSimpleName(), ex.getMessage());
warcRecorder.flagAsError(url, ex);
return new ContentTypeProbeResult.Exception(ex);
}
}
return new ContentTypeProbeResult.Ok(url);
}
/** Fetch the content of a URL, and record it in a WARC file,
* returning a result object that can be used to determine
* the outcome of the fetch.
*/
@Override
@SneakyThrows
public HttpFetchResult fetchContent(EdgeUrl url,
@@ -150,40 +213,6 @@ public class HttpFetcherImpl implements HttpFetcher {
ContentTags contentTags,
ProbeType probeType)
{
// We don't want to waste time and resources on URLs that are not HTML, so if the file ending
// looks like it might be something else, we perform a HEAD first to check the content type
if (probeType == ProbeType.FULL && contentTags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url))
{
ContentTypeProbeResult probeResult = contentTypeProber.probeContentType(url);
if (probeResult instanceof ContentTypeProbeResult.Ok ok) {
url = ok.resolvedUrl();
}
else if (probeResult instanceof ContentTypeProbeResult.BadContentType badContentType) {
warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode());
return new HttpFetchResult.ResultNone();
}
else if (probeResult instanceof ContentTypeProbeResult.BadContentType.Timeout timeout) {
warcRecorder.flagAsTimeout(url);
return new HttpFetchResult.ResultException(timeout.ex());
}
else if (probeResult instanceof ContentTypeProbeResult.Exception exception) {
warcRecorder.flagAsError(url, exception.ex());
return new HttpFetchResult.ResultException(exception.ex());
}
}
else {
// Possibly do a soft probe to see if the URL has been modified since the last time we crawled it
// if we have reason to suspect ETags are not supported by the server.
if (probeType == ProbeType.IF_MODIFIED_SINCE
&& softIfModifiedSinceProber.probeModificationTime(url, contentTags))
{
return new HttpFetchResult.Result304Raw();
}
}
var getBuilder = new Request.Builder().get();
getBuilder.url(url.toString())
@@ -211,22 +240,26 @@ public class HttpFetcherImpl implements HttpFetcher {
return result;
}
@Override
public SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder) {
return fetchRobotsForProto("https", recorder, domain)
.or(() -> fetchRobotsForProto("http", recorder, domain))
.orElseGet(() -> new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL));
}
@Override
public SitemapRetriever createSitemapRetriever() {
return new SitemapRetriever();
}
private Optional<SimpleRobotRules> fetchRobotsForProto(String proto, WarcRecorder recorder, EdgeDomain domain) {
try {
var url = new EdgeUrl(proto, domain, null, "/robots.txt", null);
@Override
public SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder) {
var ret = fetchAndParseRobotsTxt(new EdgeUrl("https", domain, null, "/robots.txt", null), recorder);
if (ret.isPresent())
return ret.get();
ret = fetchAndParseRobotsTxt(new EdgeUrl("http", domain, null, "/robots.txt", null), recorder);
if (ret.isPresent())
return ret.get();
return new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL);
}
private Optional<SimpleRobotRules> fetchAndParseRobotsTxt(EdgeUrl url, WarcRecorder recorder) {
try {
var getBuilder = new Request.Builder().get();
getBuilder.url(url.toString())
@@ -242,7 +275,6 @@ public class HttpFetcherImpl implements HttpFetcher {
contentType.toString(),
userAgentIdentifier)
);
}
catch (Exception ex) {
return Optional.empty();
@@ -250,26 +282,6 @@ public class HttpFetcherImpl implements HttpFetcher {
}
public sealed interface ProbeResult permits ProbeResultError, ProbeResultRedirect, ProbeResultOk {}
/** The probing failed for one reason or another
* @param status Machine readable status
* @param desc Human-readable description of the error
*/
public record ProbeResultError(CrawlerDomainStatus status, String desc) implements ProbeResult {}
/** This domain redirects to another domain */
public record ProbeResultRedirect(EdgeDomain domain) implements ProbeResult {}
/** If the retrieval of the probed url was successful, return the url as it was fetched
* (which may be different from the url we probed, if we attempted another URL schema).
*
* @param probedUrl The url we successfully probed
*/
public record ProbeResultOk(EdgeUrl probedUrl) implements ProbeResult {}
/** Exception thrown when the server signals the rate limit is exceeded */
public static class RateLimitException extends Exception {
private final String retryAfter;
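
RateLimitException is truncated in this view. For reference, a hedged sketch of how a caller might honor the stored Retry-After value, assuming the class exposes it through a retryAfter() accessor (in the crawler proper this is encapsulated by CrawlDelayTimer.waitRetryDelay, seen below):

    import java.util.concurrent.TimeUnit;

    static void backOff(HttpFetcherImpl.RateLimitException ex) throws InterruptedException {
        long seconds;
        try {
            // Retry-After is either a delay in seconds or an HTTP-date; only the numeric form is handled here
            seconds = Long.parseLong(ex.retryAfter()); // hypothetical accessor for the stored header value
        }
        catch (NumberFormatException e) {
            seconds = 1; // the same default the fetcher substitutes when the header is absent
        }
        TimeUnit.SECONDS.sleep(Math.min(seconds, 60)); // cap the wait so a hostile header can't stall the crawl
    }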

WarcRecorder.java

@@ -267,7 +267,7 @@ public class WarcRecorder implements AutoCloseable {
saveOldResponse(url, contentType, statusCode, documentBody, headers, ctags);
}
public void writeWarcinfoHeader(String ip, EdgeDomain domain, HttpFetcherImpl.ProbeResult result) throws IOException {
public void writeWarcinfoHeader(String ip, EdgeDomain domain, HttpFetcherImpl.DomainProbeResult result) throws IOException {
Map<String, List<String>> fields = new HashMap<>();
fields.put("ip", List.of(ip));
@@ -275,13 +275,13 @@ public class WarcRecorder implements AutoCloseable {
fields.put("domain", List.of(domain.toString()));
switch (result) {
case HttpFetcherImpl.ProbeResultRedirect redirectDomain:
case HttpFetcherImpl.DomainProbeResult.Redirect redirectDomain:
fields.put("X-WARC-Probe-Status", List.of("REDIRECT;" + redirectDomain.domain()));
break;
case HttpFetcherImpl.ProbeResultError error:
case HttpFetcherImpl.DomainProbeResult.Error error:
fields.put("X-WARC-Probe-Status", List.of(error.status().toString() + ";" + error.desc()));
break;
case HttpFetcherImpl.ProbeResultOk ok:
case HttpFetcherImpl.DomainProbeResult.Ok ok:
fields.put("X-WARC-Probe-Status", List.of("OK"));
break;
}

ContentTypeProber.java (deleted)

@@ -1,86 +0,0 @@
package nu.marginalia.crawl.logic;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.body.ContentTypeLogic;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InterruptedIOException;
import java.util.Objects;
public class ContentTypeProber {
private static final Logger logger = LoggerFactory.getLogger(ContentTypeProber.class);
private final String userAgentString;
private final OkHttpClient client;
private final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
public ContentTypeProber(String userAgentString, OkHttpClient httpClient) {
this.userAgentString = userAgentString;
this.client = httpClient;
}
/** Probe the content type of the given URL with a HEAD request.
* This is used to detect binary files, which we don't want to crawl.
* <p>
* If the URL redirects, the final URL is returned, to avoid redundant
* requests.
*
* @param url The URL to probe
* @return A ContentTypeProbeResult
*/
public ContentTypeProbeResult probeContentType(EdgeUrl url) {
logger.debug("Probing suspected binary {}", url);
var headBuilder = new Request.Builder().head()
.addHeader("User-agent", userAgentString)
.addHeader("Accept-Encoding", "gzip")
.url(url.toString());
var head = headBuilder.build();
var call = client.newCall(head);
try (var rsp = call.execute()) {
var contentTypeHeader = rsp.header("Content-type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return new ContentTypeProbeResult.BadContentType(contentTypeHeader, rsp.code());
}
// Update the URL to the final URL of the HEAD request, otherwise we might end up doing
// HEAD 301 url1 -> url2
// HEAD 200 url2
// GET 301 url1 -> url2
// GET 200 url2
// which is not what we want. Overall we want to do as few requests as possible to not raise
// too many eyebrows when looking at the logs on the target server. Overall it's probably desirable
// that it looks like the traffic makes sense, as opposed to looking like a broken bot.
var redirectUrl = new EdgeUrl(rsp.request().url().toString());
EdgeUrl ret;
if (Objects.equals(redirectUrl.domain, url.domain)) ret = redirectUrl;
else ret = url;
return new ContentTypeProbeResult.Ok(ret);
} catch (InterruptedIOException ex) {
return new ContentTypeProbeResult.Timeout(ex);
} catch (Exception ex) {
logger.error("Error during fetching {}[{}]", ex.getClass().getSimpleName(), ex.getMessage());
return new ContentTypeProbeResult.Exception(ex);
}
}
public sealed interface ContentTypeProbeResult {
record Ok(EdgeUrl resolvedUrl) implements ContentTypeProbeResult { }
record BadContentType(String contentType, int statusCode) implements ContentTypeProbeResult { }
record Timeout(java.lang.Exception ex) implements ContentTypeProbeResult { }
record Exception(java.lang.Exception ex) implements ContentTypeProbeResult { }
}
}

SoftIfModifiedSinceProber.java (deleted)

@@ -1,54 +0,0 @@
package nu.marginalia.crawl.logic;
import com.google.common.base.Strings;
import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Objects;
public class SoftIfModifiedSinceProber {
private final String userAgentString;
private final OkHttpClient client;
public SoftIfModifiedSinceProber(String userAgentString, OkHttpClient httpClient) {
this.userAgentString = userAgentString;
this.client = httpClient;
}
/** Implement a soft probe of the last modified time of the given URL with a HEAD request.
* This is used to detect if the URL has been modified since the last time we crawled it.
*/
public boolean probeModificationTime(EdgeUrl url, ContentTags tags) throws IOException {
var headBuilder = new Request.Builder().head()
.addHeader("User-agent", userAgentString)
.addHeader("Accept-Encoding", "gzip")
.url(url.toString());
// This logic is only applicable if we only have a last-modified time, but no ETag.
if (Strings.isNullOrEmpty(tags.lastMod()))
return false;
if (!Strings.isNullOrEmpty(tags.etag()))
return false;
var head = headBuilder.build();
var call = client.newCall(head);
try (var rsp = call.execute()) {
if (rsp.code() != 200) {
return false;
}
var contentTypeHeader = rsp.header("Last-Modified");
return Objects.equals(contentTypeHeader, tags.lastMod());
}
catch (SocketTimeoutException e) { // suppress timeout exceptions to reduce log noise
return false;
}
}
}

CrawlerRetreiver.java

@@ -89,7 +89,16 @@ public class CrawlerRetreiver implements AutoCloseable {
public int crawlDomain(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
try {
return crawlDomain(oldCrawlData, domainLinks);
// Do an initial domain probe to determine the root URL
EdgeUrl rootUrl;
if (probeRootUrl() instanceof HttpFetcher.DomainProbeResult.Ok ok) rootUrl = ok.probedUrl();
else return 1;
// Sleep after the initial probe; we don't have access to robots.txt yet,
// so we don't know the crawl delay
TimeUnit.SECONDS.sleep(1);
return crawlDomain(oldCrawlData, rootUrl, domainLinks);
}
catch (Exception ex) {
logger.error("Error crawling domain {}", domain, ex);
@@ -97,16 +106,9 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
private int crawlDomain(CrawlDataReference oldCrawlData, DomainLinks domainLinks) throws IOException, InterruptedException {
String ip = findIp(domain);
EdgeUrl rootUrl;
if (probeRootUrl(ip) instanceof HttpFetcherImpl.ProbeResultOk ok) rootUrl = ok.probedUrl();
else return 1;
// Sleep after the initial probe, we don't have access to the robots.txt yet
// so we don't know the crawl delay
TimeUnit.SECONDS.sleep(1);
private int crawlDomain(CrawlDataReference oldCrawlData,
EdgeUrl rootUrl,
DomainLinks domainLinks) throws InterruptedException {
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(rootUrl.domain, warcRecorder);
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
@@ -180,15 +182,23 @@ public class CrawlerRetreiver implements AutoCloseable {
resync.run(warcFile);
}
private HttpFetcherImpl.ProbeResult probeRootUrl(String ip) throws IOException {
private HttpFetcher.DomainProbeResult probeRootUrl() throws IOException {
// Construct a URL to the root of the domain; we don't know the scheme yet, so we
// start with https and fall back to http if that fails (the fallback lives in DomainProber)
var httpUrl = new EdgeUrl("http", new EdgeDomain(domain), null, "/", null);
final HttpFetcherImpl.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, httpUrl);
var httpUrl = new EdgeUrl("https", new EdgeDomain(domain), null, "/", null);
final HttpFetcher.DomainProbeResult domainProbeResult = domainProber.probeDomain(fetcher, domain, httpUrl);
warcRecorder.writeWarcinfoHeader(ip, new EdgeDomain(domain), probeResult);
String ip;
try {
ip = InetAddress.getByName(domain).getHostAddress();
} catch (UnknownHostException e) {
ip = "";
}
return probeResult;
// Write the domain probe result to the WARC file
warcRecorder.writeWarcinfoHeader(ip, new EdgeDomain(domain), domainProbeResult);
return domainProbeResult;
}
private void sniffRootDocument(EdgeUrl rootUrl, CrawlDelayTimer timer) {
@@ -308,6 +318,40 @@ public class CrawlerRetreiver implements AutoCloseable {
CrawlDelayTimer timer,
HttpFetcher.ProbeType probeType,
ContentTags contentTags) throws InterruptedException {
long probeStart = System.currentTimeMillis();
if (probeType == HttpFetcher.ProbeType.FULL) {
for (int i = 0; i <= HTTP_429_RETRY_LIMIT; i++) {
try {
var probeResult = fetcher.probeContentType(url, warcRecorder, contentTags);
if (probeResult instanceof HttpFetcher.ContentTypeProbeResult.Ok ok) {
url = ok.resolvedUrl(); // If we were redirected while probing, use the final URL for fetching
break;
} else if (probeResult instanceof HttpFetcher.ContentTypeProbeResult.BadContentType badContentType) {
return new HttpFetchResult.ResultNone();
} else if (probeResult instanceof HttpFetcher.ContentTypeProbeResult.Timeout timeout) {
return new HttpFetchResult.ResultException(timeout.ex());
} else if (probeResult instanceof HttpFetcher.ContentTypeProbeResult.Exception exception) {
return new HttpFetchResult.ResultException(exception.ex());
}
else { // should be unreachable
throw new IllegalStateException("Unknown probe result");
}
}
catch (HttpFetcherImpl.RateLimitException ex) {
timer.waitRetryDelay(ex);
}
catch (Exception ex) {
logger.warn("Failed to fetch {}", url, ex);
return new HttpFetchResult.ResultException(ex);
}
}
}
timer.waitFetchDelay(System.currentTimeMillis() - probeStart);
for (int i = 0; i <= HTTP_429_RETRY_LIMIT; i++) {
try {
return fetcher.fetchContent(url, warcRecorder, contentTags, probeType);
@@ -329,14 +373,6 @@ public class CrawlerRetreiver implements AutoCloseable {
|| proto.equalsIgnoreCase("https");
}
private String findIp(String domain) {
try {
return InetAddress.getByName(domain).getHostAddress();
} catch (UnknownHostException e) {
return "";
}
}
@Override
public void close() throws Exception {
warcRecorder.close();

DomainProber.java

@@ -3,7 +3,6 @@ package nu.marginalia.crawl.retreival;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.ip_blocklist.IpBlockList;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
@@ -34,18 +33,37 @@ public class DomainProber {
* doesn't immediately redirect to another domain (which should be crawled separately, not under the name
* of this domain).
*/
public HttpFetcherImpl.ProbeResult probeDomain(HttpFetcher fetcher, String domain, @Nullable EdgeUrl firstUrlInQueue) {
public HttpFetcher.DomainProbeResult probeDomain(HttpFetcher fetcher, String domain, @Nullable EdgeUrl firstUrlInQueue) {
if (firstUrlInQueue == null) {
logger.warn("No valid URLs for domain {}", domain);
return new HttpFetcherImpl.ProbeResultError(CrawlerDomainStatus.ERROR, "No known URLs");
return new HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus.ERROR, "No known URLs");
}
if (!domainBlacklist.test(firstUrlInQueue.domain))
return new HttpFetcherImpl.ProbeResultError(CrawlerDomainStatus.BLOCKED, "IP not allowed");
return new HttpFetcher.DomainProbeResult.Error(CrawlerDomainStatus.BLOCKED, "IP not allowed");
return fetcher.probeDomain(firstUrlInQueue.withPathAndParam("/", null));
HttpFetcher.DomainProbeResult result;
result = fetcher.probeDomain(firstUrlInQueue.withPathAndParam("/", null));
// If the domain is not reachable over HTTPS, try HTTP
if (result instanceof HttpFetcher.DomainProbeResult.Error) {
if ("https".equalsIgnoreCase(firstUrlInQueue.proto)) {
firstUrlInQueue = new EdgeUrl(
"http",
firstUrlInQueue.domain,
firstUrlInQueue.port,
firstUrlInQueue.path,
firstUrlInQueue.param
);
result = fetcher.probeDomain(firstUrlInQueue.withPathAndParam("/", null));
}
}
return result;
}
}
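
A hedged sketch of what the new fallback means at a call site; the prober and fetcher wiring is omitted, and example.com stands in for a real domain:

    import nu.marginalia.crawl.fetcher.HttpFetcher;
    import nu.marginalia.model.EdgeUrl;

    // For a domain that only answers over plain http, probeDomain now tries
    // https://example.com/ first and, on DomainProbeResult.Error, retries http://example.com/
    EdgeUrl firstUrl = EdgeUrl.parse("https://example.com/some/page").orElseThrow();
    HttpFetcher.DomainProbeResult result = domainProber.probeDomain(fetcher, "example.com", firstUrl);

    if (result instanceof HttpFetcher.DomainProbeResult.Ok ok) {
        System.out.println("Crawling from " + ok.probedUrl());
    }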

ContentTypeProberTest.java (nu.marginalia.crawl.logic, deleted)

@@ -1,124 +0,0 @@
package nu.marginalia.crawl.logic;
import com.sun.net.httpserver.HttpServer;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
class ContentTypeProberTest {
private static int port;
private static HttpServer server;
private static OkHttpClient client;
static EdgeUrl htmlEndpoint;
static EdgeUrl htmlRedirEndpoint;
static EdgeUrl binaryEndpoint;
static EdgeUrl timeoutEndpoint;
@BeforeEach
void setUp() throws IOException {
Random r = new Random();
port = r.nextInt(10000) + 8000;
server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 10);
server.createContext("/html", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, -1);
exchange.close();
});
server.createContext("/redir", exchange -> {
exchange.getResponseHeaders().add("Location", "/html");
exchange.sendResponseHeaders(301, -1);
exchange.close();
});
server.createContext("/bin", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "application/binary");
exchange.sendResponseHeaders(200, -1);
exchange.close();
});
server.createContext("/timeout", exchange -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
exchange.getResponseHeaders().add("Content-Type", "application/binary");
exchange.sendResponseHeaders(200, -1);
exchange.close();
});
server.start();
htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html").get();
binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin").get();
timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout").get();
htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir").get();
client = new OkHttpClient.Builder()
.readTimeout(1, java.util.concurrent.TimeUnit.SECONDS)
.connectTimeout(1, java.util.concurrent.TimeUnit.SECONDS)
.callTimeout(1, java.util.concurrent.TimeUnit.SECONDS)
.writeTimeout(1, java.util.concurrent.TimeUnit.SECONDS)
.build();
}
@AfterEach
void tearDown() {
server.stop(0);
client.dispatcher().executorService().shutdown();
client.connectionPool().evictAll();
}
@Test
void probeContentTypeOk() {
ContentTypeProber.ContentTypeProbeResult result = new ContentTypeProber("test", client)
.probeContentType(htmlEndpoint);
System.out.println(result);
assertEquals(result, new ContentTypeProber.ContentTypeProbeResult.Ok(htmlEndpoint));
}
@Test
void probeContentTypeRedir() {
ContentTypeProber.ContentTypeProbeResult result = new ContentTypeProber("test", client)
.probeContentType(htmlRedirEndpoint);
System.out.println(result);
assertEquals(result, new ContentTypeProber.ContentTypeProbeResult.Ok(htmlEndpoint));
}
@Test
void probeContentTypeBad() {
ContentTypeProber.ContentTypeProbeResult result = new ContentTypeProber("test", client)
.probeContentType(binaryEndpoint);
System.out.println(result);
assertInstanceOf(ContentTypeProber.ContentTypeProbeResult.BadContentType.class, result);
}
@Test
void probeContentTypeTimeout() {
ContentTypeProber.ContentTypeProbeResult result = new ContentTypeProber("test", client)
.probeContentType(timeoutEndpoint);
System.out.println(result);
assertInstanceOf(ContentTypeProber.ContentTypeProbeResult.Timeout.class, result);
}
}

ContentTypeProberTest.java (nu.marginalia.crawl.retreival.fetcher)

@@ -1,60 +1,129 @@
package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.logic.ContentTypeProber;
import nu.marginalia.crawl.logic.ContentTypeProber.ContentTypeProbeResult.BadContentType;
import nu.marginalia.crawl.logic.ContentTypeProber.ContentTypeProbeResult.Ok;
import com.sun.net.httpserver.HttpServer;
import nu.marginalia.crawl.fetcher.ContentTags;
import nu.marginalia.crawl.fetcher.HttpFetcher;
import nu.marginalia.crawl.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeUrl;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.URISyntaxException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
class ContentTypeProberTest {
ContentTypeProber prober;
private static int port;
private static HttpServer server;
private static HttpFetcherImpl fetcher;
static EdgeUrl htmlEndpoint;
static EdgeUrl htmlRedirEndpoint;
static EdgeUrl binaryEndpoint;
static EdgeUrl timeoutEndpoint;
static Path warcFile;
static WarcRecorder recorder;
@BeforeEach
void setUp() {
OkHttpClient client = new OkHttpClient.Builder()
.dispatcher(new Dispatcher(Executors.newVirtualThreadPerTaskExecutor()))
.connectionPool(new ConnectionPool(0, 1, TimeUnit.NANOSECONDS))
.build();
void setUp() throws IOException {
prober = new ContentTypeProber("test.marginalia.nu", client);
warcFile = Files.createTempFile("test", ".warc");
Random r = new Random();
port = r.nextInt(10000) + 8000;
server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 10);
server.createContext("/html", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/html");
exchange.sendResponseHeaders(200, -1);
exchange.close();
});
server.createContext("/redir", exchange -> {
exchange.getResponseHeaders().add("Location", "/html");
exchange.sendResponseHeaders(301, -1);
exchange.close();
});
server.createContext("/bin", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "application/binary");
exchange.sendResponseHeaders(200, -1);
exchange.close();
});
server.createContext("/timeout", exchange -> {
try {
Thread.sleep(15_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
exchange.getResponseHeaders().add("Content-Type", "application/binary");
exchange.sendResponseHeaders(200, -1);
exchange.close();
});
server.start();
htmlEndpoint = EdgeUrl.parse("http://localhost:" + port + "/html").get();
binaryEndpoint = EdgeUrl.parse("http://localhost:" + port + "/bin").get();
timeoutEndpoint = EdgeUrl.parse("http://localhost:" + port + "/timeout").get();
htmlRedirEndpoint = EdgeUrl.parse("http://localhost:" + port + "/redir").get();
fetcher = new HttpFetcherImpl("test");
recorder = new WarcRecorder(warcFile);
}
@AfterEach
void tearDown() throws IOException {
server.stop(0);
fetcher.close();
recorder.close();
Files.deleteIfExists(warcFile);
}
@Test
void probeContentType() throws URISyntaxException {
assertEquals(
new Ok(new EdgeUrl("https://www.marginalia.nu/robots.txt")),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/robots.txt")),
"robots.txt is expected to pass the probing test since it's text/plain"
);
void probeContentTypeOk() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(htmlEndpoint, recorder, ContentTags.empty());
assertEquals(
new BadContentType("image/png", 200),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/sanic.png")),
"sanic.png is expected to pass the probing test since it's image/png"
);
System.out.println(result);
assertEquals(
new Ok(new EdgeUrl("https://www.marginalia.nu/dev/null")),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/dev/null")),
"Despite being a 404, we expect this to be passed as OK as it's NotMyJob(TM) to verify response codes"
);
assertEquals(result, new HttpFetcher.ContentTypeProbeResult.Ok(htmlEndpoint));
}
assertEquals(
new Ok(new EdgeUrl("https://www.marginalia.nu/projects/edge/about.gmi/")),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/projects/edge/about.gmi")),
"about.gmi is expected to give a redirect to about.gmi/ which is served as text/html"
);
@Test
void probeContentTypeRedir() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(htmlRedirEndpoint, recorder, ContentTags.empty());
System.out.println(result);
assertEquals(result, new HttpFetcher.ContentTypeProbeResult.Ok(htmlEndpoint));
}
@Test
void probeContentTypeBad() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(binaryEndpoint, recorder, ContentTags.empty());
System.out.println(result);
assertInstanceOf(HttpFetcher.ContentTypeProbeResult.BadContentType.class, result);
}
@Test
void probeContentTypeTimeout() throws Exception {
HttpFetcher.ContentTypeProbeResult result = fetcher.probeContentType(timeoutEndpoint, recorder, ContentTags.empty());
System.out.println(result);
assertInstanceOf(HttpFetcher.ContentTypeProbeResult.Timeout.class, result);
}
}

CrawlerMockFetcherTest.java

@@ -118,9 +118,15 @@ public class CrawlerMockFetcherTest {
public void clearCookies() {}
@Override
public HttpFetcherImpl.ProbeResult probeDomain(EdgeUrl url) {
public HttpFetcherImpl.DomainProbeResult probeDomain(EdgeUrl url) {
logger.info("Probing {}", url);
return new HttpFetcherImpl.ProbeResultOk(url);
return new HttpFetcher.DomainProbeResult.Ok(url);
}
@Override
public ContentTypeProbeResult probeContentType(EdgeUrl url, WarcRecorder recorder, ContentTags tags) {
logger.info("Probing {}", url);
return new HttpFetcher.ContentTypeProbeResult.Ok(url);
}
@SneakyThrows

IntegrationTest.java

@@ -120,7 +120,7 @@ public class IntegrationTest {
/** CREATE WARC */
try (WarcRecorder warcRecorder = new WarcRecorder(warcData)) {
warcRecorder.writeWarcinfoHeader("127.0.0.1", new EdgeDomain("www.example.com"),
new HttpFetcherImpl.ProbeResultOk(new EdgeUrl("https://www.example.com/")));
new HttpFetcherImpl.DomainProbeResult.Ok(new EdgeUrl("https://www.example.com/")));
warcRecorder.writeReferenceCopy(new EdgeUrl("https://www.example.com/"),
"text/html", 200,