Add support for v3 format in Iceberg #24455

Closed
wants to merge 2 commits into from
6 changes: 3 additions & 3 deletions docs/src/main/sphinx/connector/iceberg.md
@@ -7,7 +7,7 @@
Apache Iceberg is an open table format for huge analytic datasets. The Iceberg
connector allows querying data stored in files written in Iceberg format, as
defined in the [Iceberg Table Spec](https://iceberg.apache.org/spec/). The
connector supports Apache Iceberg table spec versions 1 and 2.
connector supports Apache Iceberg table spec versions 1, 2 and 3.

The table state is maintained in metadata files. All changes to table
state create a new metadata file and replace the old metadata with an atomic
@@ -865,8 +865,8 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
- Optionally specifies the file system location URI for the table.
* - `format_version`
- Optionally specifies the format version of the Iceberg specification to use
for new tables; either `1` or `2`. Defaults to `2`. Version `2` is required
for row level deletes.
for new tables; either `1`, `2` or `3`. Defaults to `2`. Only version `2`
Member

I don't understand what the user-facing impact of this change is.
I see that we disallow altering a table to v3, as well as update and merge on v3 tables.
Are reads, optimize, and inserts on existing v3 tables allowed? Is it valid to allow them while we don't support v3 deletion vectors? Do we error out only when a v3 deletion vector is encountered?
I think it would be nicer to add deletion vector support together with v3 support; otherwise, I'm not sure what the benefit of supporting v3 is.

Member Author

@ebyhr Jan 13, 2025

I will rework this once they publish an API for deletion vectors.

supports row level deletes.
* - `orc_bloom_filter_columns`
- Comma-separated list of columns to use for ORC bloom filter. It improves the
performance of queries using Equality and IN predicates when reading ORC
@@ -51,7 +51,8 @@
public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
private static final int FORMAT_VERSION_DEFAULT = 2;
public static final int FORMAT_VERSION_SUPPORT_MAX = 3;
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
@@ -72,7 +73,7 @@ public class IcebergConfig
private boolean registerTableProcedureEnabled;
private boolean addFilesProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
private int formatVersion = FORMAT_VERSION_DEFAULT;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
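
The two `IcebergConfig` hunks above decouple the default format version from the maximum supported one, so raising the maximum to 3 leaves the default for new tables at 2. The following is a minimal standalone sketch of that idea. `FormatVersionSettings` and `validate` are hypothetical names used only for illustration; they are not part of Trino, and the real `IcebergConfig` may enforce the range in a different way. The error text follows the message asserted in `testUpgradingToInvalidVersionFails`.

```java
// Hypothetical illustration only; this is not Trino's IcebergConfig.
final class FormatVersionSettings
{
    static final int FORMAT_VERSION_SUPPORT_MIN = 1;
    static final int FORMAT_VERSION_SUPPORT_MAX = 3;
    // Kept separate from the maximum so that new tables still default to v2.
    static final int FORMAT_VERSION_DEFAULT = 2;

    private FormatVersionSettings() {}

    // Returns the version unchanged when it lies within the supported range,
    // otherwise throws with a message naming the allowed bounds.
    static int validate(int formatVersion)
    {
        if (formatVersion < FORMAT_VERSION_SUPPORT_MIN || formatVersion > FORMAT_VERSION_SUPPORT_MAX) {
            throw new IllegalArgumentException(
                    "format_version must be between " + FORMAT_VERSION_SUPPORT_MIN + " and " + FORMAT_VERSION_SUPPORT_MAX);
        }
        return formatVersion;
    }
}
```

With a single constant serving as both default and maximum (the state before this change), bumping the maximum would silently have changed the default as well.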
@@ -382,8 +382,8 @@ public class IcebergMetadata
{
private static final Logger log = Logger.get(IcebergMetadata.class);
private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+");
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 3;
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 3;
private static final String RETENTION_THRESHOLD = "retention_threshold";
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.<String>builder()
@@ -2283,6 +2283,10 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
// UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version
int formatVersion = (int) properties.get(FORMAT_VERSION_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The format_version property cannot be empty"));
if (formatVersion == 3) {
// TODO https://github.com/trinodb/trino/issues/24457 Allow upgrading to v3 once the connector supports deletion vectors
throw new TrinoException(NOT_SUPPORTED, "Cannot upgrade a table to v3");
}
updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
}
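
The behaviour that the upgrade and downgrade tests in this PR expect can be summarised in one small function. This is a rough sketch under assumptions: `FormatVersionChange` and `check` are hypothetical names, and the exception types are stand-ins for `TrinoException` and whatever Iceberg throws. In the PR the downgrade message surfaces as a root cause, which suggests that check lives outside `setTableProperties`; the sketch collapses both checks into one place purely for readability.

```java
// Hypothetical illustration only; this is neither Trino nor Iceberg code.
final class FormatVersionChange
{
    private FormatVersionChange() {}

    // Returns the version to store, or throws when the change is rejected.
    static int check(int currentVersion, int requestedVersion)
    {
        if (requestedVersion == 3) {
            // Mirrors the guard in the diff, which inspects only the requested version.
            throw new UnsupportedOperationException("Cannot upgrade a table to v3");
        }
        if (requestedVersion < currentVersion) {
            // Message shape taken from the downgrade tests in this PR.
            throw new IllegalArgumentException(
                    "Cannot downgrade v" + currentVersion + " table to v" + requestedVersion);
        }
        return requestedVersion;
    }
}
```

Because the guard looks only at the requested version, the v1 to v3 and v2 to v3 cases fail with the same message, which matches `testUpgradeTableFromV1ToV3` and `testUpgradeTableToV3FromTrino`.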

@@ -2844,7 +2848,12 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
verifyTableVersionForUpdate(table);

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
BaseTable icebergTable = catalog.loadTable(session, table.getSchemaTableName());
int formatVersion = icebergTable.operations().current().formatVersion();
if (formatVersion >= 3) {
// TODO https://github.com/trinodb/trino/issues/24457 Add support for deletion vector
throw new TrinoException(NOT_SUPPORTED, "This connector does not support modifying rows on tables with format version 3 or higher");
}
validateNotModifyingOldSnapshot(table, icebergTable);

beginTransaction(icebergTable);
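
The guard added to `beginMerge` can be read in isolation as a single version check. The sketch below is an assumption-laden illustration: `RowLevelChangeGuard` and `verifyCanModifyRows` are hypothetical names, `UnsupportedOperationException` stands in for `TrinoException(NOT_SUPPORTED, ...)`, and other checks in the real method, such as `verifyTableVersionForUpdate`, are deliberately left out.

```java
// Hypothetical illustration only; this is not Trino code.
final class RowLevelChangeGuard
{
    // Tables at or above this format version are rejected for row-level changes in this PR.
    static final int FIRST_UNSUPPORTED_VERSION = 3;

    private RowLevelChangeGuard() {}

    // Throws when the table's format version is 3 or higher; otherwise does nothing.
    static void verifyCanModifyRows(int formatVersion)
    {
        if (formatVersion >= FIRST_UNSUPPORTED_VERSION) {
            throw new UnsupportedOperationException(
                    "This connector does not support modifying rows on tables with format version 3 or higher");
        }
    }
}
```

`testUnsupportedOperationsWithFormatVersion3` expects this same message for `UPDATE`, `DELETE`, and `MERGE`. Using `>=` rather than `== 3` means that any later format version is also rejected until support is added explicitly.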
@@ -6471,6 +6471,19 @@ public void testIfDeletesReturnsNumberOfRemovedRows()
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 2);
}

@Test
void testUnsupportedOperationsWithFormatVersion3()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_v3", "WITH (format_version = 3) AS SELECT 1 x")) {
assertQueryFails("UPDATE " + table.getName() + " SET x = 0",
"This connector does not support modifying rows on tables with format version 3 or higher");
assertQueryFails("DELETE FROM " + table.getName() + " WHERE x = 0",
"This connector does not support modifying rows on tables with format version 3 or higher");
assertQueryFails("MERGE INTO " + table.getName() + " t USING " + table.getName() + " s ON (t.x = s.x) WHEN MATCHED THEN UPDATE SET x = s.x",
"This connector does not support modifying rows on tables with format version 3 or higher");
}
}

@Test
public void testUpdatingFileFormat()
{
@@ -759,18 +759,38 @@ private void runOptimizeDuringWriteOperations(boolean useSmallFiles)
}

@Test
public void testUpgradeTableToV2FromTrino()
public void testUpgradeTableToV3FromTrino()
{
String tableName = "test_upgrade_table_to_v2_from_trino_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);

// v1 -> v2
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2");
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");

// v2 -> v3
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3", "Cannot upgrade a table to v3");
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}

@Test
public void testUpgradeTableFromV1ToV3()
{
String tableName = "test_upgrade_table_from_v1_to_v3_from_trino_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);

// v1 -> v3
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3", "Cannot upgrade a table to v3");
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}

@Test
public void testDowngradingV2TableToV1Fails()
public void testDowngradingFromV2Fails()
{
String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
@@ -782,14 +802,33 @@ public void testDowngradingV2TableToV1Fails()
.hasMessage("Cannot downgrade v2 table to v1");
}

@Test
public void testDowngradingFromV3Fails()
{
String tableName = "test_downgrading_from_v3_fails_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 3) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3);

assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2"))
.failure()
.hasMessage("Failed to set new property values")
.rootCause()
.hasMessage("Cannot downgrade v3 table to v2");
assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1"))
.failure()
.hasMessage("Failed to set new property values")
.rootCause()
.hasMessage("Cannot downgrade v3 table to v1");
}

@Test
public void testUpgradingToInvalidVersionFails()
{
String tableName = "test_upgrading_to_invalid_version_fails_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42"))
.failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2");
.failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 3");
}

@Test