(processed-data) New parquet-serializable models for converter output

This commit is contained in:
Viktor Lofgren 2023-09-11 14:08:40 +02:00
parent a52d78c8ee
commit 064bc5ee76
16 changed files with 594 additions and 0 deletions

View File

@ -0,0 +1,37 @@
// Build configuration for the processed-data process-model module:
// parquet-serializable representations of converter output.
plugins {
id 'java'
id "io.freefair.lombok" version "8.2.2"
id 'jvm-test-suite'
}
// Pin the toolchain so builds are reproducible regardless of the host JDK.
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(20))
}
}
dependencies {
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
// parquet-floor provides the lightweight Dehydrator/Hydrator parquet API used here
implementation project(':third-party:parquet-floor')
implementation libs.notnull
implementation libs.trove
implementation libs.bundles.parquet
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}
// Convenience task that runs everything except tests tagged "slow".
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -0,0 +1,37 @@
package nu.marginalia.codec.processed;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.ValueWriter;
import nu.marginalia.model.processed.ProcessedDocumentData;
/** Serializes a {@link ProcessedDocumentData} instance into a parquet row
 *  via parquet-floor's {@link ValueWriter}. Column names must match the
 *  field names declared in {@code ProcessedDocumentData.schema}. */
public class ProcessedDocumentDataDehydrator implements Dehydrator<ProcessedDocumentData> {
    @Override
    public void dehydrate(ProcessedDocumentData record, ValueWriter valueWriter) {
        // Required columns, always present on the model
        valueWriter.write("domain", record.domain);
        valueWriter.write("url", record.url);
        valueWriter.write("ordinal", record.ordinal);
        valueWriter.write("state", record.state);

        // Nullable columns are only written when a value is present
        if (record.stateReason != null)
            valueWriter.write("stateReason", record.stateReason);
        if (record.title != null)
            valueWriter.write("title", record.title);
        if (record.description != null)
            valueWriter.write("description", record.description);
        valueWriter.write("htmlFeatures", record.htmlFeatures);
        // htmlStandard is @Nullable on the model; guard it like the other
        // nullable string fields instead of writing a possible null
        if (record.htmlStandard != null)
            valueWriter.write("htmlStandard", record.htmlStandard);
        valueWriter.write("length", record.length);
        valueWriter.write("hash", record.hash);
        valueWriter.write("quality", record.quality);
        if (record.pubYear != null) {
            valueWriter.write("pubYear", record.pubYear);
        }
        // Repeated (list) columns
        if (record.metas != null) {
            valueWriter.writeList("wordMeta", record.metas);
        }
        if (record.words != null) {
            valueWriter.writeList("word", record.words);
        }
    }
}

View File

@ -0,0 +1,24 @@
package nu.marginalia.codec.processed;
import blue.strategic.parquet.Hydrator;
import nu.marginalia.model.processed.ProcessedDocumentData;
import nu.marginalia.model.processed.ProcessedDomainData;
/** Re-assembles {@link ProcessedDocumentData} instances from parquet rows,
 *  one (column, value) pair at a time. */
public class ProcessedDocumentDataHydrator implements Hydrator<ProcessedDocumentData, ProcessedDocumentData> {

    /** Provides the blank working instance a new row is accumulated into. */
    @Override
    public ProcessedDocumentData start() {
        return new ProcessedDocumentData();
    }

    /** Folds a single column value into the instance under construction. */
    @Override
    public ProcessedDocumentData add(ProcessedDocumentData partial, String columnName, Object columnValue) {
        return partial.add(columnName, columnValue);
    }

    /** The working instance doubles as the finished record; no conversion needed. */
    @Override
    public ProcessedDocumentData finish(ProcessedDocumentData partial) {
        return partial;
    }
}

View File

@ -0,0 +1,26 @@
package nu.marginalia.codec.processed;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.ValueWriter;
import nu.marginalia.model.processed.ProcessedDomainData;
/** Serializes a {@link ProcessedDomainData} instance into a parquet row.
 *  Column names correspond to the fields of {@code ProcessedDomainData.schema}. */
public class ProcessedDomainDataDehydrator implements Dehydrator<ProcessedDomainData> {
    @Override
    public void dehydrate(ProcessedDomainData data, ValueWriter writer) {
        // Always-present columns
        writer.write("domain", data.domain);
        writer.write("knownUrls", data.knownUrls);
        writer.write("goodUrls", data.goodUrls);
        writer.write("visitedUrls", data.visitedUrls);

        // Nullable columns, emitted only when set
        if (data.state != null) {
            writer.write("state", data.state);
        }
        if (data.redirectDomain != null) {
            writer.write("redirectDomain", data.redirectDomain);
        }
        if (data.ip != null) {
            writer.write("ip", data.ip);
        }
    }
}

View File

@ -0,0 +1,26 @@
package nu.marginalia.codec.processed;
import blue.strategic.parquet.Hydrator;
/** Projection hydrator that extracts only the "domain" column from a row,
 *  used for cheap domain-name listings without materializing full records. */
public class ProcessedDomainDataDomainNameHydrator implements Hydrator<String, String> {

    /** Start from the empty string; it is replaced once the domain column is seen. */
    @Override
    public String start() {
        return "";
    }

    /** Keep the latest "domain" value; ignore every other column. */
    @Override
    public String add(String currentValue, String columnName, Object columnValue) {
        return "domain".equals(columnName) ? (String) columnValue : currentValue;
    }

    @Override
    public String finish(String currentValue) {
        return currentValue;
    }
}

View File

@ -0,0 +1,23 @@
package nu.marginalia.codec.processed;
import blue.strategic.parquet.Hydrator;
import nu.marginalia.model.processed.ProcessedDomainData;
/** Re-assembles {@link ProcessedDomainData} instances from parquet rows,
 *  one (column, value) pair at a time. */
public class ProcessedDomainDataHydrator implements Hydrator<ProcessedDomainData, ProcessedDomainData> {

    /** Provides the blank working instance a new row is accumulated into. */
    @Override
    public ProcessedDomainData start() {
        return new ProcessedDomainData();
    }

    /** Folds a single column value into the instance under construction. */
    @Override
    public ProcessedDomainData add(ProcessedDomainData partial, String columnName, Object columnValue) {
        return partial.add(columnName, columnValue);
    }

    /** The working instance doubles as the finished record; no conversion needed. */
    @Override
    public ProcessedDomainData finish(ProcessedDomainData partial) {
        return partial;
    }
}

View File

@ -0,0 +1,20 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import nu.marginalia.codec.processed.ProcessedDocumentDataHydrator;
import nu.marginalia.model.processed.ProcessedDocumentData;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.stream.Stream;
/** Reads {@link ProcessedDocumentData} records back out of a parquet file. */
public class ProcessedDocumentParquetFileReader {

    /** Streams every document record in the file at {@code path}.
     *  @throws IOException if the file cannot be opened or read */
    @NotNull
    public static Stream<ProcessedDocumentData> stream(Path path) throws IOException {
        var hydrators = HydratorSupplier.constantly(new ProcessedDocumentDataHydrator());
        return ParquetReader.streamContent(path.toFile(), hydrators);
    }
}

View File

@ -0,0 +1,25 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.codec.processed.ProcessedDocumentDataDehydrator;
import nu.marginalia.model.processed.ProcessedDocumentData;
import java.io.IOException;
import java.nio.file.Path;
/** Writes {@link ProcessedDocumentData} records to a parquet file.
 *  Must be closed to flush the parquet footer; use try-with-resources. */
public class ProcessedDocumentParquetFileWriter implements AutoCloseable {
    private final ParquetWriter<ProcessedDocumentData> writer;

    public ProcessedDocumentParquetFileWriter(Path file) throws IOException {
        writer = ParquetWriter.writeFile(ProcessedDocumentData.schema,
                file.toFile(), new ProcessedDocumentDataDehydrator());
    }

    /** Appends a single document record to the file.
     *  (Parameter renamed from the copy-pasted "domainData" -- this writer
     *  handles document records, not domain records.) */
    public void write(ProcessedDocumentData documentData) throws IOException {
        writer.write(documentData);
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}

View File

@ -0,0 +1,32 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import nu.marginalia.codec.processed.ProcessedDomainDataDomainNameHydrator;
import nu.marginalia.codec.processed.ProcessedDomainDataHydrator;
import nu.marginalia.model.processed.ProcessedDomainData;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;
/** Reads {@link ProcessedDomainData} records back out of a parquet file. */
public class ProcessedDomainParquetFileReader {

    /** Streams every domain record in the file at {@code path}.
     *  @throws IOException if the file cannot be opened or read */
    @NotNull
    public static Stream<ProcessedDomainData> stream(Path path) throws IOException {
        var hydrators = HydratorSupplier.constantly(new ProcessedDomainDataHydrator());
        return ParquetReader.streamContent(path.toFile(), hydrators);
    }

    /** Extracts just the domain names, reading only the "domain" column
     *  rather than materializing full records.
     *  @throws IOException if the file cannot be opened or read */
    @NotNull
    public static List<String> getDomainNames(Path path) throws IOException {
        var hydrators = HydratorSupplier.constantly(new ProcessedDomainDataDomainNameHydrator());
        return ParquetReader.streamContent(path.toFile(), hydrators, List.of("domain"))
                .toList();
    }
}

View File

@ -0,0 +1,25 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.codec.processed.ProcessedDomainDataDehydrator;
import nu.marginalia.model.processed.ProcessedDomainData;
import java.io.IOException;
import java.nio.file.Path;
/** Writes {@link ProcessedDomainData} records to a parquet file.
 *  Must be closed to flush the parquet footer; use try-with-resources. */
public class ProcessedDomainParquetFileWriter implements AutoCloseable {
    private final ParquetWriter<ProcessedDomainData> writer;

    public ProcessedDomainParquetFileWriter(Path file) throws IOException {
        writer = ParquetWriter.writeFile(ProcessedDomainData.schema,
                file.toFile(), new ProcessedDomainDataDehydrator());
    }

    /** Appends a single domain record to the file. */
    public void write(ProcessedDomainData domainData) throws IOException {
        writer.write(domainData);
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}

View File

@ -0,0 +1,6 @@
package nu.marginalia.model.processed;
// NOTE(review): empty placeholder with no behavior as of this commit --
// presumably reserved for shared codec logic; confirm whether it is still
// needed or should be removed.
public class ProcessedDataCodec {
}

View File

@ -0,0 +1,105 @@
package nu.marginalia.model.processed;
import gnu.trove.list.TLongList;
import gnu.trove.list.array.TLongArrayList;
import lombok.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.List;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
/** Parquet-serializable representation of a single processed document,
 *  produced by the converter. Mirrors the columns of {@link #schema};
 *  fields marked @Nullable map to optional columns, {@code words} and
 *  {@code metas} map to the repeated "word"/"wordMeta" columns. */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class ProcessedDocumentData {
    @NotNull
    public String domain;
    @NotNull
    public String url;
    public int ordinal;
    @NotNull
    public String state;
    @Nullable
    public String stateReason;
    @Nullable
    public String title;
    @Nullable
    public String description;
    public int htmlFeatures;
    @Nullable
    public String htmlStandard;
    public int length;
    public long hash;
    public float quality;
    @Nullable
    public Integer pubYear;
    @Nullable
    public List<String> words;
    @Nullable
    public List<Long> metas;

    // final: the schema is a shared constant and must never be reassigned
    public static final MessageType schema = new MessageType(
            ProcessedDocumentData.class.getSimpleName(),
            Types.required(BINARY).as(stringType()).named("domain"),
            Types.required(BINARY).as(stringType()).named("url"),
            Types.required(INT32).named("ordinal"),
            Types.required(BINARY).as(stringType()).named("state"),
            Types.optional(BINARY).as(stringType()).named("stateReason"),
            Types.optional(BINARY).as(stringType()).named("title"),
            Types.optional(BINARY).as(stringType()).named("description"),
            Types.optional(INT32).named("htmlFeatures"),
            Types.optional(BINARY).as(stringType()).named("htmlStandard"),
            Types.optional(INT64).named("hash"),
            Types.optional(INT32).named("length"),
            Types.optional(FLOAT).named("quality"),
            Types.optional(INT32).named("pubYear"),
            Types.repeated(INT64).named("wordMeta"),
            Types.repeated(BINARY).as(stringType()).named("word")
    );

    /** Hydration callback: assigns one (column, value) pair onto this instance.
     *  Repeated columns ("word", "wordMeta") are appended to lazily-created lists.
     *  @return this, for chaining by the hydrator
     *  @throws UnsupportedOperationException on an unrecognized column name */
    public ProcessedDocumentData add(String heading, Object value) {
        switch (heading) {
            case "domain" -> domain = (String) value;
            case "url" -> url = (String) value;
            case "ordinal" -> ordinal = (Integer) value;
            case "htmlFeatures" -> htmlFeatures = (Integer) value;
            case "length" -> length = (Integer) value;
            case "pubYear" -> pubYear = (Integer) value;
            case "hash" -> hash = (Long) value;
            case "quality" -> quality = (Float) value;
            case "state" -> state = (String) value;
            case "stateReason" -> stateReason = (String) value;
            case "title" -> title = (String) value;
            case "description" -> description = (String) value;
            case "htmlStandard" -> htmlStandard = (String) value;
            case "word" -> {
                if (this.words == null)
                    this.words = new ArrayList<>(100);
                this.words.add((String) value);
            }
            case "wordMeta" -> {
                if (this.metas == null) {
                    this.metas = new ArrayList<>(100);
                }
                this.metas.add((Long) value);
            }
            // fixed mismatched quotes in the message (was 'foo")
            default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
        }
        return this;
    }
}

View File

@ -0,0 +1,55 @@
package nu.marginalia.model.processed;
import lombok.*;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.parquet.schema.LogicalTypeAnnotation.*;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
/** Parquet-serializable representation of a processed domain, produced by
 *  the converter. Mirrors the columns of {@link #schema}; fields marked
 *  @Nullable map to optional columns. */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString // added for parity with ProcessedDocumentData
public class ProcessedDomainData {
    @NotNull
    public String domain;
    public int knownUrls;
    public int goodUrls;
    public int visitedUrls;
    @Nullable
    public String state;
    @Nullable
    public String redirectDomain;
    @Nullable
    public String ip;

    // final: the schema is a shared constant and must never be reassigned
    public static final MessageType schema = new MessageType(
            ProcessedDomainData.class.getSimpleName(),
            Types.required(BINARY).as(stringType()).named("domain"),
            Types.optional(INT32).named("knownUrls"),
            Types.optional(INT32).named("visitedUrls"),
            Types.optional(INT32).named("goodUrls"),
            // "state" must be optional: the field is @Nullable and the
            // dehydrator only writes it when non-null; a required column
            // would make writing a null-state record fail
            Types.optional(BINARY).as(stringType()).named("state"),
            Types.optional(BINARY).as(stringType()).named("redirectDomain"),
            Types.optional(BINARY).as(stringType()).named("ip"));

    /** Hydration callback: assigns one (column, value) pair onto this instance.
     *  @return this, for chaining by the hydrator
     *  @throws UnsupportedOperationException on an unrecognized column name */
    public ProcessedDomainData add(String heading, Object value) {
        switch (heading) {
            case "domain" -> domain = (String) value;
            case "knownUrls" -> knownUrls = (Integer) value;
            case "visitedUrls" -> visitedUrls = (Integer) value;
            case "goodUrls" -> goodUrls = (Integer) value;
            case "state" -> state = (String) value;
            case "redirectDomain" -> redirectDomain = (String) value;
            case "ip" -> ip = (String) value;
            // fixed mismatched quotes in the message (was 'foo")
            default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
        }
        return this;
    }
}

View File

@ -0,0 +1,89 @@
package nu.marginalia.io.processed;
import nu.marginalia.model.processed.ProcessedDocumentData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import static org.junit.jupiter.api.Assertions.*;
/** Round-trip tests for the document parquet writer/reader pair. */
class ProcessedDocumentParquetFileReaderTest {
    Path parquetFile;

    @BeforeEach
    public void setUp() throws IOException {
        parquetFile = Files.createTempFile(getClass().getSimpleName(), ".parquet");
    }

    @AfterEach
    public void tearDown() throws IOException {
        Files.deleteIfExists(parquetFile);
    }

    /** Builds a representative document carrying the given word/meta payload.
     *  (Extracted: the 15-argument construction was duplicated in both tests.) */
    private static ProcessedDocumentData createDoc(List<String> words, List<Long> metas) {
        return new ProcessedDocumentData(
                "www.marginalia.nu",
                "https://www.marginalia.nu/",
                0,
                "OK",
                null,
                "Itsa me, Marginalia!",
                "Hello World",
                3,
                "HTML5",
                123,
                0xF00BA3L,
                0.25f,
                null,
                words,
                metas
        );
    }

    /** Writes the record, reads the file back, and asserts equality. */
    private void assertRoundTrip(ProcessedDocumentData doc) throws IOException {
        try (var writer = new ProcessedDocumentParquetFileWriter(parquetFile)) {
            writer.write(doc);
        }
        var read = ProcessedDocumentParquetFileReader.stream(parquetFile).toList();
        assertEquals(List.of(doc), read);
    }

    @Test
    public void test() throws IOException {
        assertRoundTrip(createDoc(List.of("Hello", "world"), List.of(2L, 3L)));
    }

    @Test
    public void testHugePayload() throws IOException {
        List<String> words = IntStream.range(0, 100000).mapToObj(Integer::toString).toList();
        List<Long> metas = LongStream.range(0, 100000).boxed().toList();
        assertRoundTrip(createDoc(words, metas));
    }
}

View File

@ -0,0 +1,63 @@
package nu.marginalia.io.processed;
import nu.marginalia.model.processed.ProcessedDomainData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
/** Round-trip tests for the domain parquet writer/reader pair. */
class ProcessedDomainParquetFileReaderTest {
    Path parquetFile;

    @BeforeEach
    public void setUp() throws IOException {
        parquetFile = Files.createTempFile(getClass().getSimpleName(), ".parquet");
    }

    @AfterEach
    public void tearDown() throws IOException {
        Files.deleteIfExists(parquetFile);
    }

    @Test
    public void testReadFull() throws IOException {
        // One record with a null redirectDomain, one with a redirect
        var okDomain = new ProcessedDomainData(
                "www.marginalia.nu",
                10,
                3,
                5,
                "'sall good man",
                null,
                "127.0.0.1"
        );
        var redirectedDomain = new ProcessedDomainData(
                "memex.marginalia.nu",
                0,
                0,
                0,
                "REDIRECT",
                "www.marginalia.nu",
                "127.0.0.1"
        );

        try (var writer = new ProcessedDomainParquetFileWriter(parquetFile)) {
            writer.write(okDomain);
            writer.write(redirectedDomain);
        }

        // Column projection: only domain names, in write order
        var names = ProcessedDomainParquetFileReader.getDomainNames(parquetFile);
        assertEquals(List.of("www.marginalia.nu", "memex.marginalia.nu"), names);

        // Full round-trip of both records
        var records = ProcessedDomainParquetFileReader.stream(parquetFile).toList();
        assertEquals(List.of(okDomain, redirectedDomain), records);
    }
}

View File

@ -66,6 +66,7 @@ include 'code:processes:test-data'
include 'code:process-models:converting-model'
include 'code:process-models:crawling-model'
include 'code:process-models:work-log'
include 'code:process-models:processed-data'
include 'code:tools:term-frequency-extractor'
include 'code:tools:crawl-job-extractor'