From 9f672a0cf4a648f18e3e1b3d3c90b4d027640842 Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Wed, 13 Sep 2023 15:56:35 +0200
Subject: [PATCH] (parquet-floor) Modify the parquet library to permit list-fields.

---
Review notes (text in this section is ignored by `git am`):
  * writeList(String, TLongList) and writeList(String, TIntList) in
    SimpleWriteSupport now throw IllegalArgumentException when the column's
    primitive type does not match the list, because `assert` is normally
    disabled at runtime. Diffstat and hunk sizes are updated accordingly;
    the "index" lines still carry the original blob ids.
  * Line breaks and indentation were reconstructed from a whitespace-collapsed
    copy; use `git apply --ignore-whitespace` if the context does not match.

 third-party/parquet-floor/build.gradle        |  2 +
 .../blue/strategic/parquet/ParquetReader.java |  5 +-
 .../blue/strategic/parquet/ParquetWriter.java | 85 +++++++++++++++++++
 .../blue/strategic/parquet/ValueWriter.java   |  5 ++
 4 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/third-party/parquet-floor/build.gradle b/third-party/parquet-floor/build.gradle
index f8393044..05277c51 100644
--- a/third-party/parquet-floor/build.gradle
+++ b/third-party/parquet-floor/build.gradle
@@ -16,6 +16,8 @@ dependencies {
         exclude group: 'commons-pool', module: 'commons-pool'
         transitive = true
     }
+
+    implementation libs.trove
 }
 
 test {
diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java
index 3eee03d0..1ec3e7fb 100644
--- a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java
+++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java
@@ -192,7 +192,10 @@ public final class ParquetReader implements Spliterator, Closeable {
             U record = hydrator.start();
             for (ColumnReader columnReader: this.currentRowGroupColumnReaders) {
                 do {
-                    record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader));
+                    var value = readValue(columnReader);
+                    if (value != null) {
+                        record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], value);
+                    }
                     columnReader.consume();
                 } while (columnReader.getCurrentRepetitionLevel() != 0);
 
diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java
index 7840c49e..6e53c189 100644
--- a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java
+++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java
@@ -1,5 +1,7 @@
 package blue.strategic.parquet;
 
+import gnu.trove.list.TIntList;
+import gnu.trove.list.TLongList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -20,6 +22,9 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
 public final class ParquetWriter implements Closeable {
 
     private final org.apache.parquet.hadoop.ParquetWriter writer;
@@ -117,6 +122,28 @@ public final class ParquetWriter implements Closeable {
 
             @Override
             public void writeList(String name, List value) {
+                if (value.isEmpty()) {
+                    return;
+                }
+
+                SimpleWriteSupport.this.writeList(name, value);
+            }
+
+            @Override
+            public void writeList(String name, TLongList value) {
+                if (value.isEmpty()) {
+                    return;
+                }
+
+                SimpleWriteSupport.this.writeList(name, value);
+            }
+
+            @Override
+            public void writeList(String name, TIntList value) {
+                if (value.isEmpty()) {
+                    return;
+                }
+
                 SimpleWriteSupport.this.writeList(name, value);
             }
         };
@@ -170,6 +197,64 @@ public final class ParquetWriter implements Closeable {
             recordConsumer.endField(name, fieldIndex);
         }
 
+        /**
+         * Writes each element of {@code values} to field {@code name} within one startField/endField pair.
+         *
+         * @throws IllegalArgumentException if the schema does not declare {@code name} as INT64
+         */
+        private void writeList(String name, TLongList values) {
+            int fieldIndex = schema.getFieldIndex(name);
+            PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
+            // Field name and list type are caller input, and assertions are normally
+            // disabled at runtime, so check the column type explicitly and only once.
+            if (type.getPrimitiveTypeName() != INT64) {
+                throw new IllegalArgumentException(
+                        "Field '" + name + "' has type " + type.getPrimitiveTypeName() + "; a TLongList requires INT64");
+            }
+            recordConsumer.startField(name, fieldIndex);
+
+            for (int i = 0; i < values.size(); i++) {
+                writeValue(type, values.get(i));
+            }
+
+            recordConsumer.endField(name, fieldIndex);
+        }
+
+        /**
+         * Writes each element of {@code values} to field {@code name} within one startField/endField pair.
+         *
+         * @throws IllegalArgumentException if the schema does not declare {@code name} as INT32
+         */
+        private void writeList(String name, TIntList values) {
+            int fieldIndex = schema.getFieldIndex(name);
+            PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
+            // Field name and list type are caller input, and assertions are normally
+            // disabled at runtime, so check the column type explicitly and only once.
+            if (type.getPrimitiveTypeName() != INT32) {
+                throw new IllegalArgumentException(
+                        "Field '" + name + "' has type " + type.getPrimitiveTypeName() + "; a TIntList requires INT32");
+            }
+            recordConsumer.startField(name, fieldIndex);
+
+            for (int i = 0; i < values.size(); i++) {
+                writeValue(type, values.get(i));
+            }
+
+            recordConsumer.endField(name, fieldIndex);
+        }
+
+        /** Emits one long value; asserts (only when assertions are enabled) that {@code type} is INT64. */
+        void writeValue(PrimitiveType type, long value) {
+            assert type.getPrimitiveTypeName() == INT64;
+            recordConsumer.addLong(value);
+        }
+
+        /** Emits one int value; asserts (only when assertions are enabled) that {@code type} is INT32. */
+        void writeValue(PrimitiveType type, int value) {
+            assert type.getPrimitiveTypeName() == INT32;
+            recordConsumer.addInteger(value);
+        }
+
         void writeValue(PrimitiveType type, Object value) {
             switch (type.getPrimitiveTypeName()) {
                 case INT32: recordConsumer.addInteger((int)value); break;
diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java
index e8cda912..962f3b50 100644
--- a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java
+++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java
@@ -1,8 +1,13 @@
 package blue.strategic.parquet;
 
+import gnu.trove.list.TIntList;
+import gnu.trove.list.TLongList;
+
 import java.util.List;
 
 public interface ValueWriter {
     void write(String name, Object value);
     void writeList(String name, List value);
+    void writeList(String name, TLongList value);
+    void writeList(String name, TIntList value);
 }