diff --git a/code/common/service/java/nu/marginalia/process/log/WorkLog.java b/code/common/service/java/nu/marginalia/process/log/WorkLog.java
index 9be31d17..1b5c41a6 100644
--- a/code/common/service/java/nu/marginalia/process/log/WorkLog.java
+++ b/code/common/service/java/nu/marginalia/process/log/WorkLog.java
@@ -10,7 +10,9 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDateTime;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 
 /** WorkLog is a journal of work done by a process,
@@ -61,6 +63,12 @@ public class WorkLog implements AutoCloseable, Closeable {
         return new WorkLoadIterable<>(logFile, mapper);
     }
 
+    public static int countEntries(Path crawlerLog) throws IOException {
+        try (var linesStream = Files.lines(crawlerLog)) {
+            return (int) linesStream.filter(WorkLogEntry::isJobId).count();
+        }
+    }
+
     // Use synchro over concurrent set to avoid competing writes
     // - correct is better than fast here, it's sketchy enough to use
     // a PrintWriter
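Note: countEntries pre-scans the log so the migration loop in MigrateCrawlDataActor below can report progress against a known total. The same counting technique as a self-contained sketch; isJobEntry is a stand-in for WorkLogEntry::isJobId (assumed here to treat '#'-prefixed lines as comments, everything else as a job entry; the real rules live in WorkLogEntry):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class CountEntriesSketch {
    // Stand-in for WorkLogEntry::isJobId (assumed semantics: '#' marks comment lines)
    static boolean isJobEntry(String line) {
        return !line.isBlank() && !line.startsWith("#");
    }

    static int countEntries(Path log) throws IOException {
        // Files.lines streams lazily; try-with-resources closes the underlying
        // file handle, mirroring the new WorkLog.countEntries above
        try (var lines = Files.lines(log)) {
            return (int) lines.filter(CountEntriesSketch::isJobEntry).count();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(countEntries(Path.of(args[0])));
    }
}

Because the stream is lazy, the count is a single pass over the file without loading the whole log into memory.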
diff --git a/code/execution/java/nu/marginalia/actor/task/MigrateCrawlDataActor.java b/code/execution/java/nu/marginalia/actor/task/MigrateCrawlDataActor.java
index 15e6ddb2..1ec3d6fa 100644
--- a/code/execution/java/nu/marginalia/actor/task/MigrateCrawlDataActor.java
+++ b/code/execution/java/nu/marginalia/actor/task/MigrateCrawlDataActor.java
@@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorStep;
 import nu.marginalia.io.CrawlerOutputFile;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.process.log.WorkLogEntry;
+import nu.marginalia.service.control.ServiceHeartbeat;
 import nu.marginalia.slop.SlopCrawlDataRecord;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorage;
@@ -26,14 +27,15 @@ import java.util.function.Function;
 public class MigrateCrawlDataActor extends RecordActorPrototype {
 
     private final FileStorageService fileStorageService;
-
+    private final ServiceHeartbeat serviceHeartbeat;
     private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class);
 
     @Inject
-    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) {
+    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService, ServiceHeartbeat serviceHeartbeat) {
         super(gson);
         this.fileStorageService = fileStorageService;
+        this.serviceHeartbeat = serviceHeartbeat;
     }
 
     public record Run(long fileStorageId) implements ActorStep {}
 
@@ -49,28 +51,39 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {
             Path crawlerLog = root.resolve("crawler.log");
             Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
 
-            try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
+            int totalEntries = WorkLog.countEntries(crawlerLog);
+
+            try (WorkLog workLog = new WorkLog(newCrawlerLog);
+                 var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Migrating")
+            ) {
+                int entryIdx = 0;
+
                 for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
                     var entry = item.getKey();
                     var path = item.getValue();
 
-                    logger.info("Converting {}", entry.id());
+                    heartbeat.progress("Migrating " + path.toFile().getName(), entryIdx++, totalEntries);
+                    if (path.toFile().getName().endsWith(".parquet") && Files.exists(path)) {
+                        try {
+                            String domain = entry.id();
+                            String id = Integer.toHexString(domain.hashCode());
 
-                    if (path.toFile().getName().endsWith(".parquet")) {
-                        String domain = entry.id();
-                        String id = Integer.toHexString(domain.hashCode());
+                            Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
 
-                        Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
+                            SlopCrawlDataRecord.convertFromParquet(path, outputFile);
 
-                        SlopCrawlDataRecord.convertFromParquet(path, outputFile);
-
-                        workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
+                            workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
+                        }
+                        catch (Exception ex) {
+                            logger.error("Failed to convert " + path, ex);
+                        }
                     }
                     else {
                         workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
                     }
+                }
             }
 
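Note: the loop drives the heartbeat with a step label and an (index, total) pair. ServiceHeartbeat and its ad-hoc task heartbeat belong to Marginalia's service-control machinery; the sketch below is a hypothetical stand-in showing only the shape the actor relies on, assuming it needs to be AutoCloseable and accept progress(String, int, int) updates:

import java.util.List;

class AdHocTaskHeartbeatSketch implements AutoCloseable {
    private final String taskName;

    AdHocTaskHeartbeatSketch(String taskName) {
        this.taskName = taskName;
    }

    // Matches the progress(String, int, int) call shape used by the actor above;
    // the real implementation presumably reports to a database/UI, this one logs
    void progress(String step, int idx, int total) {
        int pct = total == 0 ? 100 : (100 * idx) / total;
        System.out.printf("%s: %s [%d/%d] %d%%%n", taskName, step, idx, total, pct);
    }

    @Override
    public void close() {
        System.out.println(taskName + ": finished");
    }

    public static void main(String[] args) {
        List<String> entries = List.of("a.parquet", "b.parquet", "c.parquet");
        int entryIdx = 0;
        try (var heartbeat = new AdHocTaskHeartbeatSketch("Migrating")) {
            for (String entry : entries) {
                heartbeat.progress("Migrating " + entry, entryIdx++, entries.size());
            }
        }
    }
}

Making the heartbeat a try-with-resources participant alongside the WorkLog guarantees the task is marked finished even if the loop throws.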
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java
index 4b0d5158..af5b9f5a 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java
@@ -45,6 +45,7 @@ public class HttpFetcherImpl implements HttpFetcher {
     private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
 
     private final Duration requestTimeout = Duration.ofSeconds(10);
+    private final Duration probeTimeout = Duration.ofSeconds(30);
 
     @Override
     public void setAllowAllContentTypes(boolean allowAllContentTypes) {
@@ -107,23 +108,27 @@
                 .HEAD()
                 .uri(url.asURI())
                 .header("User-agent", userAgentString)
-                .timeout(requestTimeout)
+                .timeout(probeTimeout)
                 .build();
         } catch (URISyntaxException e) {
             return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, "Invalid URL");
         }
 
-        try {
-            var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
-            EdgeUrl rspUri = new EdgeUrl(rsp.uri());
+        for (int tries = 0;; tries++) {
+            try {
+                var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
+                EdgeUrl rspUri = new EdgeUrl(rsp.uri());
 
-            if (!Objects.equals(rspUri.domain, url.domain)) {
-                return new DomainProbeResult.Redirect(rspUri.domain);
+                if (!Objects.equals(rspUri.domain, url.domain)) {
+                    return new DomainProbeResult.Redirect(rspUri.domain);
+                }
+                return new DomainProbeResult.Ok(rspUri);
+            } catch (Exception ex) {
+                if (tries > 3) {
+                    return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
+                }
+                // else try again ...
             }
-            return new DomainProbeResult.Ok(rspUri);
-        }
-        catch (Exception ex) {
-            return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
         }
     }
 
@@ -143,7 +148,7 @@
         var headBuilder = HttpRequest.newBuilder()
                 .HEAD()
                 .uri(url.asURI())
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .header("Accept-Encoding", "gzip")
                 .timeout(requestTimeout)
         ;
@@ -215,7 +220,7 @@
         var getBuilder = HttpRequest.newBuilder()
                 .GET()
                 .uri(url.asURI())
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .header("Accept-Encoding", "gzip")
                 .header("Accept-Language", "en,*;q=0.5")
                 .header("Accept", "text/html, application/xhtml+xml, text/*;q=0.8")
@@ -307,7 +312,7 @@
                 .uri(sitemapUrl.asURI())
                 .header("Accept-Encoding", "gzip")
                 .header("Accept", "text/*, */*;q=0.9")
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .timeout(requestTimeout)
                 .build();
 
@@ -386,7 +391,7 @@
                 .uri(url.asURI())
                 .header("Accept-Encoding", "gzip")
                 .header("Accept", "text/*, */*;q=0.9")
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .timeout(requestTimeout);
 
         HttpFetchResult result = recorder.fetch(client, getRequest.build());
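Note: the domain probe now retries: tries > 3 in the catch means up to five attempts before the last error is surfaced, and the probe's HEAD request gets the longer probeTimeout (30s vs the 10s requestTimeout). The remaining hunks only canonicalize the header spelling; HTTP header names are case-insensitive on the wire, so "User-agent" worked, but "User-Agent" is the conventional form. The retry control flow, extracted into a generic helper (hypothetical names, not part of this changeset):

import java.util.concurrent.Callable;

class RetrySketch {
    // Same structure as the probe loop above: retry on any exception,
    // give up and rethrow the last failure once attempts are exhausted
    static <T> T withRetries(Callable<T> action, int maxAttempts) throws Exception {
        for (int tries = 0;; tries++) {
            try {
                return action.call();
            }
            catch (Exception ex) {
                if (tries + 1 >= maxAttempts) {
                    throw ex;
                }
                // else try again ...
            }
        }
    }

    public static void main(String[] args) throws Exception {
        var rng = new java.util.Random(1);
        // Fails roughly two thirds of the time; five attempts make success likely
        String result = withRetries(() -> {
            if (rng.nextInt(3) != 0) throw new RuntimeException("transient failure");
            return "ok";
        }, 5);
        System.out.println(result);
    }
}

As in the probe, the loop retries immediately with no backoff and does not distinguish transient from permanent failures; a malformed response would be retried just like a timeout.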