From a798b1df19f6cbd5dfd8ff55bab9678a519ee369 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 12 Dec 2024 18:18:07 +0900 Subject: [PATCH] Add support for v3 format in Iceberg --- docs/src/main/sphinx/connector/iceberg.md | 6 +-- .../trino/plugin/iceberg/IcebergConfig.java | 5 ++- .../trino/plugin/iceberg/IcebergMetadata.java | 15 +++++-- .../iceberg/BaseIcebergConnectorTest.java | 13 ++++++ .../trino/plugin/iceberg/TestIcebergV2.java | 45 +++++++++++++++++-- 5 files changed, 73 insertions(+), 11 deletions(-) diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 42cb12927220..498e6e1d9477 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -7,7 +7,7 @@ Apache Iceberg is an open table format for huge analytic datasets. The Iceberg connector allows querying data stored in files written in Iceberg format, as defined in the [Iceberg Table Spec](https://iceberg.apache.org/spec/). The -connector supports Apache Iceberg table spec versions 1 and 2. +connector supports Apache Iceberg table spec versions 1, 2 and 3. The table state is maintained in metadata files. All changes to table state create a new metadata file and replace the old metadata with an atomic @@ -865,8 +865,8 @@ connector using a {doc}`WITH ` clause. - Optionally specifies the file system location URI for the table. * - `format_version` - Optionally specifies the format version of the Iceberg specification to use - for new tables; either `1` or `2`. Defaults to `2`. Version `2` is required - for row level deletes. + for new tables; either `1`, `2` or `3`. Defaults to `2`. Only version `2` + supports row level deletes. * - `orc_bloom_filter_columns` - Comma-separated list of columns to use for ORC bloom filter. 
It improves the performance of queries using Equality and IN predicates when reading ORC diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 95728c86a2fb..426fae3ef0b0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -51,7 +51,8 @@ public class IcebergConfig { public static final int FORMAT_VERSION_SUPPORT_MIN = 1; - public static final int FORMAT_VERSION_SUPPORT_MAX = 2; + private static final int FORMAT_VERSION_DEFAULT = 2; + public static final int FORMAT_VERSION_SUPPORT_MAX = 3; public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled"; public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics."; public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes"; @@ -72,7 +73,7 @@ public class IcebergConfig private boolean registerTableProcedureEnabled; private boolean addFilesProcedureEnabled; private Optional hiveCatalogName = Optional.empty(); - private int formatVersion = FORMAT_VERSION_SUPPORT_MAX; + private int formatVersion = FORMAT_VERSION_DEFAULT; private Duration expireSnapshotsMinRetention = new Duration(7, DAYS); private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS); private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 86ca10a5e1d4..26904828e085 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -382,8 +382,8 @@ 
public class IcebergMetadata { private static final Logger log = Logger.get(IcebergMetadata.class); private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+"); - private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2; - private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2; + private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 3; + private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 3; private static final String RETENTION_THRESHOLD = "retention_threshold"; private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN"; public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.builder() @@ -2283,6 +2283,10 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta // UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version int formatVersion = (int) properties.get(FORMAT_VERSION_PROPERTY) .orElseThrow(() -> new IllegalArgumentException("The format_version property cannot be empty")); + if (formatVersion == 3) { + // TODO https://github.com/trinodb/trino/issues/24457 Allow upgrading to v3 once the connector supports deletion vectors + throw new TrinoException(NOT_SUPPORTED, "Cannot upgrade a table to v3"); + } updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion)); } @@ -2844,7 +2848,12 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT IcebergTableHandle table = (IcebergTableHandle) tableHandle; verifyTableVersionForUpdate(table); - Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + BaseTable icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + int formatVersion = icebergTable.operations().current().formatVersion(); + if (formatVersion >= 3) { + // TODO https://github.com/trinodb/trino/issues/24457 Add support for deletion vector + throw new TrinoException(NOT_SUPPORTED, "This connector does not support 
modifying rows on tables with format version 3 or higher"); + } validateNotModifyingOldSnapshot(table, icebergTable); beginTransaction(icebergTable); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 670200247be5..968c156973af 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6471,6 +6471,19 @@ public void testIfDeletesReturnsNumberOfRemovedRows() assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 2); } + @Test + void testUnsupportedOperationsWithFormatVersion3() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_v3", "WITH (format_version = 3) AS SELECT 1 x")) { + assertQueryFails("UPDATE " + table.getName() + " SET x = 0", + "This connector does not support modifying rows on tables with format version 3 or higher"); + assertQueryFails("DELETE FROM " + table.getName() + " WHERE x = 0", + "This connector does not support modifying rows on tables with format version 3 or higher"); + assertQueryFails("MERGE INTO " + table.getName() + " t USING " + table.getName() + " s ON (t.x = s.x) WHEN MATCHED THEN UPDATE SET x = s.x", + "This connector does not support modifying rows on tables with format version 3 or higher"); + } + } + @Test public void testUpdatingFileFormat() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index a5e59ec83610..91a586860ad6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -759,18 +759,38 @@ private void runOptimizeDuringWriteOperations(boolean useSmallFiles) } 
@Test - public void testUpgradeTableToV2FromTrino() + public void testUpgradeTableToV3FromTrino() { String tableName = "test_upgrade_table_to_v2_from_trino_" + randomNameSuffix(); assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25); assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1); + + // v1 -> v2 assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2"); assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2); assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + + // v2 -> v3 + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3", "Cannot upgrade a table to v3"); + assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); + } + + @Test + public void testUpgradeTableFromV1ToV3() + { + String tableName = "test_upgrade_table_from_v1_to_v3_from_trino_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25); + assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1); + + // v1 -> v3 + assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3", "Cannot upgrade a table to v3"); + assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); } @Test - public void testDowngradingV2TableToV1Fails() + public void testDowngradingFromV2Fails() { String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomNameSuffix(); assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25); @@ -782,6 +802,25 @@ public void testDowngradingV2TableToV1Fails() .hasMessage("Cannot downgrade v2 table to v1"); } + @Test + public void 
testDowngradingFromV3Fails() + { + String tableName = "test_downgrading_from_v3_fails_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 3) AS SELECT * FROM tpch.tiny.nation", 25); + assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3); + + assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2")) + .failure() + .hasMessage("Failed to set new property values") + .rootCause() + .hasMessage("Cannot downgrade v3 table to v2"); + assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1")) + .failure() + .hasMessage("Failed to set new property values") + .rootCause() + .hasMessage("Cannot downgrade v3 table to v1"); + } + @Test public void testUpgradingToInvalidVersionFails() { @@ -789,7 +828,7 @@ public void testUpgradingToInvalidVersionFails() assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25); assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2); assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42")) - .failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2"); + .failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 3"); } @Test