From 83b255f4fe094f9136ace394d334e6c99eb65226 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Thu, 24 Oct 2024 22:59:59 +0300
Subject: [PATCH 1/6] support partitioning time types

---
 .../IO_Iceberg_Integration_Tests.json         |   2 +-
 CHANGES.md                                    |   1 +
 .../beam/sdk/io/iceberg/IcebergUtils.java     |  70 ++++++
 .../sdk/io/iceberg/RecordWriterManager.java   |   6 +-
 .../sdk/io/iceberg/SerializableDataFile.java  |   5 +-
 .../beam/sdk/io/iceberg/IcebergIOIT.java      |   2 +-
 ...ebergWriteSchemaTransformProviderTest.java |  98 +++++++++
 .../io/iceberg/RecordWriterManagerTest.java   | 208 +++++++++++++++++-
 8 files changed, 384 insertions(+), 8 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 62ae7886c573..1efc8e9e4405 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 4
+  "modification": 1
 }
diff --git a/CHANGES.md b/CHANGES.md
index f2b865cec236..30cd4b7d46fe 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 * [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
 * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
 * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
+* [Managed Iceberg] Support partitioning time types (year, month, day, hour) ([]())

 ## New Features / Improvements

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index ef19a5881366..010d35577c51 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -19,12 +19,16 @@
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

+import com.google.common.collect.Maps;
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
+import java.time.YearMonth;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -35,9 +39,16 @@
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
@@ -506,4 +517,63 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
     // LocalDateTime, LocalDate, LocalTime
     return icebergValue;
   }
+
+  /**
+   * Determines partition values for a {@link Record} and returns another {@link Record} that can
+   * be applied to the given {@link PartitionSpec}.
+   */
+  static Record getPartitionableRecord(Record record, PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {
+      return record;
+    }
+    Record output = GenericRecord.create(spec.schema());
+    for (PartitionField partitionField : spec.fields()) {
+      Transform<?, ?> transform = partitionField.transform();
+      int id = partitionField.sourceId();
+      Types.NestedField field = spec.schema().findField(id);
+      String name = field.name();
+      Object value = record.getField(name);
+      @Nullable Literal<?> literal = Literal.of(value.toString()).to(field.type());
+      if (literal == null || transform.isVoid() || transform.isIdentity()) {
+        output.setField(name, value);
+      } else {
+        output.setField(name, literal.value());
+      }
+    }
+    return output;
+  }
+
+  private static final DateTimeFormatter HOUR_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+  private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
+
+  static String resolvePartitionPath(String path, PartitionSpec spec) {
+    if (Strings.isNullOrEmpty(path) || spec.isUnpartitioned()) {
+      return path;
+    }
+    List<String> resolved = new ArrayList<>();
+    List<PartitionField> partitionFields = spec.fields();
+    Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
+    for (PartitionField partitionField : partitionFields) {
+      partitionFieldMap.put(partitionField.name(), partitionField);
+    }
+    for (String partition : Splitter.on('/').splitToList(path)) {
+      List<String> parts = Splitter.on('=').splitToList(partition);
+      String name = parts.get(0);
+      String value = parts.get(1);
+      PartitionField partitionField =
+          Preconditions.checkArgumentNotNull(partitionFieldMap.get(name));
+      String transformName = partitionField.transform().toString();
+      if (Transforms.month().toString().equals(transformName)) {
+        int month = YearMonth.parse(value).getMonthValue();
+        value = String.valueOf(month);
+      } else if (Transforms.hour().toString().equals(transformName)) {
+        LocalDateTime targetDateTime = LocalDateTime.parse(value, HOUR_FORMATTER);
+        long hour = ChronoUnit.HOURS.between(EPOCH, targetDateTime);
+        value = String.valueOf(hour);
+      }
+      resolved.add(name + "=" + value);
+    }
+    return String.join("/", resolved);
+  }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 396db7c20f36..e30b90a2132d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -120,7 +120,9 @@ class DestinationState {
                     e);
               }
               openWriters--;
-              dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
+              String partitionPath = IcebergUtils.resolvePartitionPath(pk.toPath(), spec);
+              dataFiles.add(
+                  SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
             })
         .build();
   }
@@ -133,7 +135,7 @@ class DestinationState {
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
     boolean write(Record record) {
-      partitionKey.partition(record);
+      partitionKey.partition(IcebergUtils.getPartitionableRecord(record, spec));

       if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
         return false;
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 699d4fa4dfd0..7383a92c1193 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -115,13 +115,14 @@ abstract static class Builder {
    * Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link
    * PartitionKey}.
    */
-  static SerializableDataFile from(DataFile f, PartitionKey key) {
+  static SerializableDataFile from(DataFile f, String partitionPath) {
+
     return SerializableDataFile.builder()
         .setPath(f.path().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
-        .setPartitionPath(key.toPath())
+        .setPartitionPath(partitionPath)
         .setPartitionSpecId(f.specId())
         .setKeyMetadata(f.keyMetadata())
         .setSplitOffsets(f.splitOffsets())
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 5df8604699a3..6f307c41404b 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -327,7 +327,7 @@ public void testWritePartitionedData() {
     PartitionSpec partitionSpec =
         PartitionSpec.builderFor(ICEBERG_SCHEMA)
             .identity("bool")
-            .identity("modulo_5")
+            .hour("datetime")
             .truncate("str", "value_x".length())
             .build();
     Table table =
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 47dc9aa425dd..9834547c4741 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -23,14 +23,19 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;

+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.managed.Managed;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -49,12 +54,16 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matchers;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.ClassRule;
@@ -360,4 +369,93 @@ public Void apply(Iterable<Row> input) {
       return null;
     }
   }
+
+  @Test
+  public void testWritePartitionedData() {
+    Schema schema =
+        Schema.builder()
+            .addStringField("str")
+            .addInt32Field("int")
+            .addLogicalTypeField("y_date", SqlTypes.DATE)
+            .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("y_datetime_tz")
+            .addLogicalTypeField("m_date", SqlTypes.DATE)
+            .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("m_datetime_tz")
+            .addLogicalTypeField("d_date", SqlTypes.DATE)
+            .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("d_datetime_tz")
+            .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("h_datetime_tz")
+            .build();
+    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .identity("str")
+            .bucket("int", 5)
+            .year("y_date")
+            .year("y_datetime")
+            .year("y_datetime_tz")
+            .month("m_date")
+            .month("m_datetime")
+            .month("m_datetime_tz")
+            .day("d_date")
+            .day("d_datetime")
+            .day("d_datetime_tz")
+            .hour("h_datetime")
+            .hour("h_datetime_tz")
+            .build();
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+
+    warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec);
+    Map<String, Object> config =
+        ImmutableMap.of(
+            "table",
+            identifier,
+            "catalog_properties",
+            ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 0; i < 30; i++) {
+      long millis = i * 100_00_000_000L;
+      LocalDate localDate = DateTimeUtil.dateFromDays(i * 100);
+      LocalDateTime localDateTime = DateTimeUtil.timestampFromMicros(millis * 1000);
+      DateTime dateTime = new DateTime(millis).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25));
+      Row row =
+          Row.withSchema(schema)
+              .addValues(
+                  "str_" + i,
+                  i,
+                  localDate,
+                  localDateTime,
+                  dateTime,
+                  localDate,
+                  localDateTime,
+                  dateTime,
+                  localDate,
+                  localDateTime,
+                  dateTime,
+                  localDateTime,
+                  dateTime)
+              .build();
+      rows.add(row);
+    }
+
+    PCollection<Row> result =
+        testPipeline
+            .apply("Records To Add", Create.of(rows))
+            .setRowSchema(schema)
+            .apply(Managed.write(Managed.ICEBERG).withConfig(config))
+            .get(SNAPSHOTS_TAG);
+
+    PAssert.that(result)
+        .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
+    testPipeline.run().waitUntilFinish();
+
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    PCollection<Row> readRows =
+        p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
+    PAssert.that(readRows).containsInAnyOrder(rows);
+    p.run();
+  }
 }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 7adf6defe520..38b3e3066161 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -19,14 +19,20 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -40,6 +46,8 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -77,9 +85,14 @@ public void setUp() {

   private WindowedValue<IcebergDestination> getWindowedDestination(
       String tableName, @Nullable PartitionSpec partitionSpec) {
+    return getWindowedDestination(tableName, ICEBERG_SCHEMA, partitionSpec);
+  }
+
+  private WindowedValue<IcebergDestination> getWindowedDestination(
+      String tableName, org.apache.iceberg.Schema schema, @Nullable PartitionSpec partitionSpec) {
     TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName);

-    warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, partitionSpec);
+    warehouse.createTable(tableIdentifier, schema, partitionSpec);

     IcebergDestination icebergDestination =
         IcebergDestination.builder()
@@ -287,8 +300,9 @@ public void testSerializableDataFileRoundTripEquality() throws IOException {
     DataFile datafile = writer.getDataFile();
     assertEquals(2L, datafile.recordCount());

+    String partitionPath = IcebergUtils.resolvePartitionPath(partitionKey.toPath(), PARTITION_SPEC);
     DataFile roundTripDataFile =
-        SerializableDataFile.from(datafile, partitionKey).createDataFile(PARTITION_SPEC);
+        SerializableDataFile.from(datafile, partitionPath).createDataFile(PARTITION_SPEC);
     // DataFile doesn't implement a .equals() method. Check equality manually
     assertEquals(datafile.path(), roundTripDataFile.path());
     assertEquals(datafile.format(), roundTripDataFile.format());
@@ -306,4 +320,194 @@ public void testSerializableDataFileRoundTripEquality() throws IOException {
     assertEquals(datafile.dataSequenceNumber(), roundTripDataFile.dataSequenceNumber());
     assertEquals(datafile.pos(), roundTripDataFile.pos());
   }
+
+  @Test
+  public void testIdentityPartitioning() throws IOException {
+    Schema primitiveTypeSchema =
+        Schema.builder()
+            .addBooleanField("bool")
+            .addInt32Field("int")
+            .addInt64Field("long")
+            .addFloatField("float")
+            .addDoubleField("double")
+            .addStringField("str")
+            .build();
+
+    Row row =
+        Row.withSchema(primitiveTypeSchema).addValues(true, 1, 1L, 1.23f, 4.56, "str").build();
+    org.apache.iceberg.Schema icebergSchema =
+        IcebergUtils.beamSchemaToIcebergSchema(primitiveTypeSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .identity("bool")
+            .identity("int")
+            .identity("long")
+            .identity("float")
+            .identity("double")
+            .identity("str")
+            .build();
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("identity_partitioning", icebergSchema, spec);
+
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile dataFile = files.get(0);
+    assertEquals(1, dataFile.getRecordCount());
+    // build this string: bool=true/int=1/long=1/float=1.23/double=4.56/str=str
+    List<String> expectedPartitions = new ArrayList<>();
+    for (Schema.Field field : primitiveTypeSchema.getFields()) {
+      Object val = row.getValue(field.getName());
+      expectedPartitions.add(field.getName() + "=" + val);
+    }
+    String expectedPartitionPath = String.join("/", expectedPartitions);
+    assertEquals(expectedPartitionPath, dataFile.getPartitionPath());
+    assertThat(dataFile.getPath(), containsString(expectedPartitionPath));
+  }
+
+  @Test
+  public void testBucketPartitioning() throws IOException {
+    Schema bucketSchema =
+        Schema.builder()
+            .addInt32Field("int")
+            .addInt64Field("long")
+            .addStringField("str")
+            .addLogicalTypeField("date", SqlTypes.DATE)
+            .addLogicalTypeField("time", SqlTypes.TIME)
+            .addLogicalTypeField("datetime", SqlTypes.DATETIME)
+            .addDateTimeField("datetime_tz")
+            .build();
+
+    String timestamp = "2024-10-08T13:18:20.053";
+    LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
+
+    Row row =
+        Row.withSchema(bucketSchema)
+            .addValues(
+                1,
+                1L,
+                "str",
+                localDateTime.toLocalDate(),
+                localDateTime.toLocalTime(),
+                localDateTime,
+                DateTime.parse(timestamp))
+            .build();
+    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(bucketSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .bucket("int", 2)
+            .bucket("long", 2)
+            .bucket("str", 2)
+            .bucket("date", 2)
+            .bucket("time", 2)
+            .bucket("datetime", 2)
+            .bucket("datetime_tz", 2)
+            .build();
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("bucket_partitioning", icebergSchema, spec);
+
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile dataFile = files.get(0);
+    assertEquals(1, dataFile.getRecordCount());
+    for (Schema.Field field : bucketSchema.getFields()) {
+      String expectedPartition = field.getName() + "_bucket";
+      assertThat(dataFile.getPartitionPath(), containsString(expectedPartition));
+      assertThat(dataFile.getPath(), containsString(expectedPartition));
+    }
+  }
+
+  @Test
+  public void testTimePartitioning() throws IOException {
+    Schema timePartitioningSchema =
+        Schema.builder()
+            .addLogicalTypeField("y_date", SqlTypes.DATE)
+            .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("y_datetime_tz")
+            .addLogicalTypeField("m_date", SqlTypes.DATE)
+            .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("m_datetime_tz")
+            .addLogicalTypeField("d_date", SqlTypes.DATE)
+            .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("d_datetime_tz")
+            .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("h_datetime_tz")
+            .build();
+    org.apache.iceberg.Schema icebergSchema =
+        IcebergUtils.beamSchemaToIcebergSchema(timePartitioningSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .year("y_date")
+            .year("y_datetime")
+            .year("y_datetime_tz")
+            .month("m_date")
+            .month("m_datetime")
+            .month("m_datetime_tz")
+            .day("d_date")
+            .day("d_datetime")
+            .day("d_datetime_tz")
+            .hour("h_datetime")
+            .hour("h_datetime_tz")
+            .build();
+
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("time_partitioning", icebergSchema, spec);
+
+    String timestamp = "2024-10-08T13:18:20.053";
+    LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
+    LocalDate localDate = localDateTime.toLocalDate();
+    String timestamptz = "2024-10-08T13:18:20.053+03:27";
+    DateTime dateTime = DateTime.parse(timestamptz);
+
+    Row row =
+        Row.withSchema(timePartitioningSchema)
+            .addValues(localDate, localDateTime, dateTime) // year
+            .addValues(localDate, localDateTime, dateTime) // month
+            .addValues(localDate, localDateTime, dateTime) // day
+            .addValues(localDateTime, dateTime) // hour
+            .build();
+
+    // write some rows
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile serializableDataFile = files.get(0);
+    assertEquals(1, serializableDataFile.getRecordCount());
+
+    int year = localDateTime.getYear();
+    int month = localDateTime.getMonthValue();
+    int day = localDateTime.getDayOfMonth();
+    int hour = localDateTime.getHour();
+    List<String> expectedPartitions = new ArrayList<>();
+    for (Schema.Field field : timePartitioningSchema.getFields()) {
+      String name = field.getName();
+      String expected = "";
+      if (name.startsWith("y_")) {
+        expected = String.format("%s_year=%s", name, year);
+      } else if (name.startsWith("m_")) {
+        expected = String.format("%s_month=%s-%02d", name, year, month);
+      } else if (name.startsWith("d_")) {
+        expected = String.format("%s_day=%s-%02d-%02d", name, year, month, day);
+      } else if (name.startsWith("h_")) {
+        if (name.contains("tz")) {
+          hour = dateTime.withZone(DateTimeZone.UTC).getHourOfDay();
+        }
+        expected = String.format("%s_hour=%s-%02d-%02d-%02d", name, year, month, day, hour);
+      }
+      expectedPartitions.add(expected);
+    }
+    String expectedPartition = String.join("/", expectedPartitions);
+    DataFile dataFile = serializableDataFile.createDataFile(spec);
+    assertThat(dataFile.path().toString(), containsString(expectedPartition));
+  }
 }
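[Editor's note, not part of any patch: the tests above assert on partition paths as rendered by Iceberg's `PartitionKey.toPath()`. The sketch below reproduces those expected strings with plain `java.time` for the same timestamp used in `testTimePartitioning`; the class name is hypothetical and the formats are inferred from the assertions above, not from Iceberg internals.]

```java
import java.time.LocalDateTime;

/** Hypothetical helper mirroring the expected-path strings built in testTimePartitioning. */
public class ExpectedPartitionPaths {
  public static void main(String[] args) {
    LocalDateTime ts = LocalDateTime.parse("2024-10-08T13:18:20.053");
    int y = ts.getYear();
    int m = ts.getMonthValue();
    int d = ts.getDayOfMonth();
    int h = ts.getHour();
    // Human-readable partition values, one per time transform:
    System.out.println(String.format("y_datetime_year=%s", y)); // y_datetime_year=2024
    System.out.println(String.format("m_datetime_month=%s-%02d", y, m)); // m_datetime_month=2024-10
    System.out.println(String.format("d_datetime_day=%s-%02d-%02d", y, m, d)); // d_datetime_day=2024-10-08
    System.out.println(
        String.format("h_datetime_hour=%s-%02d-%02d-%02d", y, m, d, h)); // h_datetime_hour=2024-10-08-13
  }
}
```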
From 95258a31e1e87ce672c8110a992e981db142a855 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Thu, 24 Oct 2024 23:01:37 +0300
Subject: [PATCH 2/6] add to changes

---
 CHANGES.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index 30cd4b7d46fe..2b6fd82c9770 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,7 +68,7 @@
 * [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
 * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
 * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
-* [Managed Iceberg] Support partitioning time types (year, month, day, hour) ([]())
+* [Managed Iceberg] Support partitioning time types (year, month, day, hour) ([#32939](https://github.com/apache/beam/pull/32939))

 ## New Features / Improvements

From 5bacc00d02b65c6400233602695205a55e1b5aac Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Fri, 25 Oct 2024 00:53:49 +0300
Subject: [PATCH 3/6] spotless

---
 .../main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 010d35577c51..d4a188cecab5 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -19,7 +19,6 @@
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

-import com.google.common.collect.Maps;
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -42,6 +41,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.data.GenericRecord;

From f11053a3ab0da89200ce924eb9c3c1f3d8aa10f1 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Fri, 25 Oct 2024 17:54:15 +0300
Subject: [PATCH 4/6] relocate logic to RecordWriterManager

---
 .../beam/sdk/io/iceberg/IcebergUtils.java     | 70 ------------------
 .../sdk/io/iceberg/RecordWriterManager.java   | 78 ++++++++++++++++++-
 .../io/iceberg/RecordWriterManagerTest.java   | 10 ++-
 3 files changed, 85 insertions(+), 73 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index d4a188cecab5..ef19a5881366 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -24,10 +24,7 @@
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
-import java.time.YearMonth;
 import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -38,17 +35,9 @@
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.expressions.Literal;
-import org.apache.iceberg.transforms.Transform;
-import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
@@ -517,63 +506,4 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
     // LocalDateTime, LocalDate, LocalTime
     return icebergValue;
   }
-
-  /**
-   * Determines partition values for a {@link Record} and returns another {@link Record} that can
-   * be applied to the given {@link PartitionSpec}.
-   */
-  static Record getPartitionableRecord(Record record, PartitionSpec spec) {
-    if (spec.isUnpartitioned()) {
-      return record;
-    }
-    Record output = GenericRecord.create(spec.schema());
-    for (PartitionField partitionField : spec.fields()) {
-      Transform<?, ?> transform = partitionField.transform();
-      int id = partitionField.sourceId();
-      Types.NestedField field = spec.schema().findField(id);
-      String name = field.name();
-      Object value = record.getField(name);
-      @Nullable Literal<?> literal = Literal.of(value.toString()).to(field.type());
-      if (literal == null || transform.isVoid() || transform.isIdentity()) {
-        output.setField(name, value);
-      } else {
-        output.setField(name, literal.value());
-      }
-    }
-    return output;
-  }
-
-  private static final DateTimeFormatter HOUR_FORMATTER =
-      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
-  private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
-
-  static String resolvePartitionPath(String path, PartitionSpec spec) {
-    if (Strings.isNullOrEmpty(path) || spec.isUnpartitioned()) {
-      return path;
-    }
-    List<String> resolved = new ArrayList<>();
-    List<PartitionField> partitionFields = spec.fields();
-    Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
-    for (PartitionField partitionField : partitionFields) {
-      partitionFieldMap.put(partitionField.name(), partitionField);
-    }
-    for (String partition : Splitter.on('/').splitToList(path)) {
-      List<String> parts = Splitter.on('=').splitToList(partition);
-      String name = parts.get(0);
-      String value = parts.get(1);
-      PartitionField partitionField =
-          Preconditions.checkArgumentNotNull(partitionFieldMap.get(name));
-      String transformName = partitionField.transform().toString();
-      if (Transforms.month().toString().equals(transformName)) {
-        int month = YearMonth.parse(value).getMonthValue();
-        value = String.valueOf(month);
-      } else if (Transforms.hour().toString().equals(transformName)) {
-        LocalDateTime targetDateTime = LocalDateTime.parse(value, HOUR_FORMATTER);
-        long hour = ChronoUnit.HOURS.between(EPOCH, targetDateTime);
-        value = String.valueOf(hour);
-      }
-      resolved.add(name + "=" + value);
-    }
-    return String.join("/", resolved);
-  }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index e30b90a2132d..901e2a459b47 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -21,6 +21,11 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.YearMonth;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +37,8 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
@@ -39,14 +46,20 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +104,7 @@ class DestinationState {
     final Cache<PartitionKey, RecordWriter> writers;
     private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
     @VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
+    private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();

     DestinationState(IcebergDestination icebergDestination, Table table) {
       this.icebergDestination = icebergDestination;
@@ -98,6 +112,9 @@ class DestinationState {
       this.spec = table.spec();
       this.partitionKey = new PartitionKey(spec, schema);
       this.table = table;
+      for (PartitionField partitionField : spec.fields()) {
+        partitionFieldMap.put(partitionField.name(), partitionField);
+      }

       // build a cache of RecordWriters.
       // writers will expire after 1 min of idle time.
@@ -120,7 +137,7 @@ class DestinationState {
                     e);
               }
               openWriters--;
-              String partitionPath = IcebergUtils.resolvePartitionPath(pk.toPath(), spec);
+              String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
               dataFiles.add(
                   SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
             })
         .build();
   }
@@ -135,7 +152,7 @@ class DestinationState {
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
     boolean write(Record record) {
-      partitionKey.partition(IcebergUtils.getPartitionableRecord(record, spec));
+      partitionKey.partition(getPartitionableRecord(record));

       if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
         return false;
@@ -184,8 +201,65 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
             e);
       }
     }
+
+    /**
+     * Resolves an input {@link Record}'s partition values and returns another {@link Record} that
+     * can be applied to the destination's {@link PartitionSpec}.
+     */
+    private Record getPartitionableRecord(Record record) {
+      if (spec.isUnpartitioned()) {
+        return record;
+      }
+      Record output = GenericRecord.create(schema);
+      for (PartitionField partitionField : spec.fields()) {
+        Transform<?, ?> transform = partitionField.transform();
+        Types.NestedField field = schema.findField(partitionField.sourceId());
+        String name = field.name();
+        Object value = record.getField(name);
+        @Nullable Literal<?> literal = Literal.of(value.toString()).to(field.type());
+        if (literal == null || transform.isVoid() || transform.isIdentity()) {
+          output.setField(name, value);
+        } else {
+          output.setField(name, literal.value());
+        }
+      }
+      return output;
+    }
   }

+  /**
+   * Returns an equivalent partition path that is made up of partition data. Needed to reconstruct
+   * a {@link DataFile}.
+   */
+  @VisibleForTesting
+  static String getPartitionDataPath(
+      String partitionPath, Map<String, PartitionField> partitionFieldMap) {
+    if (Strings.isNullOrEmpty(partitionPath) || partitionFieldMap.isEmpty()) {
+      return partitionPath;
+    }
+    List<String> resolved = new ArrayList<>();
+    for (String partition : Splitter.on('/').splitToList(partitionPath)) {
+      List<String> parts = Splitter.on('=').splitToList(partition);
+      String name = parts.get(0);
+      String value = parts.get(1);
+      String transformName =
+          Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
+      if (Transforms.month().toString().equals(transformName)) {
+        int month = YearMonth.parse(value).getMonthValue();
+        value = String.valueOf(month);
+      } else if (Transforms.hour().toString().equals(transformName)) {
+        long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
+        value = String.valueOf(hour);
+      }
+      resolved.add(name + "=" + value);
+    }
+    return String.join("/", resolved);
+  }
+
+  private static final DateTimeFormatter HOUR_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+  private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
+
   private final Catalog catalog;
   private final String filePrefix;
   private final long maxFileSize;
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 38b3e3066161..e1987b5fec24 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -29,6 +29,7 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
@@ -41,6 +42,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -300,7 +302,13 @@ public void testSerializableDataFileRoundTripEquality() throws IOException {
     DataFile datafile = writer.getDataFile();
     assertEquals(2L, datafile.recordCount());

-    String partitionPath = IcebergUtils.resolvePartitionPath(partitionKey.toPath(), PARTITION_SPEC);
+    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+      partitionFieldMap.put(partitionField.name(), partitionField);
+    }
+
+    String partitionPath =
+        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile roundTripDataFile =
         SerializableDataFile.from(datafile, partitionPath).createDataFile(PARTITION_SPEC);
     // DataFile doesn't implement a .equals() method. Check equality manually

From cbae083aba233c013202297f17a7fed98f953f26 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Tue, 5 Nov 2024 13:35:11 -0500
Subject: [PATCH 5/6] fix test

---
 .../beam/sdk/io/iceberg/RecordWriterManagerTest.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 969eba2ab3ab..c48d70f65681 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -365,8 +365,14 @@ public void testRecreateSerializableDataAfterUpdatingPartitionSpec() throws IOEx
     writer.close();

     // fetch data file and its serializable version
+    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+      partitionFieldMap.put(partitionField.name(), partitionField);
+    }
+    String partitionPath =
+        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile datafile = writer.getDataFile();
-    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionKey);
+    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionPath);

     assertEquals(2L, datafile.recordCount());
     assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId());
@@ -619,7 +625,9 @@ public void testTimePartitioning() throws IOException {
       expectedPartitions.add(expected);
     }
     String expectedPartition = String.join("/", expectedPartitions);
-    DataFile dataFile = serializableDataFile.createDataFile(spec);
+    DataFile dataFile =
+        serializableDataFile.createDataFile(
+            catalog.loadTable(dest.getValue().getTableIdentifier()).specs());
     assertThat(dataFile.path().toString(), containsString(expectedPartition));
   }
 }

From fffb8da86ad7bb4054060f28cbe0324feac309f1 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Mon, 16 Dec 2024 12:42:29 -0500
Subject: [PATCH 6/6] address comments

---
 CHANGES.md                                              | 2 +-
 .../apache/beam/sdk/io/iceberg/RecordWriterManager.java | 9 ++++-----
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7911f45b8fbc..a0eadb043fbd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -64,6 +64,7 @@
 * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
 * Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).

 ## New Features / Improvements
@@ -105,7 +106,6 @@
 * [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
 * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
 * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
-* [Managed Iceberg] Support partitioning time types (year, month, day, hour) ([#32939](https://github.com/apache/beam/pull/32939))
 * [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
 * Added BigQueryIO as a Managed IO ([#31486](https://github.com/apache/beam/pull/31486))
 * Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index e058b9bccb36..4c21a0175ab0 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -37,7 +37,6 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
@@ -237,14 +236,14 @@ private Record getPartitionableRecord(Record record) {
   @VisibleForTesting
   static String getPartitionDataPath(
       String partitionPath, Map<String, PartitionField> partitionFieldMap) {
-    if (Strings.isNullOrEmpty(partitionPath) || partitionFieldMap.isEmpty()) {
+    if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
       return partitionPath;
     }
     List<String> resolved = new ArrayList<>();
     for (String partition : Splitter.on('/').splitToList(partitionPath)) {
-      List<String> parts = Splitter.on('=').splitToList(partition);
-      String name = parts.get(0);
-      String value = parts.get(1);
+      List<String> nameAndValue = Splitter.on('=').splitToList(partition);
+      String name = nameAndValue.get(0);
+      String value = nameAndValue.get(1);
       String transformName =
           Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
       if (Transforms.month().toString().equals(transformName)) {
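[Editor's note, not part of any patch: the core trick in `getPartitionDataPath` is rewriting the human-readable partition path produced by `PartitionKey.toPath()` into the numeric partition-data values needed to rebuild a `DataFile`. Below is a minimal, self-contained sketch of the two value conversions the method performs, using only `java.time`; the sample values and the `PartitionPathSketch` class name are illustrative assumptions, not Beam or Iceberg API.]

```java
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

/** Hypothetical standalone sketch of the value rewriting in getPartitionDataPath. */
public class PartitionPathSketch {
  // Same constants as the patch: hour paths look like "yyyy-MM-dd-HH".
  private static final DateTimeFormatter HOUR_FORMATTER =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
  private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

  public static void main(String[] args) {
    // A month-transformed value as rendered by PartitionKey.toPath()...
    String monthValue = "2024-10";
    // ...converted to the numeric form the patch substitutes into the path:
    int month = YearMonth.parse(monthValue).getMonthValue();
    System.out.println("m_datetime_month=" + month); // m_datetime_month=10

    // An hour-transformed value, converted to hours since the Unix epoch:
    String hourValue = "2024-10-08-13";
    long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(hourValue, HOUR_FORMATTER));
    System.out.println("h_datetime_hour=" + hour); // h_datetime_hour=480109
  }
}
```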