
add Unknown Type and test for GenericParquetWriter
TODO: schema parser, visitors, etc
HonahX committed Jan 9, 2025
1 parent 4d5c87d commit 201a0eb
Showing 14 changed files with 139 additions and 12 deletions.
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/Schema.java
@@ -60,7 +60,8 @@ public class Schema implements Serializable {

@VisibleForTesting
static final Map<Type.TypeID, Integer> MIN_FORMAT_VERSIONS =
ImmutableMap.of(Type.TypeID.TIMESTAMP_NANO, 3, Type.TypeID.VARIANT, 3);
ImmutableMap.of(
Type.TypeID.TIMESTAMP_NANO, 3, Type.TypeID.VARIANT, 3, Type.TypeID.UNKNOWN, 3);

private final StructType struct;
private final int schemaId;
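A hedged sketch of what the new entry enforces (illustration only, not part of this commit; it assumes the existing Schema.checkCompatibility helper that consults MIN_FORMAT_VERSIONS):

Schema schemaWithUnknown =
    new Schema(Types.NestedField.optional(1, "u", Types.UnknownType.get()));
Schema.checkCompatibility(schemaWithUnknown, 3); // ok: unknown is allowed from v3
Schema.checkCompatibility(schemaWithUnknown, 2); // expected to fail: unknown requires v3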
@@ -196,6 +196,9 @@ default String toHumanString(Type type, T value) {
} else {
throw new UnsupportedOperationException("Unsupported binary type: " + value.getClass());
}
case UNKNOWN:
// TODO: optional; mainly a safety check, since unknown values are always null
return "null";
default:
return value.toString();
}
@@ -46,6 +46,7 @@ private Comparators() {}
.put(Types.StringType.get(), Comparators.charSequences())
.put(Types.UUIDType.get(), Comparator.naturalOrder())
.put(Types.BinaryType.get(), Comparators.unsignedBytes())
.put(Types.UnknownType.get(), Comparators.nullsFirst())
.buildOrThrow();

public static Comparator<StructLike> forType(Types.StructType struct) {
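For illustration, a hedged sketch of the comparator lookup this enables (hypothetical usage, not part of this commit):

// nullsFirst() makes comparison well-defined for unknown, whose values are always null
Comparator<Object> cmp = Comparators.forType(Types.UnknownType.get());
int result = cmp.compare(null, null); // 0: all unknown values compare equal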
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Conversions.java
@@ -68,6 +68,8 @@ public static Object fromPartitionString(Type type, String asString) {
return new BigDecimal(asString);
case DATE:
return Literal.of(asString).to(Types.DateType.get()).value();
case UNKNOWN:
return null;
default:
throw new UnsupportedOperationException(
"Unsupported type for fromPartitionString: " + type);
@@ -117,6 +119,9 @@ public static ByteBuffer toByteBuffer(Type.TypeID typeId, Object value) {
return (ByteBuffer) value;
case DECIMAL:
return ByteBuffer.wrap(((BigDecimal) value).unscaledValue().toByteArray());
case UNKNOWN:
// TODO: as above, just a safety check; unknown values serialize to null
return null;
default:
throw new UnsupportedOperationException("Cannot serialize type: " + typeId);
}
@@ -177,6 +182,9 @@ private static Object internalFromByteBuffer(Type type, ByteBuffer buffer) {
byte[] unscaledBytes = new byte[buffer.remaining()];
tmp.get(unscaledBytes);
return new BigDecimal(new BigInteger(unscaledBytes), decimal.scale());
case UNKNOWN:
// TODO: as above, a safety check; unknown always deserializes to null
return null;
default:
throw new UnsupportedOperationException("Cannot deserialize type: " + type);
}
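A hedged round-trip sketch (hypothetical usage; it relies on fromByteBuffer already returning null for a null buffer):

// unknown single values serialize to null and deserialize back to null
ByteBuffer buf = Conversions.toByteBuffer(Type.TypeID.UNKNOWN, null); // null
Object back = Conversions.fromByteBuffer(Types.UnknownType.get(), buf); // null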
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/Type.java
@@ -46,7 +46,8 @@ enum TypeID {
STRUCT(StructLike.class),
LIST(List.class),
MAP(Map.class),
VARIANT(Object.class);
VARIANT(Object.class),
UNKNOWN(Object.class);

private final Class<?> javaClass;

7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -418,6 +418,10 @@ public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) {
Types.DecimalType toDecimal = (Types.DecimalType) to;
return fromDecimal.scale() == toDecimal.scale()
&& fromDecimal.precision() <= toDecimal.precision();

case UNKNOWN:
// TODO: for this first pass, unknown can be promoted to any type
return true;
}

return false;
@@ -507,6 +511,9 @@ public static int estimateSize(Types.NestedField field) {

private static int estimateSize(Type type) {
switch (type.typeId()) {
case UNKNOWN:
// an unknown value is always null, so it contributes no size
return 0;
case BOOLEAN:
// the size of a boolean variable is virtual machine dependent
// it is common to believe booleans occupy 1 byte in most JVMs
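A hypothetical illustration of the one-way promotion rule added above (not part of this commit):

// unknown promotes to any primitive, but nothing promotes to unknown
TypeUtil.isPromotionAllowed(Types.UnknownType.get(), Types.LongType.get()); // true
TypeUtil.isPromotionAllowed(Types.LongType.get(), Types.UnknownType.get()); // false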
26 changes: 26 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Types.java
@@ -55,6 +55,7 @@ private Types() {}
.put(StringType.get().toString(), StringType.get())
.put(UUIDType.get().toString(), UUIDType.get())
.put(BinaryType.get().toString(), BinaryType.get())
.put(UnknownType.get().toString(), UnknownType.get())
.buildOrThrow();

private static final Pattern FIXED = Pattern.compile("fixed\\[\\s*(\\d+)\\s*\\]");
@@ -412,6 +413,24 @@ public String toString() {
}
}

public static class UnknownType extends PrimitiveType {
private static final UnknownType INSTANCE = new UnknownType();

public static UnknownType get() {
return INSTANCE;
}

@Override
public TypeID typeId() {
return TypeID.UNKNOWN;
}

@Override
public String toString() {
return "unknown";
}
}

public static class VariantType implements Type {
private static final VariantType INSTANCE = new VariantType();

@@ -613,6 +632,13 @@ private NestedField(
Object writeDefault) {
Preconditions.checkNotNull(name, "Name cannot be null");
Preconditions.checkNotNull(type, "Type cannot be null");
if (type.typeId() == Type.TypeID.UNKNOWN) {
Preconditions.checkArgument(isOptional, "Unknown type field must be optional");
Preconditions.checkArgument(
initialDefault == null, "Unknown type cannot have non-null initial-default");
Preconditions.checkArgument(
writeDefault == null, "Unknown type cannot have non-null write-default");
}
this.isOptional = isOptional;
this.id = id;
this.name = name;
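A quick sketch of the new NestedField invariants (hypothetical usage):

// unknown fields must be optional and cannot carry defaults
Types.NestedField.optional(1, "u", Types.UnknownType.get()); // ok
Types.NestedField.required(2, "u", Types.UnknownType.get()); // throws IllegalArgumentException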
4 changes: 2 additions & 2 deletions api/src/test/java/org/apache/iceberg/types/TestTypes.java
@@ -44,7 +44,7 @@ public void fromPrimitiveString() {
assertThat(Types.fromPrimitiveString("Decimal(2,3)")).isEqualTo(Types.DecimalType.of(2, 3));

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> Types.fromPrimitiveString("Unknown"))
.withMessageContaining("Unknown");
.isThrownBy(() -> Types.fromPrimitiveString("unknown-type"))
.withMessageContaining("unknown-type");
}
}
@@ -188,6 +188,8 @@ static Schema toOption(Schema schema) {
Preconditions.checkArgument(
isOptionSchema(schema), "Union schemas are not supported: %s", schema);
return schema;
} else if (schema.getType() == Schema.Type.NULL) {
return schema;
} else {
return Schema.createUnion(NULL, schema);
}
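Why the new branch matters, sketched (hedged; toOption is package-private, so this is illustrative only):

Schema nullSchema = Schema.create(Schema.Type.NULL);
// before: falls through to Schema.createUnion(NULL, nullSchema), which Avro
// would reject as a duplicate union branch
// after: toOption(nullSchema) returns nullSchema unchanged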
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
@@ -49,6 +49,7 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
private static final Schema UUID_SCHEMA =
LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
private static final Schema BINARY_SCHEMA = Schema.create(Schema.Type.BYTES);
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);

static {
TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
@@ -243,6 +244,9 @@ public Schema primitive(Type.PrimitiveType primitive) {
null,
TypeUtil.decimalRequiredBytes(decimal.precision())));
break;
case UNKNOWN:
primitiveSchema = NULL_SCHEMA;
break;
default:
throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
}
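For illustration, a hedged end-to-end conversion (hypothetical usage of the existing AvroSchemaUtil.convert):

org.apache.avro.Schema avro =
    AvroSchemaUtil.convert(
        new org.apache.iceberg.Schema(
            Types.NestedField.optional(1, "u", Types.UnknownType.get())),
        "table");
// avro.getField("u").schema().getType() == Schema.Type.NULL (the NULL_SCHEMA
// above, left un-wrapped by the toOption() change)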
@@ -30,7 +30,9 @@
import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.parquet.TripleWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
@@ -168,6 +170,12 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalType) {
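// interval is the placeholder annotation this commit uses for UNKNOWN
// (see the UNKNOWN constant in TypeToMessageType)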
return Optional.of(new UnknownWriter(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
@@ -300,4 +308,18 @@ public void write(int repetitionLevel, byte[] value) {
column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
}
}

private static class UnknownWriter extends ParquetValueWriters.PrimitiveWriter<Object> {
private UnknownWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, Object value) {
// no-op: unknown values are always null and produce no Parquet data
}

@Override
public List<TripleWriter<?>> columns() {
return ImmutableList.of();
}
}
}
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
@@ -32,6 +33,7 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
@@ -88,8 +90,16 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
this.metadata = ImmutableMap.copyOf(metadata);
this.compressor =
new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
// TODO: this only strips top-level unknown columns from the Schema; we will
// need a visitor once unknown is allowed inside nested fields.
Schema schemaWithoutUnknown =
new Schema(
schema.columns().stream()
.filter(field -> field.type().typeId() != Type.TypeID.UNKNOWN)
.collect(Collectors.toList()));
MessageType fullParquetSchema = ParquetSchemaUtil.convert(schema, "table");
this.parquetSchema = ParquetSchemaUtil.convert(schemaWithoutUnknown, "table");
this.model = (ParquetValueWriter<T>) createWriterFunc.apply(fullParquetSchema);
this.metricsConfig = metricsConfig;
this.columnIndexTruncateLength =
conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
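A sketch of the resulting split (hedged, illustration only): the schema written to the file footer drops unknown columns, while the value writer is still built from the full schema so per-field writers line up with the record layout.

Schema schema =
    new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "u", Types.UnknownType.get()));
MessageType full = ParquetSchemaUtil.convert(schema, "table"); // includes "u"
MessageType filtered = // "id" only; this is what lands in the file
    ParquetSchemaUtil.convert(
        new Schema(
            schema.columns().stream()
                .filter(f -> f.type().typeId() != Type.TypeID.UNKNOWN)
                .collect(Collectors.toList())),
        "table");
// the writer built from "full" still covers "u", but its UnknownWriter emits nothing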
@@ -56,6 +56,10 @@ public class TypeToMessageType {
LogicalTypeAnnotation.timestampType(false /* not adjusted to UTC */, TimeUnit.MICROS);
private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS =
LogicalTypeAnnotation.timestampType(true /* adjusted to UTC */, TimeUnit.MICROS);
// According to
// https://github.com/apache/parquet-java/blob/7f77908338192105a5adbfc420a7281d919e8596/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java#L992-L996
// the interval logical type maps to UNKNOWN
private static final LogicalTypeAnnotation UNKNOWN = LogicalTypeAnnotation.intervalType();

public MessageType convert(Schema schema, String name) {
Types.MessageTypeBuilder builder = Types.buildMessage();
@@ -187,6 +191,13 @@ public Type primitive(
.id(id)
.named(name);

case UNKNOWN:
return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.length(12)
.as(UNKNOWN)
.id(id)
.named(name);

default:
throw new UnsupportedOperationException("Unsupported type for Parquet: " + primitive);
}
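A hedged illustration of the resulting Parquet type (hypothetical usage of the existing ParquetSchemaUtil.convert; the printed form is approximate):

MessageType parquet =
    ParquetSchemaUtil.convert(
        new Schema(Types.NestedField.optional(1, "u", Types.UnknownType.get())),
        "table");
System.out.println(parquet);
// roughly: message table { optional fixed_len_byte_array(12) u (INTERVAL) = 1 }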
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
@@ -46,7 +47,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -58,16 +63,24 @@ public class TestParquetDataWriter {
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "binary", Types.BinaryType.get()));

private static final Schema SCHEMA_WITH_UNKNOWN =
new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "binary", Types.BinaryType.get()),
Types.NestedField.optional(4, "unknown", Types.UnknownType.get()),
Types.NestedField.optional(5, "test", Types.BooleanType.get()));

private List<Record> records;

@TempDir private Path temp;

@BeforeEach
public void createRecords() {
GenericRecord record = GenericRecord.create(SCHEMA);
GenericRecord record = GenericRecord.create(SCHEMA_WITH_UNKNOWN);

ImmutableList.Builder<Record> builder = ImmutableList.builder();
builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a", "test", true)));
builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
@@ -80,11 +93,12 @@ public void testDataWriter() throws IOException {
public void testDataWriter() throws IOException {
OutputFile file = Files.localOutput(createTempFile(temp));

SortOrder sortOrder = SortOrder.builderFor(SCHEMA).withOrderId(10).asc("id").build();
SortOrder sortOrder =
SortOrder.builderFor(SCHEMA_WITH_UNKNOWN).withOrderId(10).asc("id").build();

DataWriter<Record> dataWriter =
Parquet.writeData(file)
.schema(SCHEMA)
.schema(SCHEMA_WITH_UNKNOWN)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
@@ -109,15 +123,32 @@
.as("Sort order should match")
.isEqualTo(sortOrder.orderId());
assertThat(dataFile.keyMetadata()).as("Key metadata should be null").isNull();
assertThat(dataFile.nullValueCounts().containsKey(4))
.as("Unknown type field should not appear in metrics")
.isFalse();

List<Record> writtenRecords;
try (CloseableIterable<Record> reader =
Parquet.read(file.toInputFile())
.project(SCHEMA)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(SCHEMA, fileSchema))
.project(SCHEMA_WITH_UNKNOWN)
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(SCHEMA_WITH_UNKNOWN, fileSchema))
.build()) {
writtenRecords = Lists.newArrayList(reader);
}
ParquetFileReader schemaReader =
ParquetFileReader.open(
ParquetIO.file(file.toInputFile()), ParquetReadOptions.builder().build());
MessageType parquetSchema = schemaReader.getFileMetaData().getSchema();
assertThat(parquetSchema)
.as("UNKNOWN type should not be written to data file.")
.isEqualTo(
ParquetSchemaUtil.convert(
new Schema(
SCHEMA_WITH_UNKNOWN.columns().stream()
.filter(field -> field.type().typeId() != Type.TypeID.UNKNOWN)
.collect(Collectors.toList())),
"table"));

assertThat(writtenRecords).as("Written records should match").isEqualTo(records);
}
