Skip to content

Commit

Permalink
Add remaining visitor implementation in core
Browse files Browse the repository at this point in the history
  • Loading branch information
aihuaxu committed Feb 17, 2025
1 parent 1d54ff4 commit 9082cab
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 20 deletions.
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/AssignIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public Type map(Types.MapType map, Supplier<Type> keyFuture, Supplier<Type> valu
}
}

@Override
public Type variant(Types.VariantType variant) {
return variant;
}

@Override
public Type primitive(Type.PrimitiveType primitive) {
return primitive;
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,19 @@ public List<String> map(
}
}

@Override
public List<String> variant(Types.VariantType readVariant) {
if (currentType.isVariantType()) {
return NO_ERRORS;
}

// Currently promotion is not allowed to variant type
return ImmutableList.of(
String.format(
": %s cannot be read as a %s",
currentType.typeId().toString().toLowerCase(Locale.ENGLISH), readVariant));
}

@Override
public List<String> primitive(Type.PrimitiveType readPrimitive) {
if (currentType.equals(readPrimitive)) {
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/ReassignDoc.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public Type map(Types.MapType map, Supplier<Type> keyTypeFuture, Supplier<Type>
}
}

@Override
public Type variant(Types.VariantType variant) {
return variant;
}

@Override
public Type primitive(Type.PrimitiveType primitive) {
return primitive;
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,14 @@ public T map(Types.MapType map, T keyResult, T valueResult) {
return null;
}

/**
* @deprecated will be removed in 2.0.0; use {@link #variant(Types.VariantType)} instead.
*/
@Deprecated
public T variant() {
return variant(Types.VariantType.get());
}

public T variant(Types.VariantType variant) {
throw new UnsupportedOperationException("Unsupported type: variant");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type.PrimitiveType;
Expand Down Expand Up @@ -112,6 +114,34 @@ private void testDisallowPrimitiveToStruct(PrimitiveType from, Schema fromSchema
.contains("cannot be read as a struct");
}

@Test
public void testVariantType() {
Schema fromSchema = new Schema(required(1, "from_field", Types.VariantType.get()));
List<String> errors =
CheckCompatibility.writeCompatibilityErrors(
new Schema(required(1, "to_field", Types.VariantType.get())), fromSchema);
assertThat(errors).as("Should produce 0 error messages").isEmpty();

List<Type> incompatibleTypes = new ArrayList<>();
incompatibleTypes.addAll(
List.of(
Types.StructType.of(required(1, "from", Types.IntegerType.get())),
Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.IntegerType.get()),
Types.ListType.ofRequired(1, Types.StringType.get())));
incompatibleTypes.addAll(Arrays.asList(PRIMITIVES));

for (Type from : incompatibleTypes) {
fromSchema = new Schema(required(3, "from_field", from));
errors =
CheckCompatibility.writeCompatibilityErrors(
new Schema(required(3, "to_field", Types.VariantType.get())), fromSchema);
assertThat(errors).hasSize(1);
assertThat(errors.get(0))
.as("Should complain that other type to variant is not allowed")
.contains("cannot be read as a variant");
}
}

@Test
public void testRequiredSchemaField() {
Schema write = new Schema(optional(1, "from_field", Types.IntegerType.get()));
Expand Down
54 changes: 36 additions & 18 deletions api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,25 +660,35 @@ private static Stream<Arguments> testTypes() {
Arguments.of(Types.TimestampNanoType.withZone()));
}

@ParameterizedTest
@MethodSource("testTypes")
public void testAssignIdsWithType(Type testType) {
Types.StructType sourceType =
Types.StructType.of(required(0, "id", IntegerType.get()), required(1, "data", testType));
Type expectedType =
Types.StructType.of(required(10, "id", IntegerType.get()), required(11, "data", testType));

Type assignedType = TypeUtil.assignIds(sourceType, oldId -> oldId + 10);
assertThat(assignedType).isEqualTo(expectedType);
}

@ParameterizedTest
@MethodSource("testTypes")
public void testAssignFreshIdsWithType(Type testType) {
Schema schema =
new Schema(required(0, "v", testType), required(1, "A", Types.IntegerType.get()));
Schema schema = new Schema(required(0, "id", IntegerType.get()), required(1, "data", testType));

Schema assignedSchema = TypeUtil.assignFreshIds(schema, new AtomicInteger(10)::incrementAndGet);
Schema expectedSchema =
new Schema(required(11, "v", testType), required(12, "A", Types.IntegerType.get()));
new Schema(required(11, "id", IntegerType.get()), required(12, "data", testType));
assertThat(assignedSchema.asStruct()).isEqualTo(expectedSchema.asStruct());
}

@ParameterizedTest
@MethodSource("testTypes")
public void testReassignIdsWithType(Type testType) {
Schema schema =
new Schema(required(0, "v", testType), required(1, "A", Types.IntegerType.get()));
Schema schema = new Schema(required(0, "id", IntegerType.get()), required(1, "data", testType));
Schema sourceSchema =
new Schema(required(1, "v", testType), required(2, "A", Types.IntegerType.get()));
new Schema(required(1, "id", IntegerType.get()), required(2, "data", testType));

Schema reassignedSchema = TypeUtil.reassignIds(schema, sourceSchema);
assertThat(reassignedSchema.asStruct()).isEqualTo(sourceSchema.asStruct());
Expand All @@ -687,41 +697,49 @@ public void testReassignIdsWithType(Type testType) {
@ParameterizedTest
@MethodSource("testTypes")
public void testIndexByIdWithType(Type testType) {
Schema schema =
new Schema(required(0, "v", testType), required(1, "A", Types.IntegerType.get()));
Schema schema = new Schema(required(0, "id", IntegerType.get()), required(1, "data", testType));

Map<Integer, Types.NestedField> indexByIds = TypeUtil.indexById(schema.asStruct());
assertThat(indexByIds.get(0).type()).isEqualTo(testType);
assertThat(indexByIds.get(1).type()).isEqualTo(testType);
}

@ParameterizedTest
@MethodSource("testTypes")
public void testIndexNameByIdWithType(Type testType) {
Schema schema =
new Schema(required(0, "v", testType), required(1, "A", Types.IntegerType.get()));
Schema schema = new Schema(required(0, "id", IntegerType.get()), required(1, "data", testType));

Map<Integer, String> indexNameByIds = TypeUtil.indexNameById(schema.asStruct());
assertThat(indexNameByIds.get(0)).isEqualTo("v");
assertThat(indexNameByIds.get(1)).isEqualTo("data");
}

@ParameterizedTest
@MethodSource("testTypes")
public void testProjectWithType(Type testType) {
Schema schema =
new Schema(required(0, "v", testType), required(1, "A", Types.IntegerType.get()));
Schema schema = new Schema(required(0, "id", IntegerType.get()), required(1, "data", testType));

Schema expectedSchema = new Schema(required(0, "v", testType));
Schema projectedSchema = TypeUtil.project(schema, Sets.newHashSet(0));
Schema expectedSchema = new Schema(required(1, "data", testType));
Schema projectedSchema = TypeUtil.project(schema, Sets.newHashSet(1));
assertThat(projectedSchema.asStruct()).isEqualTo(expectedSchema.asStruct());
}

@ParameterizedTest
@MethodSource("testTypes")
public void testGetProjectedIdsWithType(Type testType) {
Schema schema =
new Schema(required(0, "v", testType), required(1, "A", Types.IntegerType.get()));
Schema schema = new Schema(required(0, "id", IntegerType.get()), required(1, "data", testType));

Set<Integer> projectedIds = TypeUtil.getProjectedIds(schema);
assertThat(Set.of(0, 1)).isEqualTo(projectedIds);
}

@ParameterizedTest
@MethodSource("testTypes")
public void testReassignDocWithType(Type testType) {
Schema schema = new Schema(required(0, "id", IntegerType.get()), required(1, "data", testType));
Schema docSourceSchema =
new Schema(
required(0, "id", IntegerType.get(), "id"), required(1, "data", testType, "data"));

Schema reassignedSchema = TypeUtil.reassignDoc(schema, docSourceSchema);
assertThat(reassignedSchema.asStruct()).isEqualTo(docSourceSchema.asStruct());
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,11 @@ public Type map(Types.MapType map, Type kResult, Type valueResult) {
}
}

@Override
public Type variant(Types.VariantType variant) {
return variant;
}

@Override
public Type primitive(Type.PrimitiveType primitive) {
return primitive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@ public MappedFields map(Types.MapType map, MappedFields keyResult, MappedFields
MappedField.of(map.valueId(), "value", valueResult));
}

@Override
public MappedFields variant(Types.VariantType variant) {
return null; // no mapping because variant no nested fields
}

@Override
public MappedFields primitive(Type.PrimitiveType primitive) {
return null; // no mapping because primitives have no nested fields
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/types/FixupTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public Type map(Types.MapType map, Supplier<Type> keyTypeFuture, Supplier<Type>
}
}

@Override
public Type variant(Types.VariantType variant) {
// nothing to fix up
return variant;
}

@Override
public Type primitive(Type.PrimitiveType primitive) {
if (sourceType.equals(primitive)) {
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/java/org/apache/iceberg/mapping/TestNameMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,16 @@ public void testMappingFindByName() {
"location",
MappedFields.of(MappedField.of(11, "latitude"), MappedField.of(12, "longitude"))));
}

@Test
public void testMappingVariantType() {
Schema schema =
new Schema(
required(1, "id", Types.LongType.get()), required(2, "data", Types.VariantType.get()));

MappedFields expected = MappedFields.of(MappedField.of(1, "id"), MappedField.of(2, "data"));

NameMapping mapping = MappingUtil.create(schema);
assertThat(mapping.asMappedFields()).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,11 @@ public String map(Types.MapType map, String keyResult, String valueResult) {
return "map<" + keyResult + ", " + valueResult + ">";
}

@Override
public String variant(Types.VariantType variant) {
return "variant";
}

@Override
public String primitive(Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,13 @@ public void testDescribeSchema() {
3,
"pairs",
Types.MapType.ofOptional(4, 5, Types.StringType.get(), Types.LongType.get())),
required(6, "time", Types.TimestampType.withoutZone()));
required(6, "time", Types.TimestampType.withoutZone()),
required(7, "v", Types.VariantType.get()));

assertThat(Spark3Util.describe(schema))
.as("Schema description isn't correct.")
.isEqualTo(
"struct<data: list<string> not null,pairs: map<string, bigint>,time: timestamp not null>");
"struct<data: list<string> not null,pairs: map<string, bigint>,time: timestamp not null,v: variant not null>");
}

@Test
Expand Down

0 comments on commit 9082cab

Please sign in to comment.