From a52d78c8ee38057c87cc8284620d9693a45b5164 Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Mon, 11 Sep 2023 14:07:52 +0200
Subject: [PATCH] (work-log) New batching work log

---
 code/process-models/work-log/build.gradle     |  33 +++
 .../marginalia/worklog/BatchingWorkLog.java   |  36 +++
 .../worklog/BatchingWorkLogImpl.java          | 259 ++++++++++++++++++
 .../worklog/BatchingWorkLogImplTest.java      |  67 +++++
 settings.gradle                               |   1 +
 5 files changed, 396 insertions(+)
 create mode 100644 code/process-models/work-log/build.gradle
 create mode 100644 code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLog.java
 create mode 100644 code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLogImpl.java
 create mode 100644 code/process-models/work-log/src/test/java/nu/marginalia/worklog/BatchingWorkLogImplTest.java

diff --git a/code/process-models/work-log/build.gradle b/code/process-models/work-log/build.gradle
new file mode 100644
index 00000000..507621c8
--- /dev/null
+++ b/code/process-models/work-log/build.gradle
@@ -0,0 +1,33 @@
+plugins {
+    id 'java'
+    id "io.freefair.lombok" version "8.2.2"
+
+    id 'jvm-test-suite'
+}
+
+java {
+    toolchain {
+        languageVersion.set(JavaLanguageVersion.of(20))
+    }
+}
+dependencies {
+    implementation libs.lombok
+    annotationProcessor libs.lombok
+    implementation libs.bundles.slf4j
+
+    implementation libs.notnull
+
+    testImplementation libs.bundles.slf4j.test
+    testImplementation libs.bundles.junit
+    testImplementation libs.mockito
+}
+
+test {
+    useJUnitPlatform()
+}
+
+task fastTests(type: Test) {
+    useJUnitPlatform {
+        excludeTags "slow"
+    }
+}
diff --git a/code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLog.java b/code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLog.java
new file mode 100644
index 00000000..9c6ba9bb
--- /dev/null
+++ b/code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLog.java
@@ -0,0 +1,36 @@
+package nu.marginalia.worklog;
+
+import java.io.IOException;
+
+/** The BatchingWorkLog is a work log for items of work performed in batches,
+ * where each batch needs to be finalized before the items it consists of can be
+ * considered done.  This is needed when the data is serialized into a format such
+ * as Parquet, where disparate items go into the same file, and the writer needs to be
+ * properly closed before the file can be read.
+ */
+public interface BatchingWorkLog extends AutoCloseable {
+
+    /** Returns true if logItem(id) has been run,
+     * and logFinishedBatch has been run after that.
+     */
+    boolean isItemCommitted(String id);
+
+    /** Returns true if logItem(id) has been run
+     * but not logFinishedBatch().
+     * <p>
+     * Unlike isItemCommitted(), this state is ephemeral and not
+     * retained if e.g. the process crashes and resumes.
+     * */
+    boolean isItemInCurrentBatch(String id);
+
+    /** Log additional item to the current batch */
+    void logItem(String id) throws IOException;
+
+    /** Mark the current batch as finished and increment
+     * the batch number counter
+     */
+    void logFinishedBatch() throws IOException;
+
+    /** Returns the current batch number, i.e. the number of batches finished so far */
+    int getBatchNumber();
+}
diff --git a/code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLogImpl.java b/code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLogImpl.java
new file mode 100644
index 00000000..b1538b10
--- /dev/null
+++ b/code/process-models/work-log/src/main/java/nu/marginalia/worklog/BatchingWorkLogImpl.java
@@ -0,0 +1,259 @@
+package nu.marginalia.worklog;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.Set;
+
+/** File-backed BatchingWorkLog. Every entry is appended to a plain text
+ * log file as it is recorded, and an existing log file is replayed on
+ * construction to restore the committed items and the batch number.
+ */
+public class BatchingWorkLogImpl implements BatchingWorkLog {
+    private int batchNumber = 0;
+    private final Set<String> currentBatchItems = new HashSet<>(1000);
+    private final Set<String> commitedItems = new HashSet<>(10_000);
+    private final OutputStream writer;
+
+    /** Opens the work log in file, replaying it first if it already exists.
+     * <p>
+     * Items left in an unfinished batch by a previous run are discarded,
+     * and a crash marker is appended so that later replays discard them too.
+     *
+     * @throws IOException if the log file can not be read or written
+     * @throws WorkLogParseException if a line starts with an unknown marker
+     */
+    public BatchingWorkLogImpl(Path file) throws IOException {
+        if (Files.exists(file)) {
+            try (var linesStream = Files.lines(file)) {
+                linesStream.map(WorkLogItem::parse).forEach(
+                        item -> item.replay(this)
+                );
+            }
+
+            writer = Files.newOutputStream(file, StandardOpenOption.APPEND);
+            writeLogEntry(new CommentLine("Log resumed on " + LocalDateTime.now()));
+            if (getCurrentBatchSize() > 0) {
+                writeLogEntry(new CrashMarker());
+
+                // Mirror the marker in memory, otherwise the stale items
+                // would be committed by the next logFinishedBatch()
+                restartBatch();
+            }
+        }
+        else {
+            writer = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW);
+            writeLogEntry(new CommentLine("Log created on " + LocalDateTime.now()));
+            writeLogEntry(new CommentLine(" Format: "));
+            writeLogEntry(new CommentLine(" " + AddItem.MARKER + " ID\tsignifies adding an item to the current batch"));
+            writeLogEntry(new CommentLine(" " + FinishBatch.MARKER + "\tsignifies finalizing the current batch and switching to the next"));
+            writeLogEntry(new CommentLine(" " + CrashMarker.MARKER + "\tdiscard contents from the current batch and start over, written after a crash"));
+            writeLogEntry(new CommentLine("Upon a crash, items that have re-process until their batch is finalized"));
+        }
+    }
+
+    /** Appends the entry to the log file; the in-memory state is not updated */
+    void writeLogEntry(WorkLogItem item) throws IOException {
+        item.write(this);
+    }
+
+    /** Writes one UTF-8 encoded line to the log file and flushes the stream */
+    void writeLine(String line) throws IOException {
+        writer.write(line.getBytes(StandardCharsets.UTF_8));
+        writer.write('\n');
+        writer.flush();
+    }
+
+    @Override
+    public boolean isItemCommitted(String id) {
+        return commitedItems.contains(id);
+    }
+
+    @Override
+    public boolean isItemInCurrentBatch(String id) {
+        return currentBatchItems.contains(id);
+    }
+
+    @Override
+    public void logItem(String id) throws IOException {
+        writeLogEntry(new AddItem(id));
+
+        // Only track the item once the log entry has been written
+        addItemToCurrentBatch(id);
+    }
+
+    @Override
+    public void logFinishedBatch() throws IOException {
+        writeLogEntry(new FinishBatch());
+        incrementBatch();
+    }
+
+    /** Commits the items of the current batch and moves on to the next batch */
+    void incrementBatch() {
+        batchNumber++;
+
+        // Transfer all items from the current batch to the committed items' batch
+        commitedItems.addAll(currentBatchItems);
+        currentBatchItems.clear();
+    }
+
+    /** Discards the items of the current batch without committing them */
+    void restartBatch() {
+        currentBatchItems.clear();
+    }
+
+    /** Adds the item to the current batch in memory only */
+    void addItemToCurrentBatch(String id) {
+        currentBatchItems.add(id);
+    }
+
+    @Override
+    public void close() throws IOException {
+        writer.flush();
+        writer.close();
+    }
+
+    @Override
+    public int getBatchNumber() {
+        return batchNumber;
+    }
+
+    /** Returns the number of items in the current, unfinished batch */
+    public int getCurrentBatchSize() {
+        return currentBatchItems.size();
+    }
+}
+
+/** A single line of the work log file */
+interface WorkLogItem {
+
+    /** Applies the effect of this entry to the in-memory state of bwl */
+    void replay(BatchingWorkLogImpl bwl);
+
+    /** Writes the textual form of this entry through bwl */
+    void write(BatchingWorkLogImpl bwl) throws IOException;
+
+    /** Parses a log line, dispatching on its first non-blank character.
+     *
+     * @throws WorkLogParseException if the marker is not recognized
+     */
+    static WorkLogItem parse(String line) {
+        if (line.isBlank())
+            return new BlankLine();
+
+        var lineParts = LogLineParts.parse(line);
+
+        return switch (lineParts.tag()) {
+            case CommentLine.MARKER -> new CommentLine(lineParts.arg());
+            case AddItem.MARKER -> new AddItem(lineParts.arg());
+            case FinishBatch.MARKER -> new FinishBatch();
+            case CrashMarker.MARKER -> new CrashMarker();
+            default -> throw new WorkLogParseException(line);
+        };
+    }
+}
+
+/** The marker character of a log line, and the text after it up to the next '#' */
+record LogLineParts(char tag, String arg) {
+    public static LogLineParts parse(String line) {
+        line = line.trim();
+
+        char tag = line.charAt(0);
+        String arg = line.substring(1).trim();
+
+        // Strip trailing comments; note that this also truncates ids containing '#'
+        int commentIdx = arg.indexOf('#');
+        if (commentIdx >= 0) arg = arg.substring(0, commentIdx).trim();
+
+        return new LogLineParts(tag, arg);
+    }
+}
+
+/** Free text line; ignored on replay */
+record CommentLine(String comment) implements WorkLogItem {
+    final static char MARKER = '#';
+
+    @Override
+    public void replay(BatchingWorkLogImpl bwl) {}
+
+    @Override
+    public void write(BatchingWorkLogImpl bwl) throws IOException {
+        bwl.writeLine(MARKER + " " + comment);
+    }
+}
+
+/** Empty line; ignored on replay */
+record BlankLine() implements WorkLogItem {
+    final static char MARKER = ' ';
+
+    @Override
+    public void replay(BatchingWorkLogImpl bwl) {}
+
+    @Override
+    public void write(BatchingWorkLogImpl bwl) throws IOException {
+        bwl.writeLine(MARKER + "");
+    }
+}
+
+/** Finalizes the current batch */
+record FinishBatch() implements WorkLogItem {
+    final static char MARKER = 'F';
+
+    @Override
+    public void replay(BatchingWorkLogImpl bwl) {
+        bwl.incrementBatch();
+    }
+
+    @Override
+    public void write(BatchingWorkLogImpl bwl) throws IOException {
+        bwl.writeLine("# " + LocalDateTime.now());
+        bwl.writeLine("# finalizing batchNumber = " + bwl.getBatchNumber());
+        bwl.writeLine(Character.toString(MARKER));
+    }
+}
+
+/** Discards the current batch; written when a log with an unfinished batch is resumed */
+record CrashMarker() implements WorkLogItem {
+    final static char MARKER = 'X';
+
+    @Override
+    public void replay(BatchingWorkLogImpl bwl) {
+        bwl.restartBatch();
+    }
+
+    @Override
+    public void write(BatchingWorkLogImpl bwl) throws IOException {
+        bwl.writeLine("# " + LocalDateTime.now());
+        bwl.writeLine("# discarding batchNumber = " + bwl.getBatchNumber());
+        bwl.writeLine(Character.toString(MARKER));
+    }
+}
+
+/** Adds an item to the current batch */
+record AddItem(String id) implements WorkLogItem {
+    final static char MARKER = '+';
+
+    @Override
+    public void replay(BatchingWorkLogImpl bwl) {
+        bwl.addItemToCurrentBatch(id);
+    }
+
+    @Override
+    public void write(BatchingWorkLogImpl bwl) throws IOException {
+        bwl.writeLine(MARKER + " " + id);
+    }
+}
+
+/** Thrown when a work log line does not start with a known marker */
+class WorkLogParseException extends RuntimeException {
+    @Serial
+    private static final long serialVersionUID = -1238138989389021166L;
+
+    public WorkLogParseException(String logLine) {
+        super("Failed to parse work log line: '" + logLine + "'");
+    }
+}
\ No newline at end of file
diff --git a/code/process-models/work-log/src/test/java/nu/marginalia/worklog/BatchingWorkLogImplTest.java b/code/process-models/work-log/src/test/java/nu/marginalia/worklog/BatchingWorkLogImplTest.java
new file mode 100644
index 00000000..fa765fd2
--- /dev/null
+++ b/code/process-models/work-log/src/test/java/nu/marginalia/worklog/BatchingWorkLogImplTest.java
@@ -0,0 +1,67 @@
+package nu.marginalia.worklog;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class BatchingWorkLogImplTest {
+    Path fileName;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        fileName = Files.createTempFile(getClass().getSimpleName(), ".test");
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        Files.deleteIfExists(fileName);
+    }
+
+    @Test
+    public void testResumeOnEmptyFile() throws IOException {
+        Files.delete(fileName);
+
+        try (var wl = new BatchingWorkLogImpl(fileName)) {
+            wl.logItem("1");
+            wl.logItem("2");
+            wl.logItem("3");
+            wl.logFinishedBatch();
+            wl.logItem("4");
+            wl.logItem("5");
+            wl.logFinishedBatch();
+            wl.logItem("6");
+            assertTrue(wl.isItemInCurrentBatch("6"));
+        }
+
+        try (var wl = new BatchingWorkLogImpl(fileName)) {
+            assertTrue(wl.isItemCommitted("1"));
+            assertTrue(wl.isItemCommitted("2"));
+            assertTrue(wl.isItemCommitted("3"));
+            assertTrue(wl.isItemCommitted("4"));
+            assertTrue(wl.isItemCommitted("5"));
+            assertFalse(wl.isItemCommitted("6"));
+            assertFalse(wl.isItemInCurrentBatch("6"));
+            wl.logItem("7");
+            wl.logFinishedBatch();
+            assertFalse(wl.isItemCommitted("6"));
+            assertTrue(wl.isItemCommitted("7"));
+        }
+        try (var wl = new BatchingWorkLogImpl(fileName)) {
+            assertTrue(wl.isItemCommitted("1"));
+            assertTrue(wl.isItemCommitted("2"));
+            assertTrue(wl.isItemCommitted("3"));
+            assertTrue(wl.isItemCommitted("4"));
+            assertTrue(wl.isItemCommitted("5"));
+            assertFalse(wl.isItemCommitted("6"));
+            assertTrue(wl.isItemCommitted("7"));
+        }
+
+        Files.readAllLines(fileName).forEach(System.out::println);
+    }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 884160c9..d2b54d41 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -65,6 +65,7 @@ include 'code:processes:test-data'
 
 include 'code:process-models:converting-model'
 include 'code:process-models:crawling-model'
+include 'code:process-models:work-log'
 
 include 'code:tools:term-frequency-extractor'
 include 'code:tools:crawl-job-extractor'