From 0d732aea2a817bc9b42f44f1dd742ef0c0d50cc9 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Wed, 19 Feb 2025 19:55:09 -0800 Subject: [PATCH] Add remaining visitor implementation --- .../apache/iceberg/avro/ApplyNameMapping.java | 5 ++ .../java/org/apache/iceberg/avro/Avro.java | 2 +- .../avro/AvroCustomOrderSchemaVisitor.java | 12 ++-- .../apache/iceberg/avro/AvroEncoderUtil.java | 2 +- .../apache/iceberg/avro/AvroSchemaUtil.java | 4 +- .../iceberg/avro/AvroSchemaVisitor.java | 14 +++-- .../apache/iceberg/avro/BaseWriteBuilder.java | 5 ++ .../java/org/apache/iceberg/avro/HasIds.java | 5 ++ .../org/apache/iceberg/avro/MissingIds.java | 5 ++ .../org/apache/iceberg/avro/RemoveIds.java | 5 ++ .../org/apache/iceberg/avro/SchemaToType.java | 2 +- .../org/apache/iceberg/avro/TypeToSchema.java | 8 +-- .../{Variant.java => VariantLogicalType.java} | 8 +-- .../apache/iceberg/avro/AvroTestHelpers.java | 2 +- .../iceberg/avro/TestAvroNameMapping.java | 17 +++++ .../avro/TestAvroSchemaProjection.java | 63 +++++-------------- .../org/apache/iceberg/avro/TestHasIds.java | 7 ++- .../iceberg/avro/TestSchemaConversions.java | 3 +- 18 files changed, 93 insertions(+), 76 deletions(-) rename core/src/main/java/org/apache/iceberg/avro/{Variant.java => VariantLogicalType.java} (87%) diff --git a/core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java b/core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java index ce619c47fabe..4092644a71b6 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java +++ b/core/src/main/java/org/apache/iceberg/avro/ApplyNameMapping.java @@ -128,6 +128,11 @@ public Schema map(Schema map, Schema value) { return map; } + @Override + public Schema variant(Schema variant, List fields) { + return variant; + } + @Override public Schema primitive(Schema primitive) { return primitive; diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 28444bd367c2..2a3ea11590bb 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -86,7 +86,7 @@ private enum Codec { static { LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get()); - LogicalTypes.register(Variant.NAME, schema -> Variant.get()); + LogicalTypes.register(VariantLogicalType.NAME, schema -> VariantLogicalType.get()); DEFAULT_MODEL.addLogicalTypeConversion(new Conversions.DecimalConversion()); DEFAULT_MODEL.addLogicalTypeConversion(new UUIDConversion()); } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroCustomOrderSchemaVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroCustomOrderSchemaVisitor.java index 452cdbffa2b9..05a8b42f5521 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroCustomOrderSchemaVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroCustomOrderSchemaVisitor.java @@ -37,10 +37,6 @@ public static T visit(Schema schema, AvroCustomOrderSchemaVisitor v visitor.recordLevels.push(name); - if (schema.getLogicalType() instanceof Variant) { - return visitor.variant(schema); - } - List fields = schema.getFields(); List names = Lists.newArrayListWithExpectedSize(fields.size()); List> results = Lists.newArrayListWithExpectedSize(fields.size()); @@ -51,7 +47,13 @@ public static T visit(Schema schema, AvroCustomOrderSchemaVisitor v visitor.recordLevels.pop(); - return visitor.record(schema, names, Iterables.transform(results, Supplier::get)); + if (schema.getLogicalType() instanceof VariantLogicalType) { + Preconditions.checkArgument( + AvroSchemaUtil.isVariantSchema(schema), "Invalid variant record: %s", schema); + return visitor.variant(schema); + } else { + return visitor.record(schema, names, Iterables.transform(results, Supplier::get)); + } case UNION: List types = schema.getTypes(); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java index 42d19ce50b3d..0db8d7dd5f9f 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java @@ -39,7 +39,7 @@ private AvroEncoderUtil() {} static { LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get()); - LogicalTypes.register(Variant.NAME, schema -> Variant.get()); + LogicalTypes.register(VariantLogicalType.NAME, schema -> VariantLogicalType.get()); } private static final byte[] MAGIC_BYTES = new byte[] {(byte) 0xC2, (byte) 0x01}; diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 4a4afa6188a7..ce9661822bb4 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -223,7 +223,9 @@ static boolean isVariantSchema(Schema schema) { return schema.getType() == RECORD && schema.getFields().size() == 2 && schema.getField("metadata") != null - && schema.getField("value") != null; + && schema.getField("metadata").schema().getType() == Schema.Type.BYTES + && schema.getField("value") != null + && schema.getField("value").schema().getType() == Schema.Type.BYTES; } static Schema createMap(int keyId, Schema keySchema, int valueId, Schema valueSchema) { diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java index e63ce3a067cd..11c068c678a3 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java @@ -28,10 +28,6 @@ public abstract class AvroSchemaVisitor { public static T visit(Schema schema, AvroSchemaVisitor visitor) { switch (schema.getType()) { case RECORD: - if (schema.getLogicalType() instanceof Variant) { - return visitor.variant(schema); - } - // check to make sure this hasn't been visited before String name = schema.getFullName(); Preconditions.checkState( @@ -50,7 +46,13 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { visitor.recordLevels.pop(); - return visitor.record(schema, names, results); + if (schema.getLogicalType() instanceof VariantLogicalType) { + Preconditions.checkArgument( + AvroSchemaUtil.isVariantSchema(schema), "Invalid variant record: %s", schema); + return visitor.variant(schema, results); + } else { + return visitor.record(schema, names, results); + } case UNION: List types = schema.getTypes(); @@ -107,7 +109,7 @@ public T map(Schema map, T value) { return null; } - public T variant(Schema variant) { + public T variant(Schema variant, List fields) { throw new UnsupportedOperationException("Unsupported type: variant"); } diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java index f8a2bc604656..e5d44e8800cf 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java +++ b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java @@ -64,6 +64,11 @@ public ValueWriter map(Schema map, ValueWriter valueWriter) { return ValueWriters.map(ValueWriters.strings(), valueWriter); } + @Override + public ValueWriter variant(Schema variant, List> fields) { + return createRecordWriter(fields); + } + @Override public ValueWriter primitive(Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); diff --git a/core/src/main/java/org/apache/iceberg/avro/HasIds.java b/core/src/main/java/org/apache/iceberg/avro/HasIds.java index 52ecfd01eaac..3c7c2fe40425 100644 --- a/core/src/main/java/org/apache/iceberg/avro/HasIds.java +++ b/core/src/main/java/org/apache/iceberg/avro/HasIds.java @@ -56,6 +56,11 @@ public Boolean union(Schema union, Iterable options) { return Iterables.any(options, Boolean.TRUE::equals); } + @Override + public Boolean variant(Schema variant) { + return false; + } + @Override public Boolean primitive(Schema primitive) { return false; diff --git a/core/src/main/java/org/apache/iceberg/avro/MissingIds.java b/core/src/main/java/org/apache/iceberg/avro/MissingIds.java index e47d012a36ee..26ca59d0cd59 100644 --- a/core/src/main/java/org/apache/iceberg/avro/MissingIds.java +++ b/core/src/main/java/org/apache/iceberg/avro/MissingIds.java @@ -62,6 +62,11 @@ public Boolean union(Schema union, Iterable options) { return Iterables.any(options, Boolean.TRUE::equals); } + @Override + public Boolean variant(Schema variant) { + return false; + } + @Override public Boolean primitive(Schema primitive) { // primitive node cannot be missing ID as Iceberg do not assign primitive node IDs in the first diff --git a/core/src/main/java/org/apache/iceberg/avro/RemoveIds.java b/core/src/main/java/org/apache/iceberg/avro/RemoveIds.java index dccc8bf57e9d..c95636a166cc 100644 --- a/core/src/main/java/org/apache/iceberg/avro/RemoveIds.java +++ b/core/src/main/java/org/apache/iceberg/avro/RemoveIds.java @@ -60,6 +60,11 @@ public Schema array(Schema array, Schema element) { return result; } + @Override + public Schema variant(Schema variant, List fields) { + return variant; + } + @Override public Schema primitive(Schema primitive) { return Schema.create(primitive.getType()); diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java index 16def7461a39..661b92621a90 100644 --- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java +++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java @@ -173,7 +173,7 @@ public Type map(Schema map, Type valueType) { } @Override - public Type variant(Schema variant) { + public Type variant(Schema variant, List fieldTypes) { return Types.VariantType.get(); } diff --git a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java index e8a6596c3e67..cc77d88d8f3f 100644 --- a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java +++ b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java @@ -94,7 +94,7 @@ public Schema struct(Types.StructType struct, List fieldSchemas) { Integer fieldId = fieldIds.peek(); String recordName = namesFunction.apply(fieldId, struct); if (recordName == null) { - recordName = "r" + fieldId; + recordName = fieldId != null ? "r" + fieldId : "table"; } Schema recordSchema = lookupSchema(struct, recordName); @@ -188,8 +188,8 @@ public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) { } @Override - public Schema variant() { - String recordName = "r" + fieldIds.peek(); + public Schema variant(Types.VariantType variant) { + String recordName = fieldIds.peek() != null ? "r" + fieldIds.peek() : "variant"; Schema schema = Schema.createRecord( recordName, @@ -199,7 +199,7 @@ public Schema variant() { List.of( new Schema.Field("metadata", BINARY_SCHEMA), new Schema.Field("value", BINARY_SCHEMA))); - return Variant.get().addToSchema(schema); + return VariantLogicalType.get().addToSchema(schema); } @Override diff --git a/core/src/main/java/org/apache/iceberg/avro/Variant.java b/core/src/main/java/org/apache/iceberg/avro/VariantLogicalType.java similarity index 87% rename from core/src/main/java/org/apache/iceberg/avro/Variant.java rename to core/src/main/java/org/apache/iceberg/avro/VariantLogicalType.java index a6db730132cc..f25b48c6d33f 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Variant.java +++ b/core/src/main/java/org/apache/iceberg/avro/VariantLogicalType.java @@ -22,15 +22,15 @@ import org.apache.avro.Schema; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class Variant extends LogicalType { +public class VariantLogicalType extends LogicalType { static final String NAME = "variant"; - private static final Variant INSTANCE = new Variant(); + private static final VariantLogicalType INSTANCE = new VariantLogicalType(); - static Variant get() { + static VariantLogicalType get() { return INSTANCE; } - private Variant() { + private VariantLogicalType() { super(NAME); } diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java index d8c49346b9d8..3b96f844b537 100644 --- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java @@ -52,7 +52,7 @@ static Schema variant(String name) { name, new Schema.Field("metadata", Schema.create(Schema.Type.BYTES), null, null), new Schema.Field("value", Schema.create(Schema.Type.BYTES), null, null)); - return Variant.get().addToSchema(schema); + return VariantLogicalType.get().addToSchema(schema); } static Schema.Field addId(int id, Schema.Field field) { diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index cabc9f250c13..54a456a6fab8 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -342,6 +342,23 @@ public void testInferredMapping() throws IOException { assertThat(projected).isEqualTo(record); } + @Test + public void testVariantNameMapping() { + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.required(1, "var", Types.VariantType.get())); + + org.apache.avro.Schema avroSchema = RemoveIds.removeIds(icebergSchema); + assertThat(AvroSchemaUtil.hasIds(avroSchema)).as("Expect schema has no ids").isFalse(); + + NameMapping nameMapping = + NameMapping.of( + MappedField.of(0, ImmutableList.of("id")), MappedField.of(1, ImmutableList.of("var"))); + org.apache.avro.Schema mappedSchema = AvroSchemaUtil.applyNameMapping(avroSchema, nameMapping); + assertThat(mappedSchema).isEqualTo(AvroSchemaUtil.convert(icebergSchema.asStruct())); + } + @Test @Override public void testAvroArrayAsLogicalMap() { diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroSchemaProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroSchemaProjection.java index cdb144befc88..6632c04f3ff0 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroSchemaProjection.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroSchemaProjection.java @@ -23,6 +23,7 @@ import java.util.Collections; import org.apache.avro.SchemaBuilder; import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; public class TestAvroSchemaProjection { @@ -152,56 +153,20 @@ public void projectWithMapSchemaChanged() { } @Test - public void projectWithVariantSchemaChanged() { - final org.apache.avro.Schema currentAvroSchema = - SchemaBuilder.record("myrecord") - .fields() - .name("f11") - .type() - .nullable() - .intType() - .noDefault() - .endRecord(); - - final org.apache.avro.Schema variantSchema = - SchemaBuilder.record("v") - .fields() - .name("metadata") - .type() - .bytesType() - .noDefault() - .name("value") - .type() - .bytesType() - .noDefault() - .endRecord(); - Variant.get().addToSchema(variantSchema); + public void projectWithVariantType() { + Schema icebergSchema = + new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.required(1, "data", Types.VariantType.get())); - final org.apache.avro.Schema updatedAvroSchema = - SchemaBuilder.record("myrecord") - .fields() - .name("f11") - .type() - .nullable() - .intType() - .noDefault() - .name("f12") - .type(variantSchema) - .noDefault() - .endRecord(); - - final Schema currentIcebergSchema = AvroSchemaUtil.toIceberg(currentAvroSchema); - - // Getting the node ID in updatedAvroSchema allocated by converting into iceberg schema and back - final org.apache.avro.Schema idAllocatedUpdatedAvroSchema = - AvroSchemaUtil.convert(AvroSchemaUtil.toIceberg(updatedAvroSchema).asStruct()); - - final org.apache.avro.Schema projectedAvroSchema = + org.apache.avro.Schema projectedSchema = AvroSchemaUtil.buildAvroProjection( - idAllocatedUpdatedAvroSchema, currentIcebergSchema, Collections.emptyMap()); - - assertThat(AvroSchemaUtil.missingIds(projectedAvroSchema)) - .as("Result of buildAvroProjection is missing some IDs") - .isFalse(); + AvroSchemaUtil.convert(icebergSchema.asStruct()), + icebergSchema.select("data"), + Collections.emptyMap()); + assertThat(projectedSchema.getField("id")).isNull(); + org.apache.avro.Schema variantSchema = projectedSchema.getField("data").schema(); + assertThat(variantSchema.getLogicalType()).isEqualTo(VariantLogicalType.get()); + assertThat(AvroSchemaUtil.isVariantSchema(variantSchema)).isTrue(); } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestHasIds.java b/core/src/test/java/org/apache/iceberg/avro/TestHasIds.java index 4b69d9c879e9..fab520f109e1 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestHasIds.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestHasIds.java @@ -26,7 +26,7 @@ public class TestHasIds { @Test - public void test() { + public void testRemoveIdsHasIds() { Schema schema = new Schema( Types.NestedField.required(0, "id", Types.LongType.get()), @@ -39,7 +39,10 @@ public void test() { Types.StringType.get(), Types.StructType.of( Types.NestedField.required(1, "lat", Types.FloatType.get()), - Types.NestedField.optional(2, "long", Types.FloatType.get()))))); + Types.NestedField.optional(2, "long", Types.FloatType.get())))), + Types.NestedField.required( + 8, "types", Types.ListType.ofRequired(9, Types.StringType.get())), + Types.NestedField.required(10, "data", Types.VariantType.get())); org.apache.avro.Schema avroSchema = RemoveIds.removeIds(schema); assertThat(AvroSchemaUtil.hasIds(avroSchema)).as("Avro schema should not have IDs").isFalse(); diff --git a/core/src/test/java/org/apache/iceberg/avro/TestSchemaConversions.java b/core/src/test/java/org/apache/iceberg/avro/TestSchemaConversions.java index 160ac15355f2..2c31fe92834b 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestSchemaConversions.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestSchemaConversions.java @@ -80,7 +80,7 @@ public void testPrimitiveTypes() { Schema.create(Schema.Type.BYTES), LogicalTypes.decimal(9, 4) .addToSchema(Schema.createFixed("decimal_9_4", null, null, 4)), - variant("rnull")); + variant("variant")); for (int i = 0; i < primitives.size(); i += 1) { Type type = primitives.get(i); @@ -386,6 +386,7 @@ public void testVariantConversion() { for (int id : Lists.newArrayList(1, 2)) { org.apache.avro.Schema variantSchema = avroSchema.getField("variantCol" + id).schema(); + assertThat(variantSchema.getName()).isEqualTo("r" + id); assertThat(variantSchema.getType()).isEqualTo(org.apache.avro.Schema.Type.RECORD); assertThat(variantSchema.getFields().size()).isEqualTo(2); assertThat(variantSchema.getField("metadata").schema().getType())