From ca7b0fd45318585e544f54edb32151a7d4906f39 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 21 Jan 2025 14:10:45 -0500 Subject: [PATCH 1/3] Use Iceberg InternalRecordWrapper partition util --- .../IO_Iceberg_Integration_Tests.json | 2 +- sdks/java/io/iceberg/build.gradle | 1 + .../sdk/io/iceberg/RecordWriterManager.java | 33 +++---------------- 3 files changed, 6 insertions(+), 30 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..37dd25bf9029 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 3 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 41e12921e6f8..d84205791d48 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -55,6 +55,7 @@ dependencies { implementation "org.apache.iceberg:iceberg-api:$iceberg_version" implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" + implementation "org.apache.iceberg:iceberg-data:$iceberg_version" implementation library.java.hadoop_common runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version" diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 63186f26fb5a..c0fdbb940591 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -50,14 +50,11 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.expressions.Literal; -import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; -import org.apache.iceberg.types.Types; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,12 +103,14 @@ class DestinationState { @VisibleForTesting final Map writerCounts = Maps.newHashMap(); private final Map partitionFieldMap = Maps.newHashMap(); private final List exceptions = Lists.newArrayList(); + private final InternalRecordWrapper wrapper; // wrapper that facilitates partitioning DestinationState(IcebergDestination icebergDestination, Table table) { this.icebergDestination = icebergDestination; this.schema = table.schema(); this.spec = table.spec(); this.routingPartitionKey = new PartitionKey(spec, schema); + this.wrapper = new InternalRecordWrapper(schema.asStruct()); this.table = table; for (PartitionField partitionField : spec.fields()) { partitionFieldMap.put(partitionField.name(), partitionField); @@ -156,7 +155,7 @@ class DestinationState { * can't create a new writer, the {@link Record} is rejected and {@code false} is returned. */ boolean write(Record record) { - routingPartitionKey.partition(getPartitionableRecord(record)); + routingPartitionKey.partition(wrapper.wrap(record)); if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) { return false; @@ -207,30 +206,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) { e); } } - - /** - * Resolves an input {@link Record}'s partition values and returns another {@link Record} that - * can be applied to the destination's {@link PartitionSpec}. - */ - private Record getPartitionableRecord(Record record) { - if (spec.isUnpartitioned()) { - return record; - } - Record output = GenericRecord.create(schema); - for (PartitionField partitionField : spec.fields()) { - Transform transform = partitionField.transform(); - Types.NestedField field = schema.findField(partitionField.sourceId()); - String name = field.name(); - Object value = record.getField(name); - @Nullable Literal literal = Literal.of(value.toString()).to(field.type()); - if (literal == null || transform.isVoid() || transform.isIdentity()) { - output.setField(name, value); - } else { - output.setField(name, literal.value()); - } - } - return output; - } } /** From dbb2c3e457130561b469945317727ff9a631db04 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 21 Jan 2025 14:53:32 -0500 Subject: [PATCH 2/3] add test --- .../io/iceberg/RecordWriterManagerTest.java | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 5168f71fef99..32648cb71aeb 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -27,9 +28,12 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.net.URLEncoder; +import java.nio.charset.Charset; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -451,10 +455,27 @@ public void testIdentityPartitioning() throws IOException { .addFloatField("float") .addDoubleField("double") .addStringField("str") + .addLogicalTypeField("date", SqlTypes.DATE) + .addLogicalTypeField("time", SqlTypes.TIME) + .addLogicalTypeField("datetime", SqlTypes.DATETIME) + .addDateTimeField("datetime_tz") .build(); - + String timestamp = "2025-01-21T13:18:20.053"; + LocalDateTime localDateTime = LocalDateTime.parse(timestamp); Row row = - Row.withSchema(primitiveTypeSchema).addValues(true, 1, 1L, 1.23f, 4.56, "str").build(); + Row.withSchema(primitiveTypeSchema) + .addValues( + true, + 1, + 1L, + 1.23f, + 4.56, + "str", + localDateTime.toLocalDate(), + localDateTime.toLocalTime(), + localDateTime, + DateTime.parse(timestamp)) + .build(); org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(primitiveTypeSchema); PartitionSpec spec = @@ -465,6 +486,10 @@ public void testIdentityPartitioning() throws IOException { .identity("float") .identity("double") .identity("str") + .identity("date") + .identity("time") + .identity("datetime") + .identity("datetime_tz") .build(); WindowedValue dest = getWindowedDestination("identity_partitioning", icebergSchema, spec); @@ -479,8 +504,12 @@ public void testIdentityPartitioning() throws IOException { assertEquals(1, dataFile.getRecordCount()); // build this string: bool=true/int=1/long=1/float=1.0/double=1.0/str=str List expectedPartitions = new ArrayList<>(); + List dateTypes = Arrays.asList("date", "time", "datetime", "datetime_tz"); for (Schema.Field field : primitiveTypeSchema.getFields()) { - Object val = row.getValue(field.getName()); + Object val = checkStateNotNull(row.getValue(field.getName())); + if (dateTypes.contains(field.getName())) { + val = URLEncoder.encode(val.toString(), Charset.defaultCharset().toString()); + } expectedPartitions.add(field.getName() + "=" + val); } String expectedPartitionPath = String.join("/", expectedPartitions); From 50e8d80ac3974f79c00d6a663ceb9a2a1afbd361 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 21 Jan 2025 15:08:22 -0500 Subject: [PATCH 3/3] spotless --- .../apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 32648cb71aeb..eb10722c263f 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -29,7 +29,7 @@ import java.io.IOException; import java.net.URLEncoder; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; @@ -508,7 +508,7 @@ public void testIdentityPartitioning() throws IOException { for (Schema.Field field : primitiveTypeSchema.getFields()) { Object val = checkStateNotNull(row.getValue(field.getName())); if (dateTypes.contains(field.getName())) { - val = URLEncoder.encode(val.toString(), Charset.defaultCharset().toString()); + val = URLEncoder.encode(val.toString(), StandardCharsets.UTF_8.toString()); } expectedPartitions.add(field.getName() + "=" + val); }