diff --git a/code/process-models/processed-data/build.gradle b/code/process-models/processed-data/build.gradle
new file mode 100644
index 00000000..5159bc8b
--- /dev/null
+++ b/code/process-models/processed-data/build.gradle
@@ -0,0 +1,37 @@
+plugins {
+    id 'java'
+    id "io.freefair.lombok" version "8.2.2"
+
+    id 'jvm-test-suite'
+}
+
+java {
+    toolchain {
+        languageVersion.set(JavaLanguageVersion.of(20))
+    }
+}
+dependencies {
+    implementation libs.lombok
+    annotationProcessor libs.lombok
+    implementation libs.bundles.slf4j
+
+    implementation project(':third-party:parquet-floor')
+
+    implementation libs.notnull
+    implementation libs.trove
+    implementation libs.bundles.parquet
+
+    testImplementation libs.bundles.slf4j.test
+    testImplementation libs.bundles.junit
+    testImplementation libs.mockito
+}
+
+test {
+    useJUnitPlatform()
+}
+
+task fastTests(type: Test) {
+    useJUnitPlatform {
+        excludeTags "slow"
+    }
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDocumentDataDehydrator.java b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDocumentDataDehydrator.java
new file mode 100644
index 00000000..8a615186
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDocumentDataDehydrator.java
@@ -0,0 +1,37 @@
+package nu.marginalia.codec.processed;
+
+import blue.strategic.parquet.Dehydrator;
+import blue.strategic.parquet.ValueWriter;
+import nu.marginalia.model.processed.ProcessedDocumentData;
+
+public class ProcessedDocumentDataDehydrator implements Dehydrator<ProcessedDocumentData> {
+    @Override
+    public void dehydrate(ProcessedDocumentData record, ValueWriter valueWriter) {
+        valueWriter.write("domain", record.domain);
+        valueWriter.write("url", record.url);
+        valueWriter.write("ordinal", record.ordinal);
+        valueWriter.write("state", record.state);
+
+        if (record.stateReason != null)
+            valueWriter.write("stateReason", record.stateReason);
+        if (record.title != null)
+            valueWriter.write("title", record.title);
+        if (record.description != null)
+            valueWriter.write("description", record.description);
+        valueWriter.write("htmlFeatures", record.htmlFeatures);
+        valueWriter.write("htmlStandard", record.htmlStandard);
+        valueWriter.write("length", record.length);
+        valueWriter.write("hash", record.hash);
+        valueWriter.write("quality", record.quality);
+        if (record.pubYear != null) {
+            valueWriter.write("pubYear", record.pubYear);
+        }
+
+        if (record.metas != null) {
+            valueWriter.writeList("wordMeta", record.metas);
+        }
+        if (record.words != null) {
+            valueWriter.writeList("word", record.words);
+        }
+    }
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDocumentDataHydrator.java b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDocumentDataHydrator.java
new file mode 100644
index 00000000..c04147fd
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDocumentDataHydrator.java
@@ -0,0 +1,24 @@
+package nu.marginalia.codec.processed;
+
+import blue.strategic.parquet.Hydrator;
+import nu.marginalia.model.processed.ProcessedDocumentData;
+import nu.marginalia.model.processed.ProcessedDomainData;
+
+public class ProcessedDocumentDataHydrator implements Hydrator<ProcessedDocumentData, ProcessedDocumentData> {
+
+    @Override
+    public ProcessedDocumentData start() {
+        return new ProcessedDocumentData();
+    }
+
+    @Override
+    public ProcessedDocumentData add(ProcessedDocumentData target, String heading, Object value) {
+        return target.add(heading, value);
+    }
+
+    @Override
+    public ProcessedDocumentData finish(ProcessedDocumentData target) {
+        return target;
+    }
+
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataDehydrator.java b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataDehydrator.java
new file mode 100644
index 00000000..4a52a54c
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataDehydrator.java
@@ -0,0 +1,26 @@
+package nu.marginalia.codec.processed;
+
+import blue.strategic.parquet.Dehydrator;
+import blue.strategic.parquet.ValueWriter;
+import nu.marginalia.model.processed.ProcessedDomainData;
+
+public class ProcessedDomainDataDehydrator implements Dehydrator<ProcessedDomainData> {
+
+
+    @Override
+    public void dehydrate(ProcessedDomainData record, ValueWriter valueWriter) {
+        valueWriter.write("domain", record.domain);
+        valueWriter.write("knownUrls", record.knownUrls);
+        valueWriter.write("goodUrls", record.goodUrls);
+        valueWriter.write("visitedUrls", record.visitedUrls);
+        if (record.state != null) {
+            valueWriter.write("state", record.state);
+        }
+        if (record.redirectDomain != null) {
+            valueWriter.write("redirectDomain", record.redirectDomain);
+        }
+        if (record.ip != null) {
+            valueWriter.write("ip", record.ip);
+        }
+    }
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataDomainNameHydrator.java b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataDomainNameHydrator.java
new file mode 100644
index 00000000..945fad26
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataDomainNameHydrator.java
@@ -0,0 +1,26 @@
+package nu.marginalia.codec.processed;
+
+import blue.strategic.parquet.Hydrator;
+
+
+public class ProcessedDomainDataDomainNameHydrator implements Hydrator<String, String> {
+
+    @Override
+    public String start() {
+        return "";
+    }
+
+    @Override
+    public String add(String target, String heading, Object value) {
+        if ("domain".equals(heading)) {
+            return (String) value;
+        }
+        return target;
+    }
+
+    @Override
+    public String finish(String target) {
+        return target;
+    }
+
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataHydrator.java b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataHydrator.java
new file mode 100644
index 00000000..aa9531a1
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/codec/processed/ProcessedDomainDataHydrator.java
@@ -0,0 +1,23 @@
+package nu.marginalia.codec.processed;
+
+import blue.strategic.parquet.Hydrator;
+import nu.marginalia.model.processed.ProcessedDomainData;
+
+public class ProcessedDomainDataHydrator implements Hydrator<ProcessedDomainData, ProcessedDomainData> {
+
+    @Override
+    public ProcessedDomainData start() {
+        return new ProcessedDomainData();
+    }
+
+    @Override
+    public ProcessedDomainData add(ProcessedDomainData target, String heading, Object value) {
+        return target.add(heading, value);
+    }
+
+    @Override
+    public ProcessedDomainData finish(ProcessedDomainData target) {
+        return target;
+    }
+
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileReader.java b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileReader.java
new file mode 100644
index 00000000..ff82a197
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileReader.java
@@ -0,0 +1,20 @@
+package nu.marginalia.io.processed;
+
+import blue.strategic.parquet.HydratorSupplier;
+import blue.strategic.parquet.ParquetReader;
+import nu.marginalia.codec.processed.ProcessedDocumentDataHydrator;
+import nu.marginalia.model.processed.ProcessedDocumentData;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+
+public class ProcessedDocumentParquetFileReader {
+
+    @NotNull
+    public static Stream<ProcessedDocumentData> stream(Path path) throws IOException {
+        return ParquetReader.streamContent(path.toFile(),
+                HydratorSupplier.constantly(new ProcessedDocumentDataHydrator()));
+    }
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileWriter.java b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileWriter.java
new file mode 100644
index 00000000..37f92a78
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileWriter.java
@@ -0,0 +1,25 @@
+package nu.marginalia.io.processed;
+
+import blue.strategic.parquet.ParquetWriter;
+import nu.marginalia.codec.processed.ProcessedDocumentDataDehydrator;
+import nu.marginalia.model.processed.ProcessedDocumentData;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public class ProcessedDocumentParquetFileWriter implements AutoCloseable {
+    private final ParquetWriter<ProcessedDocumentData> writer;
+
+    public ProcessedDocumentParquetFileWriter(Path file) throws IOException {
+        writer = ParquetWriter.writeFile(ProcessedDocumentData.schema,
+                file.toFile(), new ProcessedDocumentDataDehydrator());
+    }
+
+    public void write(ProcessedDocumentData domainData) throws IOException {
+        writer.write(domainData);
+    }
+
+    public void close() throws IOException {
+        writer.close();
+    }
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDomainParquetFileReader.java b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDomainParquetFileReader.java
new file mode 100644
index 00000000..1324cfe1
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDomainParquetFileReader.java
@@ -0,0 +1,32 @@
+package nu.marginalia.io.processed;
+
+import blue.strategic.parquet.HydratorSupplier;
+import blue.strategic.parquet.ParquetReader;
+import nu.marginalia.codec.processed.ProcessedDomainDataDomainNameHydrator;
+import nu.marginalia.codec.processed.ProcessedDomainDataHydrator;
+import nu.marginalia.model.processed.ProcessedDomainData;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class ProcessedDomainParquetFileReader {
+
+    @NotNull
+    public static Stream<ProcessedDomainData> stream(Path path) throws IOException {
+        return ParquetReader.streamContent(path.toFile(),
+                HydratorSupplier.constantly(new ProcessedDomainDataHydrator()));
+    }
+
+    @NotNull
+    public static List<String> getDomainNames(Path path) throws IOException {
+        return ParquetReader.streamContent(path.toFile(),
+                HydratorSupplier.constantly(new ProcessedDomainDataDomainNameHydrator()),
+                List.of("domain"))
+                .toList();
+    }
+
+
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDomainParquetFileWriter.java b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDomainParquetFileWriter.java
new file mode 100644
index 00000000..862615a5
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/io/processed/ProcessedDomainParquetFileWriter.java
@@ -0,0 +1,25 @@
+package nu.marginalia.io.processed;
+
+import blue.strategic.parquet.ParquetWriter;
+import nu.marginalia.codec.processed.ProcessedDomainDataDehydrator;
+import nu.marginalia.model.processed.ProcessedDomainData;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public class ProcessedDomainParquetFileWriter implements AutoCloseable {
+    private final ParquetWriter<ProcessedDomainData> writer;
+
+    public ProcessedDomainParquetFileWriter(Path file) throws IOException {
+        writer = ParquetWriter.writeFile(ProcessedDomainData.schema,
+                file.toFile(), new ProcessedDomainDataDehydrator());
+    }
+
+    public void write(ProcessedDomainData domainData) throws IOException {
+        writer.write(domainData);
+    }
+
+    public void close() throws IOException {
+        writer.close();
+    }
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDataCodec.java b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDataCodec.java
new file mode 100644
index 00000000..54ed35c7
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDataCodec.java
@@ -0,0 +1,6 @@
+package nu.marginalia.model.processed;
+
+
+public class ProcessedDataCodec {
+
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDocumentData.java b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDocumentData.java
new file mode 100644
index 00000000..5d7fad8f
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDocumentData.java
@@ -0,0 +1,105 @@
+package nu.marginalia.model.processed;
+
+import gnu.trove.list.TLongList;
+import gnu.trove.list.array.TLongArrayList;
+import lombok.*;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode
+@ToString
+public class ProcessedDocumentData {
+    @NotNull
+    public String domain;
+    @NotNull
+    public String url;
+
+    public int ordinal;
+
+    @NotNull
+    public String state;
+    @Nullable
+    public String stateReason;
+
+    @Nullable
+    public String title;
+    @Nullable
+    public String description;
+    public int htmlFeatures;
+    @Nullable
+    public String htmlStandard;
+
+    public int length;
+    public long hash;
+    public float quality;
+
+    @Nullable
+    public Integer pubYear;
+
+    @Nullable
+    public List<String> words;
+    @Nullable
+    public List<Long> metas;
+
+    public static MessageType schema = new MessageType(
+            ProcessedDocumentData.class.getSimpleName(),
+            Types.required(BINARY).as(stringType()).named("domain"),
+            Types.required(BINARY).as(stringType()).named("url"),
+            Types.required(INT32).named("ordinal"),
+            Types.required(BINARY).as(stringType()).named("state"),
+            Types.optional(BINARY).as(stringType()).named("stateReason"),
+            Types.optional(BINARY).as(stringType()).named("title"),
+            Types.optional(BINARY).as(stringType()).named("description"),
+            Types.optional(INT32).named("htmlFeatures"),
+            Types.optional(BINARY).as(stringType()).named("htmlStandard"),
+            Types.optional(INT64).named("hash"),
+            Types.optional(INT32).named("length"),
+            Types.optional(FLOAT).named("quality"),
+            Types.optional(INT32).named("pubYear"),
+            Types.repeated(INT64).named("wordMeta"),
+            Types.repeated(BINARY).as(stringType()).named("word")
+    );
+
+    public ProcessedDocumentData add(String heading, Object value) {
+        switch (heading) {
+            case "domain" -> domain = (String) value;
+            case "url" -> url = (String) value;
+            case "ordinal" -> ordinal = (Integer) value;
+            case "htmlFeatures" -> htmlFeatures = (Integer) value;
+            case "length" -> length = (Integer) value;
+            case "pubYear" -> pubYear = (Integer) value;
+            case "hash" -> hash = (Long) value;
+            case "quality" -> quality = (Float) value;
+            case "state" -> state = (String) value;
+            case "stateReason" -> stateReason = (String) value;
+            case "title" -> title = (String) value;
+            case "description" -> description = (String) value;
+            case "htmlStandard" -> htmlStandard = (String) value;
+            case "word" -> {
+                if (this.words == null)
+                    this.words = new ArrayList<>(100);
+                this.words.add((String) value);
+            }
+            case "wordMeta" -> {
+                if (this.metas == null) {
+                    this.metas = new ArrayList<>(100);
+                }
+                this.metas.add((Long) value);
+            }
+            default -> throw new UnsupportedOperationException("Unknown heading '" + heading + '"');
+        }
+        return this;
+    }
+}
diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDomainData.java b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDomainData.java
new file mode 100644
index 00000000..81b0241c
--- /dev/null
+++ b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/ProcessedDomainData.java
@@ -0,0 +1,55 @@
+package nu.marginalia.model.processed;
+
+import lombok.*;
+import org.apache.parquet.schema.*;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.*;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ProcessedDomainData {
+    @NotNull
+    public String domain;
+
+    public int knownUrls;
+    public int goodUrls;
+    public int visitedUrls;
+
+    @Nullable
+    public String state;
+    @Nullable
+    public String redirectDomain;
+    @Nullable
+    public String ip;
+
+    public static MessageType schema = new MessageType(
+            ProcessedDomainData.class.getSimpleName(),
+            Types.required(BINARY).as(stringType()).named("domain"),
+            Types.optional(INT32).named("knownUrls"),
+            Types.optional(INT32).named("visitedUrls"),
+            Types.optional(INT32).named("goodUrls"),
+            Types.required(BINARY).as(stringType()).named("state"),
+            Types.optional(BINARY).as(stringType()).named("redirectDomain"),
+            Types.optional(BINARY).as(stringType()).named("ip"));
+
+    public ProcessedDomainData add(String heading, Object value) {
+        switch (heading) {
+            case "domain" -> domain = (String) value;
+            case "knownUrls" -> knownUrls = (Integer) value;
case "visitedUrls" -> visitedUrls = (Integer) value; + case "goodUrls" -> goodUrls = (Integer) value; + case "state" -> state = (String) value; + case "redirectDomain" -> redirectDomain = (String) value; + case "ip" -> ip = (String) value; + default -> throw new UnsupportedOperationException("Unknown heading '" + heading + '"'); + } + return this; + } +} diff --git a/code/process-models/processed-data/src/test/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileReaderTest.java b/code/process-models/processed-data/src/test/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileReaderTest.java new file mode 100644 index 00000000..5090a65c --- /dev/null +++ b/code/process-models/processed-data/src/test/java/nu/marginalia/io/processed/ProcessedDocumentParquetFileReaderTest.java @@ -0,0 +1,89 @@ +package nu.marginalia.io.processed; + +import nu.marginalia.model.processed.ProcessedDocumentData; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.*; + +class ProcessedDocumentParquetFileReaderTest { + Path parquetFile; + + @BeforeEach + public void setUp() throws IOException { + parquetFile = Files.createTempFile(getClass().getSimpleName(), ".parquet"); + } + + @AfterEach + public void tearDown() throws IOException { + Files.deleteIfExists(parquetFile); + } + + @Test + public void test() throws IOException { + var doc = new ProcessedDocumentData( + "www.marginalia.nu", + "https://www.marginalia.nu/", + 0, + "OK", + null, + "Itsa me, Marginalia!", + "Hello World", + 3, + "HTML5", + 123, + 0xF00BA3L, + 0.25f, + null, + List.of("Hello", "world"), + List.of(2L, 3L) + ); + + try (var writer = new ProcessedDocumentParquetFileWriter(parquetFile)) { + writer.write(doc); + } + + var read = ProcessedDocumentParquetFileReader.stream(parquetFile).toList(); + assertEquals(List.of(doc), read); + } + + @Test + public void testHugePayload() throws IOException { + List words = IntStream.range(0, 100000).mapToObj(Integer::toString).toList(); + List metas = LongStream.range(0, 100000).boxed().toList(); + + var doc = new ProcessedDocumentData( + "www.marginalia.nu", + "https://www.marginalia.nu/", + 0, + "OK", + null, + "Itsa me, Marginalia!", + "Hello World", + 3, + "HTML5", + 123, + 0xF00BA3L, + 0.25f, + null, + words, + metas + ); + + try (var writer = new ProcessedDocumentParquetFileWriter(parquetFile)) { + writer.write(doc); + } + + var read = ProcessedDocumentParquetFileReader.stream(parquetFile).toList(); + assertEquals(List.of(doc), read); + } + +} \ No newline at end of file diff --git a/code/process-models/processed-data/src/test/java/nu/marginalia/io/processed/ProcessedDomainParquetFileReaderTest.java b/code/process-models/processed-data/src/test/java/nu/marginalia/io/processed/ProcessedDomainParquetFileReaderTest.java new file mode 100644 index 00000000..5264624d --- /dev/null +++ b/code/process-models/processed-data/src/test/java/nu/marginalia/io/processed/ProcessedDomainParquetFileReaderTest.java @@ -0,0 +1,63 @@ +package nu.marginalia.io.processed; + +import nu.marginalia.model.processed.ProcessedDomainData; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import 
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ProcessedDomainParquetFileReaderTest {
+    Path parquetFile;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        parquetFile = Files.createTempFile(getClass().getSimpleName(), ".parquet");
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        Files.deleteIfExists(parquetFile);
+    }
+
+    @Test
+    public void testReadFull() throws IOException {
+        var first = new ProcessedDomainData(
+                "www.marginalia.nu",
+                10,
+                3,
+                5,
+                "'sall good man",
+                null,
+                "127.0.0.1"
+        );
+        var second = new ProcessedDomainData(
+                "memex.marginalia.nu",
+                0,
+                0,
+                0,
+                "REDIRECT",
+                "www.marginalia.nu",
+                "127.0.0.1"
+        );
+
+        try (var writer = new ProcessedDomainParquetFileWriter(parquetFile)) {
+            writer.write(first);
+            writer.write(second);
+        }
+
+        var domainNames = ProcessedDomainParquetFileReader.getDomainNames(parquetFile);
+        assertEquals(List.of("www.marginalia.nu", "memex.marginalia.nu"), domainNames);
+
+        var items = ProcessedDomainParquetFileReader
+                .stream(parquetFile)
+                .toList();
+        assertEquals(List.of(first, second), items);
+    }
+
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index d2b54d41..af44349d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -66,6 +66,7 @@ include 'code:processes:test-data'
 include 'code:process-models:converting-model'
 include 'code:process-models:crawling-model'
 include 'code:process-models:work-log'
+include 'code:process-models:processed-data'
 
 include 'code:tools:term-frequency-extractor'
 include 'code:tools:crawl-job-extractor'