diff --git a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLoadIterable.java b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLoadIterable.java
new file mode 100644
index 00000000..992c1991
--- /dev/null
+++ b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLoadIterable.java
@@ -0,0 +1,52 @@
+package nu.marginalia.process.log;
+
+import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.function.Function;
+
+class WorkLoadIterable<T> implements Iterable<T> {
+
+    private final Path logFile;
+    private final Function<WorkLogEntry, Optional<T>> mapper;
+
+    WorkLoadIterable(Path logFile, Function<WorkLogEntry, Optional<T>> mapper) {
+        this.logFile = logFile;
+        this.mapper = mapper;
+    }
+
+    @NotNull
+    @Override
+    @SneakyThrows
+    public Iterator<T> iterator() {
+        var stream = Files.lines(logFile);
+        return new Iterator<>() {
+            final Iterator<T> iter = stream
+                    .filter(WorkLogEntry::isJobId)
+                    .map(WorkLogEntry::parse)
+                    .map(mapper)
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .iterator();
+
+            @Override
+            public boolean hasNext() {
+                if (iter.hasNext()) {
+                    return true;
+                } else {
+                    stream.close();
+                    return false;
+                }
+            }
+
+            @Override
+            public T next() {
+                return iter.next();
+            }
+        };
+    }
+}
diff --git a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java
index db5b22a8..c552d8f6 100644
--- a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java
+++ b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLog.java
@@ -1,20 +1,14 @@
 package nu.marginalia.process.log;
 
-import com.google.errorprone.annotations.MustBeClosed;
-import org.apache.logging.log4j.util.Strings;
-
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDateTime;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.Consumer;
+import java.util.*;
+import java.util.function.Function;
 import java.util.regex.Pattern;
-import java.util.stream.Stream;
 
 public class WorkLog implements AutoCloseable {
     private final Set<String> finishedJobs = new HashSet<>();
@@ -27,24 +21,22 @@ public class WorkLog implements AutoCloseable {
         writeLogEntry("# Starting WorkLog @ " + LocalDateTime.now());
     }
 
-    public static void readLog(Path logFile, Consumer<WorkLogEntry> entryConsumer) throws FileNotFoundException {
-        if (!Files.exists(logFile)) {
-            throw new FileNotFoundException("Log file not found " + logFile);
-        }
-
-        try (var entries = streamLog(logFile)) {
-            entries.forEach(entryConsumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+    /** Create an iterable over the work log
+     * <br>
+     * Caveat: If the iterator is not iterated to the end,
+     * it will leak a file descriptor.
+     */
+    public static Iterable<WorkLogEntry> iterable(Path logFile) {
+        return new WorkLoadIterable<>(logFile, Optional::of);
     }
 
-    @MustBeClosed
-    public static Stream<WorkLogEntry> streamLog(Path logFile) throws IOException {
-        return Files.lines(logFile).filter(WorkLog::isJobId).map(line -> {
-            String[] parts = line.split("\\s+");
-            return new WorkLogEntry(parts[0], parts[1], parts[2], Integer.parseInt(parts[3]));
-        });
+    /** Create an iterable over the work log, applying a mapping function to each item
+     * <br>
+     * Caveat: If the iterator is not iterated to the end,
+     * it will leak a file descriptor.
+     */
+    public static <T> Iterable<T> iterableMap(Path logFile, Function<WorkLogEntry, Optional<T>> mapper) {
+        return new WorkLoadIterable<>(logFile, mapper);
     }
 
     private void loadLog(Path logFile) throws IOException {
@@ -53,14 +45,12 @@
         }
 
         try (var lines = Files.lines(logFile)) {
-            lines.filter(WorkLog::isJobId).map(this::getJobIdFromWrittenString).forEach(finishedJobs::add);
+            lines.filter(WorkLogEntry::isJobId)
+                    .map(this::getJobIdFromWrittenString)
+                    .forEach(finishedJobs::add);
         }
     }
 
-    private static boolean isJobId(String s) {
-        return Strings.isNotBlank(s) && !s.startsWith("#");
-    }
-
     private static final Pattern splitPattern = Pattern.compile("\\s+");
 
     private String getJobIdFromWrittenString(String s) {
diff --git a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java
index 9f9579f3..31b93610 100644
--- a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java
+++ b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java
@@ -1,4 +1,15 @@
 package nu.marginalia.process.log;
 
+import org.apache.logging.log4j.util.Strings;
+
 public record WorkLogEntry(String id, String ts, String path, int cnt) {
+
+    static WorkLogEntry parse(String line) {
+        String[] parts = line.split("\\s+");
+        return new WorkLogEntry(parts[0], parts[1], parts[2], Integer.parseInt(parts[3]));
+    }
+
+    static boolean isJobId(String line) {
+        return Strings.isNotBlank(line) && !line.startsWith("#");
+    }
 }
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java
index cf6fb1fb..2ea956d5 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java
@@ -3,26 +3,38 @@ package nu.marginalia.crawling.model.spec;
 import com.github.luben.zstd.ZstdInputStream;
 import com.google.gson.Gson;
 import com.google.gson.JsonStreamParser;
+import lombok.SneakyThrows;
 import nu.marginalia.model.gson.GsonFactory;
 
 import java.io.BufferedReader;
 import java.io.FileInputStream;
-import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.file.Path;
-import java.util.function.Consumer;
+import java.util.Iterator;
 
 public class CrawlerSpecificationLoader {
     private final static Gson gson = GsonFactory.get();
 
-    public static void readInputSpec(Path inputSpec, Consumer<CrawlingSpecification> consumer) {
-        try (var inputStream = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(inputSpec.toFile()))))) {
-            var parser = new JsonStreamParser(inputStream);
-            while (parser.hasNext()) {
-                consumer.accept(gson.fromJson(parser.next(), CrawlingSpecification.class));
+    @SneakyThrows
+    public static Iterable<CrawlingSpecification> asIterable(Path inputSpec) {
+        var inputStream = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(inputSpec.toFile()))));
+        var parser = new JsonStreamParser(inputStream);
+
+        return () -> new Iterator<>() {
+            @Override
+            @SneakyThrows
+            public boolean hasNext() {
+                if (!parser.hasNext()) {
+                    inputStream.close();
+                    return false;
+                }
+                return true;
             }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+
+            @Override
+            public CrawlingSpecification next() {
+                return gson.fromJson(parser.next(), CrawlingSpecification.class);
+            }
+        };
     }
 }
diff --git a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java
index a23cdede..b425e29b 100644
--- a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java
+++ b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java
@@ -78,100 +78,48 @@ public class CrawlPlan {
         return new WorkLog(process.getLogFile());
     }
 
-    public void forEachCrawlingSpecification(Consumer<CrawlingSpecification> consumer) {
-        CrawlerSpecificationLoader.readInputSpec(getJobSpec(), consumer);
-    }
-
-    public void forEachCrawlingLogEntry(Consumer<WorkLogEntry> consumer) throws FileNotFoundException {
-        WorkLog.readLog(this.crawl.getLogFile(), consumer);
-    }
-    public void forEachProcessingLogEntry(Consumer<WorkLogEntry> consumer) throws FileNotFoundException {
-        WorkLog.readLog(this.process.getLogFile(), consumer);
-    }
-
-    public void forEachCrawledDomain(Consumer<CrawledDomain> consumer) {
-        final CrawledDomainReader reader = new CrawledDomainReader();
-
-        try (Stream<WorkLogEntry> entryStream = WorkLog.streamLog(crawl.getLogFile())) {
-            entryStream
-                    .map(WorkLogEntry::path)
-                    .map(this::getCrawledFilePath)
-                    .map(reader::readOptionally)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-                    .forEach(consumer);
-        }
-        catch (IOException ex) {
-            logger.warn("Failed to read domains", ex);
-
-            throw new RuntimeException(ex);
-        }
+    public Iterable<CrawlingSpecification> crawlingSpecificationIterable() {
+        return CrawlerSpecificationLoader.asIterable(getJobSpec());
     }
 
     public int countCrawledDomains() {
-        try (Stream<WorkLogEntry> entryStream = WorkLog.streamLog(crawl.getLogFile())) {
-            return (int) entryStream
-                    .map(WorkLogEntry::path)
-                    .count();
-        }
-        catch (IOException ex) {
-            return 0;
+        int count = 0;
+        for (var ignored : WorkLog.iterable(crawl.getLogFile())) {
+            count++;
        }
+        return count;
     }
 
-    public void forEachCrawledDomain(Predicate<String> idReadPredicate, Consumer<CrawledDomain> consumer) {
+    public Iterable<CrawledDomain> domainsIterable() {
         final CrawledDomainReader reader = new CrawledDomainReader();
 
-        try (Stream<WorkLogEntry> entryStream = WorkLog.streamLog(crawl.getLogFile())) {
-            entryStream
-                    .filter(entry -> idReadPredicate.test(entry.id()))
-                    .map(WorkLogEntry::path)
-                    .map(this::getCrawledFilePath)
-                    .filter(path -> {
-                        if (!Files.exists(path)) {
-                            logger.warn("File not found: {}", path);
-                            return false;
-                        }
-                        return true;
-                    })
-                    .map(reader::readOptionally)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-                    .forEach(consumer);
-        }
-        catch (IOException ex) {
-            logger.error("Failed to read domains", ex);
-
-            throw new RuntimeException(ex);
-        }
-    }
-    public DomainsIterable domainsIterable() throws IOException {
-        return new DomainsIterable();
+        return WorkLog.iterableMap(crawl.getLogFile(),
+                entry -> {
+                    var path = getCrawledFilePath(entry.path());
+                    if (!Files.exists(path)) {
+                        logger.warn("File not found: {}", path);
+                        return Optional.empty();
+                    }
+                    return reader.readOptionally(path);
+                });
     }
 
-    public class DomainsIterable implements Iterable<CrawledDomain>, AutoCloseable {
-        private final Stream<CrawledDomain> stream;
+    public Iterable<CrawledDomain> domainsIterable(Predicate<String> idPredicate) {
+        final CrawledDomainReader reader = new CrawledDomainReader();
 
-        DomainsIterable() throws IOException {
-            final CrawledDomainReader reader = new CrawledDomainReader();
+        return WorkLog.iterableMap(crawl.getLogFile(),
+                entry -> {
+                    if (!idPredicate.test(entry.id())) {
+                        return Optional.empty();
+                    }
 
-            stream = WorkLog.streamLog(crawl.getLogFile())
-                    .map(WorkLogEntry::path)
-                    .map(CrawlPlan.this::getCrawledFilePath)
-                    .map(reader::readOptionally)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get);
-        }
+                    var path = getCrawledFilePath(entry.path());
 
-        @Override
-        public void close() {
-            stream.close();
-        }
-
-        @NotNull
-        @Override
-        public Iterator<CrawledDomain> iterator() {
-            return stream.iterator();
-        }
+                    if (!Files.exists(path)) {
+                        logger.warn("File not found: {}", path);
+                        return Optional.empty();
+                    }
+                    return reader.readOptionally(path);
+                });
     }
 }
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
index 8f49c853..16381cc2 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
@@ -98,8 +98,9 @@ public class ConverterMain {
 
         };
 
-
-        plan.forEachCrawledDomain(id -> !processLog.isJobFinished(id), pipe::accept);
+        for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id))) {
+            pipe.accept(domain);
+        }
 
         pipe.join();
     }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
index 4c436ca3..a0a3f8b7 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
@@ -100,18 +100,19 @@ public class CrawlerMain implements AutoCloseable {
     public void run() throws InterruptedException {
         // First a validation run to ensure the file is all good to parse
         logger.info("Validating JSON");
-        AtomicInteger countTotal = new AtomicInteger();
-        AtomicInteger countProcessed = new AtomicInteger();
+        int countTotal = 0;
+        int countProcessed = 0;
 
-        plan.forEachCrawlingSpecification(unused -> countTotal.incrementAndGet());
+        for (var unused : plan.crawlingSpecificationIterable()) {
+            countTotal++;
+        }
 
         logger.info("Let's go");
 
-        // TODO: Make this into an iterable instead so we can abort it
-        plan.forEachCrawlingSpecification((spec) -> {
-            heartbeat.setProgress(countProcessed.incrementAndGet() / (double) countTotal.get());
+        for (var spec : plan.crawlingSpecificationIterable()) {
+            heartbeat.setProgress(++countProcessed / (double) countTotal);
             startCrawlTask(spec);
-        });
+        }
     }
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java
index c70573a6..30b84527 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java
@@ -107,16 +107,20 @@ public class LoaderMain {
         var logFile = plan.process.getLogFile();
 
         try {
-            AtomicInteger loadTotal = new AtomicInteger();
-            WorkLog.readLog(logFile, entry -> loadTotal.incrementAndGet());
-            LoaderMain.loadTotal = loadTotal.get();
+            int loadTotal = 0;
+            int loaded = 0;
 
-            AtomicInteger loaded = new AtomicInteger();
-            WorkLog.readLog(logFile, entry -> {
-                heartbeat.setProgress(loaded.incrementAndGet() / (double) loadTotal.get());
+            for (var unused : WorkLog.iterable(logFile)) {
+                loadTotal++;
+            }
+
+            LoaderMain.loadTotal = loadTotal;
+
+            for (var entry : WorkLog.iterable(logFile)) {
+                heartbeat.setProgress(loaded++ / (double) loadTotal);
                 load(plan, entry.path(), entry.cnt());
-            });
+            }
 
             running = false;
             processorThread.join();
diff --git a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java
index 09a3cc71..4febc294 100644
--- a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java
+++ b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java
@@ -47,14 +47,13 @@ public class ExperimentRunnerMain {
         experiment.args(Arrays.copyOfRange(args, 2, args.length));
 
         Map<String, String> idToDomain = new HashMap<>();
-        plan.forEachCrawlingSpecification(spec -> {
+        for (var spec : plan.crawlingSpecificationIterable()) {
             idToDomain.put(spec.id, spec.domain);
-        });
+        }
 
-        plan.forEachCrawledDomain(
-                id -> experiment.isInterested(idToDomain.get(id)),
-                experiment::process
-        );
+        for (var domain : plan.domainsIterable(id -> experiment.isInterested(idToDomain.get(id)))) {
+            experiment.process(domain);
+        }
 
         experiment.onFinish();
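
Usage sketch (not part of the patch): the snippet below shows how the new iterator-based API reads at a call site. It is a minimal sketch under stated assumptions -- the class name, the main() wrapper, and the log path argument are hypothetical, not taken from the patch. It also illustrates the caveat from the Javadoc above: the underlying Files.lines stream is only closed when hasNext() runs off the end of the log, so abandoning a loop early leaks a file descriptor.

    import nu.marginalia.process.log.WorkLog;
    import nu.marginalia.process.log.WorkLogEntry;

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Optional;

    class WorkLogIterationExample {
        public static void main(String[] args) {
            Path logFile = Path.of(args[0]); // hypothetical path to a work log

            // Counting entries consumes the iterator fully, so hasNext()
            // reaches the end of the log and closes the underlying stream.
            int total = 0;
            for (WorkLogEntry entry : WorkLog.iterable(logFile)) {
                total++;
            }
            System.out.println(total + " work log entries");

            // iterableMap() filters and maps in one pass: a mapper that
            // returns Optional.empty() drops the entry from iteration,
            // mirroring how CrawlPlan.domainsIterable() skips missing files.
            Iterable<Path> dataFiles = WorkLog.iterableMap(logFile,
                    entry -> Optional.of(Path.of(entry.path()))
                            .filter(Files::exists));

            for (Path p : dataFiles) {
                System.out.println(p);
                // A 'break' here would skip the stream.close() inside
                // hasNext(), leaking the descriptor -- the Javadoc caveat.
            }
        }
    }

The close-on-exhaustion design trades try-with-resources safety for for-each convenience; that suits these batch processes, which always drain the log to the end.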