(processes) Remove forEach-constructs in favor of iterators.

Viktor Lofgren 2023-07-12 17:47:36 +02:00
parent 7087ab5f07
commit 74caf9e38a
9 changed files with 161 additions and 143 deletions
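The change replaces push-style traversal (methods that take a Consumer and invoke it once per element) with pull-style Iterable implementations, so call sites can use ordinary for-each loops, break out early, and keep counters in plain locals instead of AtomicInteger. A minimal sketch of the two shapes, using hypothetical names (Entry, readAll, entries) that are not part of this commit:

import java.util.List;
import java.util.function.Consumer;

class TraversalShapes {
    record Entry(String id) {}

    static final List<Entry> ENTRIES = List.of(new Entry("a"), new Entry("b"));

    // Before: the callee drives the loop, so the caller cannot break out
    // early and has to smuggle mutable state into the lambda.
    static void readAll(Consumer<Entry> consumer) {
        for (var entry : ENTRIES) {
            consumer.accept(entry);
        }
    }

    // After: the caller drives the loop.
    static Iterable<Entry> entries() {
        return ENTRIES;
    }

    public static void main(String[] args) {
        int count = 0;                // a plain local, no AtomicInteger needed
        for (var entry : entries()) {
            count++;
            if (count > 1) break;     // early exit is now possible
        }
        System.out.println(count);
    }
}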

View File: WorkLoadIterable.java

@@ -0,0 +1,52 @@
+package nu.marginalia.process.log;
+
+import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.function.Function;
+
+class WorkLoadIterable<T> implements Iterable<T> {
+
+    private final Path logFile;
+    private final Function<WorkLogEntry, Optional<T>> mapper;
+
+    WorkLoadIterable(Path logFile, Function<WorkLogEntry, Optional<T>> mapper) {
+        this.logFile = logFile;
+        this.mapper = mapper;
+    }
+
+    @NotNull
+    @Override
+    @SneakyThrows
+    public Iterator<T> iterator() {
+        var stream = Files.lines(logFile);
+        return new Iterator<>() {
+            final Iterator<T> iter = stream
+                    .filter(WorkLogEntry::isJobId)
+                    .map(WorkLogEntry::parse)
+                    .map(mapper)
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .iterator();
+
+            @Override
+            public boolean hasNext() {
+                if (iter.hasNext()) {
+                    return true;
+                } else {
+                    stream.close();
+                    return false;
+                }
+            }
+
+            @Override
+            public T next() {
+                return iter.next();
+            }
+        };
+    }
+}
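WorkLoadIterable closes the underlying Files.lines stream the first time hasNext() returns false, which is why the callers below warn that abandoning iteration midway leaks a file descriptor. The same close-on-exhaustion idiom in a self-contained sketch (a hypothetical class, not part of this commit):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;

// Sketch: iterate the lines of a file, closing the stream once the
// iterator is exhausted. Stopping early leaks the file descriptor,
// the same trade-off WorkLoadIterable makes.
class SelfClosingLines implements Iterable<String> {
    private final Path file;

    SelfClosingLines(Path file) { this.file = file; }

    @Override
    public Iterator<String> iterator() {
        try {
            var stream = Files.lines(file);
            var iter = stream.iterator();
            return new Iterator<>() {
                @Override
                public boolean hasNext() {
                    if (iter.hasNext()) return true;
                    stream.close();   // release the descriptor on exhaustion
                    return false;
                }

                @Override
                public String next() { return iter.next(); }
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e); // stand-in for @SneakyThrows
        }
    }
}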

View File: WorkLog.java

@@ -1,20 +1,14 @@
package nu.marginalia.process.log;

-import com.google.errorprone.annotations.MustBeClosed;
-import org.apache.logging.log4j.util.Strings;
-
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.Consumer;
+import java.util.*;
+import java.util.function.Function;
import java.util.regex.Pattern;
-import java.util.stream.Stream;

public class WorkLog implements AutoCloseable {
    private final Set<String> finishedJobs = new HashSet<>();
@@ -27,24 +21,22 @@ public class WorkLog implements AutoCloseable {
        writeLogEntry("# Starting WorkLog @ " + LocalDateTime.now());
    }

-    public static void readLog(Path logFile, Consumer<WorkLogEntry> entryConsumer) throws FileNotFoundException {
-        if (!Files.exists(logFile)) {
-            throw new FileNotFoundException("Log file not found " + logFile);
-        }
-
-        try (var entries = streamLog(logFile)) {
-            entries.forEach(entryConsumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
+    /** Create an iterable over the work log
+     * <br>
+     * <b>Caveat: </b> If the iterator is not iterated to the end,
+     * it will leak a file descriptor.
+     */
+    public static Iterable<WorkLogEntry> iterable(Path logFile) {
+        return new WorkLoadIterable<>(logFile, Optional::of);
+    }

-    @MustBeClosed
-    public static Stream<WorkLogEntry> streamLog(Path logFile) throws IOException {
-        return Files.lines(logFile).filter(WorkLog::isJobId).map(line -> {
-            String[] parts = line.split("\\s+");
-            return new WorkLogEntry(parts[0], parts[1], parts[2], Integer.parseInt(parts[3]));
-        });
-    }
+    /** Create an iterable over the work log, applying a mapping function to each item
+     * <br>
+     * <b>Caveat: </b> If the iterator is not iterated to the end,
+     * it will leak a file descriptor.
+     */
+    public static <T> Iterable<T> iterableMap(Path logFile, Function<WorkLogEntry, Optional<T>> mapper) {
+        return new WorkLoadIterable<>(logFile, mapper);
+    }

    private void loadLog(Path logFile) throws IOException {
@@ -53,14 +45,12 @@ public class WorkLog implements AutoCloseable {
        }

        try (var lines = Files.lines(logFile)) {
-            lines.filter(WorkLog::isJobId).map(this::getJobIdFromWrittenString).forEach(finishedJobs::add);
+            lines.filter(WorkLogEntry::isJobId)
+                    .map(this::getJobIdFromWrittenString)
+                    .forEach(finishedJobs::add);
        }
    }

-    private static boolean isJobId(String s) {
-        return Strings.isNotBlank(s) && !s.startsWith("#");
-    }
-
    private static final Pattern splitPattern = Pattern.compile("\\s+");

    private String getJobIdFromWrittenString(String s) {
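A usage sketch for the two new entry points, assuming a log file in the whitespace-separated format WorkLogEntry.parse expects; the file name is made up and the import of nu.marginalia.process.log.WorkLog is implied:

import java.nio.file.Path;
import java.util.Optional;

class WorkLogUsageSketch {
    public static void main(String[] args) {
        Path logFile = Path.of("crawler.log");   // hypothetical path

        // Plain iteration over every parsed entry:
        for (var entry : WorkLog.iterable(logFile)) {
            System.out.println(entry.id() + " -> " + entry.path());
        }

        // Mapped iteration; entries mapped to Optional.empty() are skipped:
        for (var path : WorkLog.iterableMap(logFile, e -> Optional.of(e.path()))) {
            System.out.println(path);
        }
    }
}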

View File: WorkLogEntry.java

@@ -1,4 +1,15 @@
package nu.marginalia.process.log;

+import org.apache.logging.log4j.util.Strings;
+
public record WorkLogEntry(String id, String ts, String path, int cnt) {
+
+    static WorkLogEntry parse(String line) {
+        String[] parts = line.split("\\s+");
+        return new WorkLogEntry(parts[0], parts[1], parts[2], Integer.parseInt(parts[3]));
+    }
+
+    static boolean isJobId(String line) {
+        return Strings.isNotBlank(line) && !line.startsWith("#");
+    }
}
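For illustration, parse splits a line on whitespace into (id, timestamp, path, count). Both helpers are package-private, so a sketch like this would have to live in nu.marginalia.process.log; the log line itself is made up:

// Hypothetical entry line; real contents depend on what the crawler writes.
var entry = WorkLogEntry.parse("job-1 2023-07-12T17:47:36 crawl/job-1.zstd 42");
assert entry.id().equals("job-1");
assert entry.path().equals("crawl/job-1.zstd");
assert entry.cnt() == 42;

// Header lines are rejected before parse is ever called:
assert !WorkLogEntry.isJobId("# Starting WorkLog @ 2023-07-12T17:47:36");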

View File: CrawlerSpecificationLoader.java

@@ -3,26 +3,38 @@ package nu.marginalia.crawling.model.spec;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import com.google.gson.JsonStreamParser;
+import lombok.SneakyThrows;
import nu.marginalia.model.gson.GsonFactory;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Path;
-import java.util.function.Consumer;
+import java.util.Iterator;

public class CrawlerSpecificationLoader {
    private final static Gson gson = GsonFactory.get();

-    public static void readInputSpec(Path inputSpec, Consumer<CrawlingSpecification> consumer) {
-        try (var inputStream = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(inputSpec.toFile()))))) {
-            var parser = new JsonStreamParser(inputStream);
-
-            while (parser.hasNext()) {
-                consumer.accept(gson.fromJson(parser.next(), CrawlingSpecification.class));
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+    @SneakyThrows
+    public static Iterable<CrawlingSpecification> asIterable(Path inputSpec) {
+        var inputStream = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(inputSpec.toFile()))));
+        var parser = new JsonStreamParser(inputStream);
+
+        return () -> new Iterator<>() {
+            @Override
+            @SneakyThrows
+            public boolean hasNext() {
+                if (!parser.hasNext()) {
+                    inputStream.close();
+                    return false;
+                }
+                return true;
+            }
+
+            @Override
+            public CrawlingSpecification next() {
+                return gson.fromJson(parser.next(), CrawlingSpecification.class);
+            }
+        };
    }
}
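Usage is a plain for-each, sketched below with a made-up file name and assuming the public domain field of CrawlingSpecification that is referenced elsewhere in this commit. Note the returned Iterable is effectively single-use: every iterator it hands out shares the same parser and input stream, and the file-descriptor caveat from WorkLoadIterable applies here as well.

var inputSpec = Path.of("crawl-spec.jsonl.zst");   // hypothetical file

for (var spec : CrawlerSpecificationLoader.asIterable(inputSpec)) {
    System.out.println(spec.domain);
}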

View File: CrawlPlan.java

@@ -78,100 +78,48 @@ public class CrawlPlan {
        return new WorkLog(process.getLogFile());
    }

-    public void forEachCrawlingSpecification(Consumer<CrawlingSpecification> consumer) {
-        CrawlerSpecificationLoader.readInputSpec(getJobSpec(), consumer);
-    }
-
-    public void forEachCrawlingLogEntry(Consumer<WorkLogEntry> consumer) throws FileNotFoundException {
-        WorkLog.readLog(this.crawl.getLogFile(), consumer);
-    }
-
-    public void forEachProcessingLogEntry(Consumer<WorkLogEntry> consumer) throws FileNotFoundException {
-        WorkLog.readLog(this.process.getLogFile(), consumer);
-    }
-
-    public void forEachCrawledDomain(Consumer<CrawledDomain> consumer) {
-        final CrawledDomainReader reader = new CrawledDomainReader();
-
-        try (Stream<WorkLogEntry> entryStream = WorkLog.streamLog(crawl.getLogFile())) {
-            entryStream
-                    .map(WorkLogEntry::path)
-                    .map(this::getCrawledFilePath)
-                    .map(reader::readOptionally)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-                    .forEach(consumer);
-        }
-        catch (IOException ex) {
-            logger.warn("Failed to read domains", ex);
-            throw new RuntimeException(ex);
-        }
-    }
+    public Iterable<CrawlingSpecification> crawlingSpecificationIterable() {
+        return CrawlerSpecificationLoader.asIterable(getJobSpec());
+    }

    public int countCrawledDomains() {
-        try (Stream<WorkLogEntry> entryStream = WorkLog.streamLog(crawl.getLogFile())) {
-            return (int) entryStream
-                    .map(WorkLogEntry::path)
-                    .count();
-        }
-        catch (IOException ex) {
-            return 0;
+        int count = 0;
+        for (var ignored : WorkLog.iterable(crawl.getLogFile())) {
+            count++;
        }
+        return count;
    }

-    public void forEachCrawledDomain(Predicate<String> idReadPredicate, Consumer<CrawledDomain> consumer) {
+    public Iterable<CrawledDomain> domainsIterable() {
        final CrawledDomainReader reader = new CrawledDomainReader();

-        try (Stream<WorkLogEntry> entryStream = WorkLog.streamLog(crawl.getLogFile())) {
-            entryStream
-                    .filter(entry -> idReadPredicate.test(entry.id()))
-                    .map(WorkLogEntry::path)
-                    .map(this::getCrawledFilePath)
-                    .filter(path -> {
-                        if (!Files.exists(path)) {
-                            logger.warn("File not found: {}", path);
-                            return false;
-                        }
-                        return true;
-                    })
-                    .map(reader::readOptionally)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-                    .forEach(consumer);
-        }
-        catch (IOException ex) {
-            logger.error("Failed to read domains", ex);
-            throw new RuntimeException(ex);
-        }
-    }
-
-    public DomainsIterable domainsIterable() throws IOException {
-        return new DomainsIterable();
+        return WorkLog.iterableMap(crawl.getLogFile(),
+                entry -> {
+                    var path = getCrawledFilePath(entry.path());
+
+                    if (!Files.exists(path)) {
+                        logger.warn("File not found: {}", path);
+                        return Optional.empty();
+                    }
+
+                    return reader.readOptionally(path);
+                });
    }

-    public class DomainsIterable implements Iterable<CrawledDomain>, AutoCloseable {
-        private final Stream<CrawledDomain> stream;
-
-        DomainsIterable() throws IOException {
-            final CrawledDomainReader reader = new CrawledDomainReader();
-
-            stream = WorkLog.streamLog(crawl.getLogFile())
-                    .map(WorkLogEntry::path)
-                    .map(CrawlPlan.this::getCrawledFilePath)
-                    .map(reader::readOptionally)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get);
-        }
-
-        @Override
-        public void close() {
-            stream.close();
-        }
-
-        @NotNull
-        @Override
-        public Iterator<CrawledDomain> iterator() {
-            return stream.iterator();
-        }
+    public Iterable<CrawledDomain> domainsIterable(Predicate<String> idPredicate) {
+        final CrawledDomainReader reader = new CrawledDomainReader();
+
+        return WorkLog.iterableMap(crawl.getLogFile(),
+                entry -> {
+                    if (!idPredicate.test(entry.id())) {
+                        return Optional.empty();
+                    }
+
+                    var path = getCrawledFilePath(entry.path());
+
+                    if (!Files.exists(path)) {
+                        logger.warn("File not found: {}", path);
+                        return Optional.empty();
+                    }
+
+                    return reader.readOptionally(path);
+                });
    }
}
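The Optional-returning mapper given to WorkLog.iterableMap folds what used to be separate filter and map stream stages into a single callback: return Optional.empty() to drop an entry, or a present Optional to keep its mapped value. The general shape as a sketch (a hypothetical helper, not part of this commit):

import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class FilterMapSketch {
    // filter(p) followed by map(f), collapsed into the one-callback shape
    // that WorkLog.iterableMap expects from its callers.
    static <T, R> Optional<R> filterMap(T value, Predicate<T> p, Function<T, R> f) {
        return p.test(value) ? Optional.of(f.apply(value)) : Optional.empty();
    }
}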

View File: ConverterMain.java

@@ -98,8 +98,9 @@ public class ConverterMain {
        };

-        plan.forEachCrawledDomain(id -> !processLog.isJobFinished(id), pipe::accept);
+        for (var domain : plan.domainsIterable(id -> !processLog.isJobFinished(id))) {
+            pipe.accept(domain);
+        }

        pipe.join();
    }

View File: CrawlerMain.java

@@ -100,18 +100,19 @@ public class CrawlerMain implements AutoCloseable {
    public void run() throws InterruptedException {

        // First a validation run to ensure the file is all good to parse
        logger.info("Validating JSON");
-        AtomicInteger countTotal = new AtomicInteger();
-        AtomicInteger countProcessed = new AtomicInteger();
+        int countTotal = 0;
+        int countProcessed = 0;

-        plan.forEachCrawlingSpecification(unused -> countTotal.incrementAndGet());
+        for (var unused : plan.crawlingSpecificationIterable()) {
+            countTotal++;
+        }

        logger.info("Let's go");

-        // TODO: Make this into an iterable instead so we can abort it
-        plan.forEachCrawlingSpecification((spec) -> {
-            heartbeat.setProgress(countProcessed.incrementAndGet() / (double) countTotal.get());
+        for (var spec : plan.crawlingSpecificationIterable()) {
+            heartbeat.setProgress(++countProcessed / (double) countTotal);
            startCrawlTask(spec);
-        });
+        }
    }
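This loop and the LoaderMain change below share a two-pass pattern: one pass over the iterable to count entries for progress reporting, then a second pass to do the work. A condensed sketch with hypothetical names:

import java.util.List;

class ProgressSketch {
    public static void main(String[] args) {
        List<String> work = List.of("a", "b", "c");   // stand-in for the iterable

        int total = 0;
        for (var ignored : work) total++;             // pass 1: count

        int processed = 0;
        for (var item : work) {                       // pass 2: process
            // cast to double before dividing, or integer division yields 0
            System.out.printf("progress %.2f: %s%n", ++processed / (double) total, item);
        }
    }
}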

View File: LoaderMain.java

@@ -107,16 +107,20 @@ public class LoaderMain {
        var logFile = plan.process.getLogFile();

        try {
-            AtomicInteger loadTotal = new AtomicInteger();
-            WorkLog.readLog(logFile, entry -> loadTotal.incrementAndGet());
-            LoaderMain.loadTotal = loadTotal.get();
+            int loadTotal = 0;
+            int loaded = 0;

-            AtomicInteger loaded = new AtomicInteger();
-            WorkLog.readLog(logFile, entry -> {
-                heartbeat.setProgress(loaded.incrementAndGet() / (double) loadTotal.get());
+            for (var unused : WorkLog.iterable(logFile)) {
+                loadTotal++;
+            }
+
+            LoaderMain.loadTotal = loadTotal;
+
+            for (var entry : WorkLog.iterable(logFile)) {
+                heartbeat.setProgress(loaded++ / (double) loadTotal);
                load(plan, entry.path(), entry.cnt());
-            });
+            }

            running = false;
            processorThread.join();

View File: ExperimentRunnerMain.java

@@ -47,14 +47,13 @@ public class ExperimentRunnerMain {
        experiment.args(Arrays.copyOfRange(args, 2, args.length));

        Map<String, String> idToDomain = new HashMap<>();
-        plan.forEachCrawlingSpecification(spec -> {
+        for (var spec : plan.crawlingSpecificationIterable()) {
            idToDomain.put(spec.id, spec.domain);
-        });
+        }

-        plan.forEachCrawledDomain(
-                id -> experiment.isInterested(idToDomain.get(id)),
-                experiment::process
-        );
+        for (var domain : plan.domainsIterable(id -> experiment.isInterested(idToDomain.get(id)))) {
+            experiment.process(domain);
+        }

        experiment.onFinish();