(work-log) New batching work log

This commit is contained in:
Viktor Lofgren 2023-09-11 14:07:52 +02:00
parent a00cabe223
commit a52d78c8ee
5 changed files with 353 additions and 0 deletions

View File

@ -0,0 +1,33 @@
plugins {
id 'java'
id "io.freefair.lombok" version "8.2.2"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(20))
}
}
dependencies {
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.notnull
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -0,0 +1,35 @@
package nu.marginalia.worklog;
import java.io.IOException;
/** The BatchingWorkLog is a work log for items of work performed in batches,
* where each batch needs to be finalized before the items it consists of can be
* considered done. This is needed when the data is serialized into a format such
* as Parquet, where disparate items go into the same file, and the writer needs to be
* properly closed before the file can be read.
*/
public interface BatchingWorkLog extends AutoCloseable {
/** Returns true if logItem(id) has been run,
* and logFinishedBatch has been run after that.
*/
boolean isItemCommitted(String id);
/** Returns true if logItem(id) has been run
* but not logFinishedBatch().
* <p/>
* Unlike isItemCommitted(), this state is ephemeral and not
* retained if e.g. the process crashes and resumes.
* */
boolean isItemInCurrentBatch(String id);
/** Log additional item to the current batch */
void logItem(String id) throws IOException;
/** Mark the current batch as finished and increment
* the batch number counter
*/
void logFinishedBatch() throws IOException;
int getBatchNumber();
}

View File

@ -0,0 +1,221 @@
package nu.marginalia.worklog;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Set;
public class BatchingWorkLogImpl implements BatchingWorkLog {
private int batchNumber = 0;
private final Set<String> currentBatchItems = new HashSet<>(1000);
private final Set<String> commitedItems = new HashSet<>(10_000);
private final OutputStream writer;
public BatchingWorkLogImpl(Path file) throws IOException {
if (Files.exists(file)) {
try (var linesStream = Files.lines(file)) {
linesStream.map(WorkLogItem::parse).forEach(
item -> item.replay(this)
);
}
writer = Files.newOutputStream(file, StandardOpenOption.APPEND);
writeLogEntry(new CommentLine("Log resumed on " + LocalDateTime.now()));
if (getCurrentBatchSize() > 0) {
writeLogEntry(new CrashMarker());
}
}
else {
writer = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW);
writeLogEntry(new CommentLine("Log created on " + LocalDateTime.now()));
writeLogEntry(new CommentLine(" Format: "));
writeLogEntry(new CommentLine(" " + AddItem.MARKER + " ID\tsignifies adding an item to the current batch"));
writeLogEntry(new CommentLine(" " + FinishBatch.MARKER + "\tsignifies finalizing the current batch and switching to the next"));
writeLogEntry(new CommentLine(" " + CrashMarker.MARKER + "\tdiscard contents from the current batch and start over, written after a crash"));
writeLogEntry(new CommentLine("Upon a crash, items that have re-process until their batch is finalized"));
}
}
void writeLogEntry(WorkLogItem item) throws IOException {
item.write(this);
}
void writeLine(String line) throws IOException {
writer.write(line.getBytes(StandardCharsets.UTF_8));
writer.write('\n');
writer.flush();
}
@Override
public boolean isItemCommitted(String id) {
return commitedItems.contains(id);
}
@Override
public boolean isItemInCurrentBatch(String id) {
return currentBatchItems.contains(id);
}
@Override
public void logItem(String id) throws IOException {
writeLogEntry(new AddItem(id));
}
@Override
public void logFinishedBatch() throws IOException {
writeLogEntry(new FinishBatch());
incrementBatch();
}
void incrementBatch() {
batchNumber++;
// Transfer all items from the current batch to the committed items' batch
commitedItems.addAll(currentBatchItems);
currentBatchItems.clear();
}
void restartBatch() {
currentBatchItems.clear();
}
void addItemToCurrentBatch(String id) {
currentBatchItems.add(id);
}
@Override
public void close() throws IOException {
writer.flush();
writer.close();
}
@Override
public int getBatchNumber() {
return batchNumber;
}
public int getCurrentBatchSize() {
return currentBatchItems.size();
}
}
interface WorkLogItem {
void replay(BatchingWorkLogImpl bwl);
void write(BatchingWorkLogImpl bwl) throws IOException;
static WorkLogItem parse(String line) {
if (line.isBlank())
return new BlankLine();
var lineParts = LogLineParts.parse(line);
return switch (lineParts.tag()) {
case CommentLine.MARKER -> new CommentLine(lineParts.arg());
case AddItem.MARKER -> new AddItem(lineParts.arg());
case FinishBatch.MARKER -> new FinishBatch();
case CrashMarker.MARKER -> new CrashMarker();
default -> throw new WorkLogParseException(line);
};
}
}
record LogLineParts(char tag, String arg) {
public static LogLineParts parse(String line) {
line = line.trim();
char tag = line.charAt(0);
String arg = line.substring(1).trim();
int commentIdx = arg.indexOf('#');
if (commentIdx >= 0) arg = arg.substring(0, commentIdx).trim();
return new LogLineParts(tag, arg);
}
}
record CommentLine(String comment) implements WorkLogItem {
final static char MARKER = '#';
@Override
public void replay(BatchingWorkLogImpl bwl) {}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine(MARKER + " " + comment);
}
}
record BlankLine() implements WorkLogItem {
final static char MARKER = ' ';
@Override
public void replay(BatchingWorkLogImpl bwl) {}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine(MARKER + "");
}
}
record FinishBatch() implements WorkLogItem {
final static char MARKER = 'F';
@Override
public void replay(BatchingWorkLogImpl bwl) {
bwl.incrementBatch();
}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine("# " + LocalDateTime.now());
bwl.writeLine("# finalizing batchNumber = " + bwl.getBatchNumber());
bwl.writeLine(Character.toString(MARKER));
}
}
record CrashMarker() implements WorkLogItem {
final static char MARKER = 'X';
@Override
public void replay(BatchingWorkLogImpl bwl) {
bwl.restartBatch();
}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine("# " + LocalDateTime.now());
bwl.writeLine("# discarding batchNumber = " + bwl.getBatchNumber());
bwl.writeLine(Character.toString(MARKER));
}
}
record AddItem(String id) implements WorkLogItem {
final static char MARKER = '+';
@Override
public void replay(BatchingWorkLogImpl bwl) {
bwl.addItemToCurrentBatch(id);
}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine(MARKER + " " + id);
}
}
class WorkLogParseException extends RuntimeException {
@Serial
private static final long serialVersionUID = -1238138989389021166L;
public WorkLogParseException(String logLine) {
super("Failed to parse work log line: '" + logLine + "'");
}
}

View File

@ -0,0 +1,63 @@
package nu.marginalia.worklog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import static org.junit.jupiter.api.Assertions.*;
class BatchingWorkLogImplTest {
Path fileName;
@BeforeEach
public void setUp() throws IOException {
fileName = Files.createTempFile(getClass().getSimpleName(), ".test");
}
@AfterEach
public void tearDown() throws IOException {
Files.deleteIfExists(fileName);
}
@Test
public void testResumeOnEmptyFile() throws IOException {
Files.delete(fileName);
try (var wl = new BatchingWorkLogImpl(fileName)) {
wl.logItem("1");
wl.logItem("2");
wl.logItem("3");
wl.logFinishedBatch();
wl.logItem("4");
wl.logItem("5");
wl.logFinishedBatch();
wl.logItem("6");
}
try (var wl = new BatchingWorkLogImpl(fileName)) {
assertTrue(wl.isItemCommitted("1"));
assertTrue(wl.isItemCommitted("2"));
assertTrue(wl.isItemCommitted("3"));
assertTrue(wl.isItemCommitted("4"));
assertTrue(wl.isItemCommitted("5"));
assertFalse(wl.isItemCommitted("6"));
wl.logItem("7");
wl.logFinishedBatch();
}
try (var wl = new BatchingWorkLogImpl(fileName)) {
assertTrue(wl.isItemCommitted("1"));
assertTrue(wl.isItemCommitted("2"));
assertTrue(wl.isItemCommitted("3"));
assertTrue(wl.isItemCommitted("4"));
assertTrue(wl.isItemCommitted("5"));
assertFalse(wl.isItemCommitted("6"));
assertTrue(wl.isItemCommitted("7"));
}
Files.readAllLines(fileName).forEach(System.out::println);
}
}

View File

@ -65,6 +65,7 @@ include 'code:processes:test-data'
include 'code:process-models:converting-model'
include 'code:process-models:crawling-model'
include 'code:process-models:work-log'
include 'code:tools:term-frequency-extractor'
include 'code:tools:crawl-job-extractor'