From 05f99b658189e277bbf32a814ee489cf59add3fa Mon Sep 17 00:00:00 2001
From: big face cat <731030576@qq.com>
Date: Tue, 5 Mar 2024 10:29:50 +0800
Subject: [PATCH] Flink: Support specifying comments for Iceberg fields in
 CREATE TABLE and ADD COLUMN syntax using Flink SQL (#9606)

Co-authored-by: huyuanfeng
---
 .../apache/iceberg/flink/FlinkCatalog.java    | 10 +--
 .../flink/FlinkDynamicTableFactory.java       | 23 +++---
 .../apache/iceberg/flink/FlinkSchemaUtil.java | 77 ++++++++++++++-----
 .../flink/util/FlinkAlterTableUtil.java       |  6 +-
 .../iceberg/flink/TestFlinkCatalogTable.java  | 18 ++++-
 5 files changed, 95 insertions(+), 39 deletions(-)

diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index f022c8abcb00..86295d78cc13 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -38,6 +38,7 @@
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -390,17 +391,16 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
               + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
               + "create table without 'connector'='iceberg' related properties in an iceberg table.");
     }
-
-    createIcebergTable(tablePath, table, ignoreIfExists);
+    Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
+    createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
   }
 
-  void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+  void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolean ignoreIfExists)
       throws CatalogException, TableAlreadyExistException {
     validateFlinkTable(table);
 
-    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getResolvedSchema());
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
-
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
     String location = null;
     for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
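The catalog change above hinges on ResolvedCatalogTable: only the resolved schema keeps per-column comments, while the deprecated TableSchema path drops them. A minimal standalone sketch of that distinction (class and value names are illustrative, not part of this patch):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;

    public class ResolvedSchemaCommentSketch {
      public static void main(String[] args) {
        // Columns of a ResolvedSchema carry their comment as an Optional<String>,
        // which the new convert(ResolvedSchema) can forward into the Iceberg schema.
        ResolvedSchema schema =
            ResolvedSchema.of(
                Column.physical("id", DataTypes.BIGINT()).withComment("comment - id"),
                Column.physical("data", DataTypes.STRING()));
        // Prints each column name with its (possibly empty) Optional comment.
        schema.getColumns().forEach(c -> System.out.println(c.getName() + " -> " + c.getComment()));
      }
    }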
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index 8e1f420b722d..b7f1be4b93fb 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -24,11 +24,10 @@
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -84,9 +83,9 @@ public FlinkDynamicTableFactory(FlinkCatalog catalog) {
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
-    CatalogTable catalogTable = context.getCatalogTable();
-    Map<String, String> tableProps = catalogTable.getOptions();
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+    ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+    Map<String, String> tableProps = resolvedCatalogTable.getOptions();
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(resolvedCatalogTable.getSchema());
 
     TableLoader tableLoader;
     if (catalog != null) {
@@ -94,7 +93,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
     } else {
       tableLoader =
           createTableLoader(
-              catalogTable,
+              resolvedCatalogTable,
               tableProps,
               objectIdentifier.getDatabaseName(),
               objectIdentifier.getObjectName());
@@ -106,9 +105,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
-    CatalogTable catalogTable = context.getCatalogTable();
-    Map<String, String> writeProps = catalogTable.getOptions();
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+    ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+    Map<String, String> writeProps = resolvedCatalogTable.getOptions();
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(resolvedCatalogTable.getSchema());
 
     TableLoader tableLoader;
     if (catalog != null) {
@@ -116,7 +115,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
     } else {
       tableLoader =
           createTableLoader(
-              catalogTable,
+              resolvedCatalogTable,
               writeProps,
               objectIdentifier.getDatabaseName(),
               objectIdentifier.getObjectName());
@@ -147,7 +146,7 @@ public String factoryIdentifier() {
   }
 
   private static TableLoader createTableLoader(
-      CatalogBaseTable catalogBaseTable,
+      ResolvedCatalogTable resolvedCatalogTable,
       Map<String, String> tableProps,
       String databaseName,
       String tableName) {
@@ -187,7 +186,7 @@ private static TableLoader createTableLoader(
     // Create table if not exists in the external catalog.
     if (!flinkCatalog.tableExists(objectPath)) {
       try {
-        flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
+        flinkCatalog.createIcebergTable(objectPath, resolvedCatalogTable, true);
       } catch (TableAlreadyExistException e) {
         throw new AlreadyExistsException(
            e,
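When a table is declared with 'connector'='iceberg' inside a non-Iceberg catalog, the createTableLoader() path above creates the backing Iceberg table from the ResolvedCatalogTable on first use, so column comments survive this route as well. A hedged usage sketch; the catalog options below are placeholders, not values taken from this patch:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class IcebergConnectorSketch {
      public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Comments declared here reach FlinkCatalog.createIcebergTable as part of
        // the ResolvedCatalogTable handed over by FlinkDynamicTableFactory.
        env.executeSql(
            "CREATE TABLE sample ("
                + "  id BIGINT COMMENT 'comment - id',"
                + "  data STRING COMMENT 'comment - data'"
                + ") WITH ("
                + "  'connector'='iceberg',"
                + "  'catalog-name'='hadoop_prod',"           // placeholder catalog name
                + "  'catalog-type'='hadoop',"
                + "  'warehouse'='file:///tmp/warehouse')");  // placeholder warehouse path
      }
    }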
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
index a6b53879ad80..4790dc85bf28 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -20,7 +20,10 @@
 
 import java.util.List;
 import java.util.Set;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -55,35 +58,69 @@ public class FlinkSchemaUtil {
 
   private FlinkSchemaUtil() {}
 
-  /** Convert the flink table schema to apache iceberg schema. */
+  /** @deprecated Use {@link #convert(ResolvedSchema)} instead. */
+  @Deprecated
   public static Schema convert(TableSchema schema) {
     LogicalType schemaType = schema.toRowDataType().getLogicalType();
     Preconditions.checkArgument(
-        schemaType instanceof RowType, "Schema logical type should be RowType.");
+        schemaType instanceof RowType, "Schema logical type should be row type.");
 
     RowType root = (RowType) schemaType;
     Type converted = root.accept(new FlinkTypeToType(root));
-    Schema iSchema = new Schema(converted.asStructType().fields());
-    return freshIdentifierFieldIds(iSchema, schema);
+    Schema icebergSchema = new Schema(converted.asStructType().fields());
+    if (schema.getPrimaryKey().isPresent()) {
+      return freshIdentifierFieldIds(icebergSchema, schema.getPrimaryKey().get().getColumns());
+    } else {
+      return icebergSchema;
+    }
+  }
+
+  /** Convert the flink table schema to apache iceberg schema with column comment. */
+  public static Schema convert(ResolvedSchema flinkSchema) {
+    List<Column> tableColumns = flinkSchema.getColumns();
+    // copy from org.apache.flink.table.api.Schema#toRowDataType
+    DataTypes.Field[] fields =
+        tableColumns.stream()
+            .map(
+                column -> {
+                  if (column.getComment().isPresent()) {
+                    return DataTypes.FIELD(
+                        column.getName(), column.getDataType(), column.getComment().get());
+                  } else {
+                    return DataTypes.FIELD(column.getName(), column.getDataType());
+                  }
+                })
+            .toArray(DataTypes.Field[]::new);
+
+    LogicalType schemaType = DataTypes.ROW(fields).notNull().getLogicalType();
+    Preconditions.checkArgument(
+        schemaType instanceof RowType, "Schema logical type should be row type.");
+
+    RowType root = (RowType) schemaType;
+    Type converted = root.accept(new FlinkTypeToType(root));
+    Schema icebergSchema = new Schema(converted.asStructType().fields());
+    if (flinkSchema.getPrimaryKey().isPresent()) {
+      return freshIdentifierFieldIds(icebergSchema, flinkSchema.getPrimaryKey().get().getColumns());
+    } else {
+      return icebergSchema;
+    }
   }
 
-  private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {
+  private static Schema freshIdentifierFieldIds(Schema icebergSchema, List<String> primaryKeys) {
     // Locate the identifier field id list.
     Set<Integer> identifierFieldIds = Sets.newHashSet();
-    if (schema.getPrimaryKey().isPresent()) {
-      for (String column : schema.getPrimaryKey().get().getColumns()) {
-        Types.NestedField field = iSchema.findField(column);
-        Preconditions.checkNotNull(
-            field,
-            "Cannot find field ID for the primary key column %s in schema %s",
-            column,
-            iSchema);
-        identifierFieldIds.add(field.fieldId());
-      }
+    for (String primaryKey : primaryKeys) {
+      Types.NestedField field = icebergSchema.findField(primaryKey);
+      Preconditions.checkNotNull(
+          field,
+          "Cannot find field ID for the primary key column %s in schema %s",
+          primaryKey,
+          icebergSchema);
+      identifierFieldIds.add(field.fieldId());
     }
-
-    return new Schema(iSchema.schemaId(), iSchema.asStruct().fields(), identifierFieldIds);
+    return new Schema(
+        icebergSchema.schemaId(), icebergSchema.asStruct().fields(), identifierFieldIds);
   }
 
   /**
@@ -109,7 +146,11 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
 
     // fix types that can't be represented in Flink (UUID)
     Schema fixedSchema = FlinkFixupTypes.fixup(schema, baseSchema);
-    return freshIdentifierFieldIds(fixedSchema, flinkSchema);
+    if (flinkSchema.getPrimaryKey().isPresent()) {
+      return freshIdentifierFieldIds(fixedSchema, flinkSchema.getPrimaryKey().get().getColumns());
+    } else {
+      return fixedSchema;
+    }
   }
 
   /**
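The new convert(ResolvedSchema) overload rebuilds the row type through DataTypes.FIELD with the column comment attached, so each Flink comment should land in the corresponding Iceberg field's doc string. A small round-trip sketch under that assumption (names are illustrative):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.FlinkSchemaUtil;

    public class ConvertCommentSketch {
      public static void main(String[] args) {
        ResolvedSchema flinkSchema =
            ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT()).withComment("comment - id"));
        Schema icebergSchema = FlinkSchemaUtil.convert(flinkSchema);
        // The Flink column comment should surface as the Iceberg field doc.
        System.out.println(icebergSchema.findField("id").doc()); // expected: comment - id
      }
    }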
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
index f0b9bf64fb1a..2bbc9cf208fe 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
@@ -132,9 +132,11 @@ public static void applySchemaChanges(
           flinkColumn.getName());
       Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
       if (flinkColumn.getDataType().getLogicalType().isNullable()) {
-        pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+        pendingUpdate.addColumn(
+            flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
       } else {
-        pendingUpdate.addRequiredColumn(flinkColumn.getName(), icebergType);
+        pendingUpdate.addRequiredColumn(
+            flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
       }
     } else if (change instanceof TableChange.ModifyColumn) {
       TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change;
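The three-argument addColumn and addRequiredColumn overloads used above are existing Iceberg UpdateSchema methods that accept a doc string; the patch simply threads the Flink column comment through to them. A roughly equivalent direct-API sketch, assuming the table was loaded from a catalog beforehand (names are placeholders):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.types.Types;

    public class AddColumnSketch {
      static void addCommentedColumn(Table table) {
        table
            .updateSchema()
            // Optional (nullable) column with a doc, mirroring the ALTER TABLE path.
            .addColumn("col1", Types.StringType.get(), "comment for col1")
            .commit();
      }
    }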
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index ef0802d8693d..eaa92e32c49d 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -226,6 +226,19 @@ public void testCreatePartitionTable() throws TableNotExistException {
     assertThat(catalogTable.getPartitionKeys()).isEqualTo(Collections.singletonList("dt"));
   }
 
+  @TestTemplate
+  public void testCreateTableWithColumnComment() {
+    sql("CREATE TABLE tl(id BIGINT COMMENT 'comment - id', data STRING COMMENT 'comment - data')");
+
+    Table table = table("tl");
+    assertThat(table.schema().asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get(), "comment - id"),
+                    Types.NestedField.optional(2, "data", Types.StringType.get(), "comment - data"))
+                .asStruct());
+  }
+
   @TestTemplate
   public void testCreateTableWithFormatV2ThroughTableProperty() throws Exception {
     sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");
@@ -316,14 +329,15 @@ public void testAlterTableAddColumn() {
                 Types.NestedField.optional(2, "dt", Types.StringType.get()))
             .asStruct());
     // Add multiple columns
-    sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)");
+    sql("ALTER TABLE tl ADD (col1 STRING COMMENT 'comment for col1', col2 BIGINT)");
     Schema schemaAfter2 = table("tl").schema();
     assertThat(schemaAfter2.asStruct())
         .isEqualTo(
             new Schema(
                     Types.NestedField.optional(1, "id", Types.LongType.get()),
                     Types.NestedField.optional(2, "dt", Types.StringType.get()),
-                    Types.NestedField.optional(3, "col1", Types.StringType.get()),
+                    Types.NestedField.optional(
+                        3, "col1", Types.StringType.get(), "comment for col1"),
                     Types.NestedField.optional(4, "col2", Types.LongType.get()))
                 .asStruct());
     // Adding a required field should fail because Iceberg's SchemaUpdate does not allow