Skip to content

Commit

Permalink
Parquet: Implement Variant readers (#12139)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue authored Feb 18, 2025
1 parent 5e1ce86 commit 3c8f369
Show file tree
Hide file tree
Showing 12 changed files with 2,359 additions and 110 deletions.
36 changes: 20 additions & 16 deletions core/src/main/java/org/apache/iceberg/variants/Variants.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ enum BasicType {
ARRAY
}

/**
 * Returns the shared empty metadata constant.
 *
 * @return {@code SerializedMetadata.EMPTY_V1_METADATA}
 */
public static VariantMetadata emptyMetadata() {
  VariantMetadata empty = SerializedMetadata.EMPTY_V1_METADATA;
  return empty;
}

/**
 * Creates variant metadata from a buffer by delegating to {@code SerializedMetadata.from}.
 *
 * @param metadata a buffer holding the metadata to wrap
 * @return the metadata produced by {@code SerializedMetadata.from(metadata)}
 */
public static VariantMetadata metadata(ByteBuffer metadata) {
  VariantMetadata parsed = SerializedMetadata.from(metadata);
  return parsed;
}
Expand Down Expand Up @@ -89,59 +93,59 @@ public static VariantPrimitive<Void> ofNull() {
return new PrimitiveWrapper<>(PhysicalType.NULL, null);
}

/**
 * Creates a boolean variant primitive that wraps {@code value}.
 *
 * <p>NOTE(review): the wrapper is always constructed with {@code PhysicalType.BOOLEAN_TRUE}, even
 * when {@code value} is false. Confirm that {@code PrimitiveWrapper} derives the physical type
 * from the value; otherwise a false value is tagged as true.
 *
 * @param value the boolean to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Boolean> of(boolean value) {
public static VariantPrimitive<Boolean> of(boolean value) {
return new PrimitiveWrapper<>(PhysicalType.BOOLEAN_TRUE, value);
}

/**
 * Creates a variant primitive of physical type {@code INT8} that wraps {@code value}.
 *
 * @param value the byte to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Byte> of(byte value) {
public static VariantPrimitive<Byte> of(byte value) {
return new PrimitiveWrapper<>(PhysicalType.INT8, value);
}

/**
 * Creates a variant primitive of physical type {@code INT16} that wraps {@code value}.
 *
 * @param value the short to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Short> of(short value) {
public static VariantPrimitive<Short> of(short value) {
return new PrimitiveWrapper<>(PhysicalType.INT16, value);
}

/**
 * Creates a variant primitive of physical type {@code INT32} that wraps {@code value}.
 *
 * @param value the int to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Integer> of(int value) {
public static VariantPrimitive<Integer> of(int value) {
return new PrimitiveWrapper<>(PhysicalType.INT32, value);
}

/**
 * Creates a variant primitive of physical type {@code INT64} that wraps {@code value}.
 *
 * @param value the long to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Long> of(long value) {
public static VariantPrimitive<Long> of(long value) {
return new PrimitiveWrapper<>(PhysicalType.INT64, value);
}

/**
 * Creates a variant primitive of physical type {@code FLOAT} that wraps {@code value}.
 *
 * @param value the float to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Float> of(float value) {
public static VariantPrimitive<Float> of(float value) {
return new PrimitiveWrapper<>(PhysicalType.FLOAT, value);
}

/**
 * Creates a variant primitive of physical type {@code DOUBLE} that wraps {@code value}.
 *
 * @param value the double to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Double> of(double value) {
public static VariantPrimitive<Double> of(double value) {
return new PrimitiveWrapper<>(PhysicalType.DOUBLE, value);
}

/**
 * Creates a variant primitive of physical type {@code DATE} that wraps {@code value}.
 *
 * @param value the date as an int; presumably a day count, since {@code ofIsoDate} passes the
 *     result of {@code DateTimeUtil.isoDateToDays} here (TODO confirm the epoch)
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Integer> ofDate(int value) {
public static VariantPrimitive<Integer> ofDate(int value) {
return new PrimitiveWrapper<>(PhysicalType.DATE, value);
}

/**
 * Creates a {@code DATE} variant primitive from a date string.
 *
 * <p>The string is converted with {@code DateTimeUtil.isoDateToDays} and the result is passed to
 * {@code ofDate(int)}.
 *
 * @param value a date string accepted by {@code DateTimeUtil.isoDateToDays}
 * @return a primitive wrapper holding the converted value
 */
static VariantPrimitive<Integer> ofIsoDate(String value) {
public static VariantPrimitive<Integer> ofIsoDate(String value) {
return ofDate(DateTimeUtil.isoDateToDays(value));
}

/**
 * Creates a variant primitive of physical type {@code TIMESTAMPTZ} that wraps {@code value}.
 *
 * @param value the timestamp as a long; presumably microseconds, since {@code ofIsoTimestamptz}
 *     passes the result of {@code DateTimeUtil.isoTimestamptzToMicros} here (TODO confirm)
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Long> ofTimestamptz(long value) {
public static VariantPrimitive<Long> ofTimestamptz(long value) {
return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPTZ, value);
}

/**
 * Creates a {@code TIMESTAMPTZ} variant primitive from a timestamp string.
 *
 * <p>The string is converted with {@code DateTimeUtil.isoTimestamptzToMicros} and the result is
 * passed to {@code ofTimestamptz(long)}.
 *
 * @param value a timestamp string accepted by {@code DateTimeUtil.isoTimestamptzToMicros}
 * @return a primitive wrapper holding the converted value
 */
static VariantPrimitive<Long> ofIsoTimestamptz(String value) {
public static VariantPrimitive<Long> ofIsoTimestamptz(String value) {
return ofTimestamptz(DateTimeUtil.isoTimestamptzToMicros(value));
}

/**
 * Creates a variant primitive of physical type {@code TIMESTAMPNTZ} that wraps {@code value}.
 *
 * @param value the timestamp as a long; presumably microseconds, since {@code ofIsoTimestampntz}
 *     passes the result of {@code DateTimeUtil.isoTimestampToMicros} here (TODO confirm)
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<Long> ofTimestampntz(long value) {
public static VariantPrimitive<Long> ofTimestampntz(long value) {
return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPNTZ, value);
}

/**
 * Creates a {@code TIMESTAMPNTZ} variant primitive from a timestamp string.
 *
 * <p>The string is converted with {@code DateTimeUtil.isoTimestampToMicros} and the result is
 * passed to {@code ofTimestampntz(long)}.
 *
 * @param value a timestamp string accepted by {@code DateTimeUtil.isoTimestampToMicros}
 * @return a primitive wrapper holding the converted value
 */
static VariantPrimitive<Long> ofIsoTimestampntz(String value) {
public static VariantPrimitive<Long> ofIsoTimestampntz(String value) {
return ofTimestampntz(DateTimeUtil.isoTimestampToMicros(value));
}

static VariantPrimitive<BigDecimal> of(BigDecimal value) {
public static VariantPrimitive<BigDecimal> of(BigDecimal value) {
int bitLength = value.unscaledValue().bitLength();
if (bitLength < 32) {
return new PrimitiveWrapper<>(PhysicalType.DECIMAL4, value);
Expand All @@ -154,11 +158,11 @@ static VariantPrimitive<BigDecimal> of(BigDecimal value) {
throw new UnsupportedOperationException("Unsupported decimal precision: " + value.precision());
}

/**
 * Creates a variant primitive of physical type {@code BINARY} that wraps {@code value}.
 *
 * <p>The buffer reference is handed straight to the wrapper; this method does not copy it.
 *
 * @param value the buffer to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<ByteBuffer> of(ByteBuffer value) {
public static VariantPrimitive<ByteBuffer> of(ByteBuffer value) {
return new PrimitiveWrapper<>(PhysicalType.BINARY, value);
}

/**
 * Creates a variant primitive of physical type {@code STRING} that wraps {@code value}.
 *
 * @param value the string to wrap
 * @return a primitive wrapper holding {@code value}
 */
static VariantPrimitive<String> of(String value) {
public static VariantPrimitive<String> of(String value) {
return new PrimitiveWrapper<>(PhysicalType.STRING, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.variants;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
Expand All @@ -27,10 +29,55 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class VariantTestUtil {
private VariantTestUtil() {}

/**
 * Asserts that two variant metadata instances hold the same dictionary.
 *
 * <p>Both arguments must be non-null and report the same dictionary size. Every entry of {@code
 * expected} is then compared with the entry at the same index in {@code actual}.
 *
 * @param expected the metadata that defines the expected dictionary
 * @param actual the metadata under test
 */
public static void assertEqual(VariantMetadata expected, VariantMetadata actual) {
  assertThat(actual).isNotNull();
  assertThat(expected).isNotNull();

  assertThat(actual.dictionarySize())
      .as("Dictionary size should match")
      .isEqualTo(expected.dictionarySize());

  // sizes are known to match here, so walking the expected indexes covers both dictionaries
  int position = 0;
  while (position < expected.dictionarySize()) {
    assertThat(actual.get(position)).isEqualTo(expected.get(position));
    position += 1;
  }
}

/**
 * Asserts that two variant values are equal, recursing into objects and arrays.
 *
 * <p>Both values must be non-null and have the same physical type. Objects must have the same
 * number of fields, and every field named by {@code expected} must match the field of the same
 * name in {@code actual}. Arrays must have the same number of elements and match element by
 * element. Any other type is compared through the value returned by {@code asPrimitive().get()}.
 *
 * @param expected the value that defines the expected content
 * @param actual the value under test
 */
public static void assertEqual(VariantValue expected, VariantValue actual) {
  assertThat(actual).isNotNull();
  assertThat(expected).isNotNull();
  assertThat(actual.type()).as("Variant type should match").isEqualTo(expected.type());

  if (expected.type() == PhysicalType.OBJECT) {
    VariantObject wantObject = expected.asObject();
    VariantObject gotObject = actual.asObject();
    assertThat(gotObject.numFields())
        .as("Variant object num fields should match")
        .isEqualTo(wantObject.numFields());

    // field counts match, so checking every expected name also rules out extra fields in actual
    for (String name : wantObject.fieldNames()) {
      assertEqual(wantObject.get(name), gotObject.get(name));
    }

    return;
  }

  if (expected.type() == PhysicalType.ARRAY) {
    VariantArray wantArray = expected.asArray();
    VariantArray gotArray = actual.asArray();
    assertThat(gotArray.numElements())
        .as("Variant array num element should match")
        .isEqualTo(wantArray.numElements());

    int position = 0;
    while (position < wantArray.numElements()) {
      assertEqual(wantArray.get(position), gotArray.get(position));
      position += 1;
    }

    return;
  }

  // neither an object nor an array: compare the wrapped primitive values directly
  assertThat(actual.asPrimitive().get())
      .as("Variant primitive value should match")
      .isEqualTo(expected.asPrimitive().get());
}

/**
 * Builds a header byte by shifting the primitive type ID left by two bits.
 *
 * <p>The two low bits of the result are always zero. The cast keeps only the low 8 bits of the
 * shifted value.
 *
 * @param primitiveType the primitive type ID to encode
 * @return the shifted type ID as a byte
 */
private static byte primitiveHeader(int primitiveType) {
  int shifted = primitiveType << 2;
  return (byte) shifted;
}
Expand Down Expand Up @@ -60,7 +107,11 @@ static SerializedPrimitive createString(String string) {
return SerializedPrimitive.from(buffer, buffer.get(0));
}

static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortNames) {
/**
 * Creates a metadata buffer with no field names.
 *
 * @return the buffer produced by {@code createMetadata} for an empty list with sorting enabled
 */
public static ByteBuffer emptyMetadata() {
  boolean sortNames = true;
  return createMetadata(ImmutableList.of(), sortNames);
}

public static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortNames) {
if (fieldNames.isEmpty()) {
return SerializedMetadata.EMPTY_V1_BUFFER;
}
Expand Down Expand Up @@ -108,7 +159,7 @@ static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortName
return buffer;
}

static ByteBuffer createObject(ByteBuffer metadataBuffer, Map<String, VariantValue> data) {
public static ByteBuffer createObject(ByteBuffer metadataBuffer, Map<String, VariantValue> data) {
// create the metadata to look up field names
VariantMetadata metadata = Variants.metadata(metadataBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.data.parquet;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,7 +27,9 @@
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetVariantVisitor;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VariantReaderBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -431,6 +434,16 @@ public ParquetValueReader<?> primitive(
}
}

/**
 * Returns the reader that was built for the variant, unchanged.
 *
 * @param iVariant the Iceberg variant type; not consulted by this implementation
 * @param reader the reader produced for the variant
 * @return {@code reader}
 */
@Override
public ParquetValueReader<?> variant(Types.VariantType iVariant, ParquetValueReader<?> reader) {
  // pass the supplied reader through untouched
  ParquetValueReader<?> variantReader = reader;
  return variantReader;
}

/**
 * Creates the visitor used to build readers for a variant.
 *
 * @return a new {@code VariantReaderBuilder} constructed from {@code type} and the result of
 *     {@code currentPath()} wrapped with {@code Arrays.asList}
 */
@Override
public ParquetVariantVisitor<ParquetValueReader<?>> variantVisitor() {
return new VariantReaderBuilder(type, Arrays.asList(currentPath()));
}

/**
 * Returns the Parquet message type referenced by {@code type}.
 *
 * @return the {@code MessageType} held by this instance
 */
MessageType type() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
Expand Down Expand Up @@ -75,6 +76,32 @@ private static Schema convertInternal(
converter.getAliases());
}

/**
 * Checks whether a Parquet group contains a field with the given name.
 *
 * <p>The check is delegated to {@code fieldType(GroupType, String)}; a non-null result means the
 * field exists.
 *
 * @param group the GroupType to search
 * @param name the field name to look for
 * @return true if {@code fieldType} finds a field called {@code name} in {@code group}
 */
public static boolean hasField(GroupType group, String name) {
  Type field = fieldType(group, name);
  return field != null;
}

/**
 * Looks up a field of a Parquet group by name without throwing when the lookup fails.
 *
 * @param group the GroupType to search
 * @param name the field name to look up
 * @return the field's Type, or null when {@code group.getType(name)} throws
 *     InvalidRecordException
 */
public static Type fieldType(GroupType group, String name) {
  Type found;
  try {
    found = group.getType(name);
  } catch (InvalidRecordException ignored) {
    // the lookup failed; callers of this method see that as a null result
    found = null;
  }

  return found;
}

public static MessageType pruneColumns(MessageType fileSchema, Schema expectedSchema) {
// column order must match the incoming type, so it doesn't matter that the ids are unordered
Set<Integer> selectedIds = TypeUtil.getProjectedIds(expectedSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public static ParquetValueReader<Integer> unboxed(ColumnDescriptor desc) {
return new UnboxedReader<>(desc);
}

/**
 * Creates a reader that returns values as {@code Byte}.
 *
 * @param desc the descriptor of the column to read
 * @return a new {@code IntAsByteReader} for {@code desc}
 */
public static ParquetValueReader<Byte> intsAsByte(ColumnDescriptor desc) {
  ParquetValueReader<Byte> byteReader = new IntAsByteReader(desc);
  return byteReader;
}

/**
 * Creates a reader that returns values as {@code Short}.
 *
 * @param desc the descriptor of the column to read
 * @return a new {@code IntAsShortReader} for {@code desc}
 */
public static ParquetValueReader<Short> intsAsShort(ColumnDescriptor desc) {
  ParquetValueReader<Short> shortReader = new IntAsShortReader(desc);
  return shortReader;
}

public static ParquetValueReader<String> strings(ColumnDescriptor desc) {
return new StringReader(desc);
}
Expand Down Expand Up @@ -390,6 +398,28 @@ public String read(String reuse) {
}
}

/** Reader that narrows each value returned by {@code readInteger()} to a {@code Byte}. */
private static class IntAsByteReader extends UnboxedReader<Byte> {
  private IntAsByteReader(ColumnDescriptor desc) {
    super(desc);
  }

  /**
   * Reads the next value and narrows it to a byte.
   *
   * @param ignored a reuse slot that is never consulted
   * @return the value after a primitive narrowing cast; out-of-range values are not rejected
   */
  @Override
  public Byte read(Byte ignored) {
    byte narrowed = (byte) readInteger();
    return narrowed;
  }
}

/** Reader that narrows each value returned by {@code readInteger()} to a {@code Short}. */
private static class IntAsShortReader extends UnboxedReader<Short> {
  private IntAsShortReader(ColumnDescriptor desc) {
    super(desc);
  }

  /**
   * Reads the next value and narrows it to a short.
   *
   * @param ignored a reuse slot that is never consulted
   * @return the value after a primitive narrowing cast; out-of-range values are not rejected
   */
  @Override
  public Short read(Short ignored) {
    short narrowed = (short) readInteger();
    return narrowed;
  }
}

public static class IntAsLongReader extends UnboxedReader<Long> {
public IntAsLongReader(ColumnDescriptor desc) {
super(desc);
Expand Down
Loading

0 comments on commit 3c8f369

Please sign in to comment.