Skip to content

Commit

Permalink
Add support for v3 format in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Dec 24, 2024
1 parent 0a23454 commit a798b1d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 11 deletions.
6 changes: 3 additions & 3 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Apache Iceberg is an open table format for huge analytic datasets. The Iceberg
connector allows querying data stored in files written in Iceberg format, as
defined in the [Iceberg Table Spec](https://iceberg.apache.org/spec/). The
connector supports Apache Iceberg table spec versions 1 and 2.
connector supports Apache Iceberg table spec versions 1, 2 and 3.

The table state is maintained in metadata files. All changes to table
state create a new metadata file and replace the old metadata with an atomic
Expand Down Expand Up @@ -865,8 +865,8 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
- Optionally specifies the file system location URI for the table.
* - `format_version`
- Optionally specifies the format version of the Iceberg specification to use
for new tables; either `1` or `2`. Defaults to `2`. Version `2` is required
for row level deletes.
for new tables; either `1`, `2` or `3`. Defaults to `2`. Only version `2`
supports row level deletes.
* - `orc_bloom_filter_columns`
- Comma-separated list of columns to use for ORC bloom filter. It improves the
performance of queries using Equality and IN predicates when reading ORC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
private static final int FORMAT_VERSION_DEFAULT = 2;
public static final int FORMAT_VERSION_SUPPORT_MAX = 3;
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
Expand All @@ -72,7 +73,7 @@ public class IcebergConfig
private boolean registerTableProcedureEnabled;
private boolean addFilesProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
private int formatVersion = FORMAT_VERSION_DEFAULT;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ public class IcebergMetadata
{
private static final Logger log = Logger.get(IcebergMetadata.class);
private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+");
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 3;
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 3;
private static final String RETENTION_THRESHOLD = "retention_threshold";
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.<String>builder()
Expand Down Expand Up @@ -2283,6 +2283,10 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
// UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version
int formatVersion = (int) properties.get(FORMAT_VERSION_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The format_version property cannot be empty"));
if (formatVersion == 3) {
// // TODO https://github.com/trinodb/trino/issues/24457 Allow upgrading to v3 once the connector supports deletion vectors
throw new TrinoException(NOT_SUPPORTED, "Cannot upgrade a table to v3");
}
updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
}

Expand Down Expand Up @@ -2844,7 +2848,12 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
verifyTableVersionForUpdate(table);

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
BaseTable icebergTable = catalog.loadTable(session, table.getSchemaTableName());
int formatVersion = icebergTable.operations().current().formatVersion();
if (formatVersion >= 3) {
// TODO https://github.com/trinodb/trino/issues/24457 Add support for deletion vector
throw new TrinoException(NOT_SUPPORTED, "This connector does not support modifying rows on tables with format version 3 or higher");
}
validateNotModifyingOldSnapshot(table, icebergTable);

beginTransaction(icebergTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6471,6 +6471,19 @@ public void testIfDeletesReturnsNumberOfRemovedRows()
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 2);
}

@Test
void testUnsupportedOperationsWithFormatVersion3()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_v3", "WITH (format_version = 3) AS SELECT 1 x")) {
assertQueryFails("UPDATE " + table.getName() + " SET x = 0",
"This connector does not support modifying rows on tables with format version 3 or higher");
assertQueryFails("DELETE FROM " + table.getName() + " WHERE x = 0",
"This connector does not support modifying rows on tables with format version 3 or higher");
assertQueryFails("MERGE INTO " + table.getName() + " t USING " + table.getName() + " s ON (t.x = s.x) WHEN MATCHED THEN UPDATE SET x = s.x",
"This connector does not support modifying rows on tables with format version 3 or higher");
}
}

@Test
public void testUpdatingFileFormat()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,18 +759,38 @@ private void runOptimizeDuringWriteOperations(boolean useSmallFiles)
}

@Test
public void testUpgradeTableToV2FromTrino()
public void testUpgradeTableToV3FromTrino()
{
String tableName = "test_upgrade_table_to_v2_from_trino_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);

// v1 -> v2
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2");
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");

// v2 -> v3
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3", "Cannot upgrade a table to v3");
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}

@Test
public void testUpgradeTableFromV1ToV3()
{
String tableName = "test_upgrade_table_from_v1_to_v3_from_trino_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);

// v1 -> v3
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3", "Cannot upgrade a table to v3");
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}

@Test
public void testDowngradingV2TableToV1Fails()
public void testDowngradingFromV2Fails()
{
String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
Expand All @@ -782,14 +802,33 @@ public void testDowngradingV2TableToV1Fails()
.hasMessage("Cannot downgrade v2 table to v1");
}

@Test
public void testDowngradingFromV3Fails()
{
String tableName = "test_downgrading_from_v3_fails_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 3) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3);

assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2"))
.failure()
.hasMessage("Failed to set new property values")
.rootCause()
.hasMessage("Cannot downgrade v3 table to v2");
assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1"))
.failure()
.hasMessage("Failed to set new property values")
.rootCause()
.hasMessage("Cannot downgrade v3 table to v1");
}

@Test
public void testUpgradingToInvalidVersionFails()
{
String tableName = "test_upgrading_to_invalid_version_fails_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42"))
.failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2");
.failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 3");
}

@Test
Expand Down

0 comments on commit a798b1d

Please sign in to comment.