Skip to content

Commit

Permalink
Allow configuring orc_bloom_filter_columns table property in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jan 7, 2025
1 parent bdd4503 commit d2436a1
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_LAYOUT_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
Expand All @@ -307,6 +308,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.getTopLevelColumns;
import static io.trino.plugin.iceberg.IcebergUtil.newCreateTableTransaction;
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.validateOrcBloomFilterColumns;
import static io.trino.plugin.iceberg.IcebergUtil.validateParquetBloomFilterColumns;
import static io.trino.plugin.iceberg.IcebergUtil.verifyExtraProperties;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
Expand Down Expand Up @@ -374,6 +376,7 @@
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
Expand All @@ -397,6 +400,7 @@ public class IcebergMetadata
.add(FORMAT_VERSION_PROPERTY)
.add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(ORC_BLOOM_FILTER_COLUMNS_PROPERTY)
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.add(PARTITIONING_PROPERTY)
.add(SORTED_BY_PROPERTY)
Expand Down Expand Up @@ -2295,6 +2299,20 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
parquetBloomFilterColumns.forEach(column -> updateProperties.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + column, "true"));
}

if (properties.containsKey(ORC_BLOOM_FILTER_COLUMNS_PROPERTY)) {
checkFormatForProperty(getFileFormat(icebergTable).toIceberg(), FileFormat.ORC, ORC_BLOOM_FILTER_COLUMNS_PROPERTY);
//noinspection unchecked
List<String> orcBloomFilterColumns = (List<String>) properties.get(ORC_BLOOM_FILTER_COLUMNS_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The orc_bloom_filter_columns property cannot be empty"));
if (orcBloomFilterColumns.isEmpty()) {
updateProperties.remove(ORC_BLOOM_FILTER_COLUMNS);
}
else {
validateOrcBloomFilterColumns(getColumnMetadatas(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager), orcBloomFilterColumns);
updateProperties.set(ORC_BLOOM_FILTER_COLUMNS, Joiner.on(",").join(orcBloomFilterColumns));
}
}

if (properties.containsKey(FILE_FORMAT_PROPERTY)) {
IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
List<String> orcBloomFilterColumns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
if (!orcBloomFilterColumns.isEmpty()) {
checkFormatForProperty(fileFormat.toIceberg(), FileFormat.ORC, ORC_BLOOM_FILTER_COLUMNS_PROPERTY);
validateOrcBloomFilterColumns(tableMetadata, orcBloomFilterColumns);
validateOrcBloomFilterColumns(tableMetadata.getColumns(), orcBloomFilterColumns);
propertiesBuilder.put(ORC_BLOOM_FILTER_COLUMNS, Joiner.on(",").join(orcBloomFilterColumns));
propertiesBuilder.put(ORC_BLOOM_FILTER_FPP, String.valueOf(IcebergTableProperties.getOrcBloomFilterFpp(tableMetadata.getProperties())));
}
Expand Down Expand Up @@ -993,9 +993,9 @@ public static void checkFormatForProperty(FileFormat actualStorageFormat, FileFo
}
}

private static void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMetadata, List<String> orcBloomFilterColumns)
public static void validateOrcBloomFilterColumns(List<ColumnMetadata> columns, List<String> orcBloomFilterColumns)
{
Set<String> allColumns = tableMetadata.getColumns().stream()
Set<String> allColumns = columns.stream()
.map(ColumnMetadata::getName)
.collect(toImmutableSet());
if (!allColumns.containsAll(orcBloomFilterColumns)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7475,12 +7475,8 @@ public void testAlterTableWithUnsupportedProperties()

assertUpdate("CREATE TABLE " + tableName + " (a bigint)");

assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES orc_bloom_filter_columns = ARRAY['a']",
"The following properties cannot be updated: orc_bloom_filter_columns");
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES location = '/var/data/table/', orc_bloom_filter_fpp = 0.5",
"The following properties cannot be updated: location, orc_bloom_filter_fpp");
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format = 'ORC', orc_bloom_filter_columns = ARRAY['a']",
"The following properties cannot be updated: orc_bloom_filter_columns");

assertUpdate("DROP TABLE " + tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
package io.trino.plugin.iceberg;

import io.trino.testing.BaseOrcWithBloomFiltersTest;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import org.junit.jupiter.api.Test;

import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.assertContains;
import java.util.Map;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -32,6 +33,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
return IcebergQueryRunner.builder()
.addIcebergProperty("iceberg.file-format", "ORC")
.addIcebergProperty("hive.orc.bloom-filters.enabled", "true")
.addIcebergProperty("hive.orc.default-bloom-filter-fpp", "0.001")
.build();
Expand All @@ -55,14 +57,60 @@ public void testBloomFilterPropertiesArePersistedDuringCreate()
"orc_bloom_filter_columns = array['a','b']," +
"orc_bloom_filter_fpp = 0.1)");

MaterializedResult actualProperties = computeActual("SELECT * FROM \"" + tableName + "$properties\"");
assertThat(actualProperties).isNotNull();
MaterializedResult expectedProperties = resultBuilder(getSession())
.row("write.orc.bloom.filter.columns", "a,b")
.row("write.orc.bloom.filter.fpp", "0.1").build();
assertContains(actualProperties, expectedProperties);
assertThat(getTableProperties(tableName))
.containsEntry("write.orc.bloom.filter.columns", "a,b")
.containsEntry("write.orc.bloom.filter.fpp", "0.1");

assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.contains("orc_bloom_filter_columns", "orc_bloom_filter_fpp");
}

/**
 * Verifies that {@code orc_bloom_filter_columns} set via ALTER TABLE ... SET PROPERTIES
 * is persisted as the Iceberg {@code write.orc.bloom.filter.columns} table property,
 * that column names are normalized to lower case, and that setting an empty array
 * removes the property entirely.
 */
@Test
void testBloomFilterPropertiesArePersistedDuringSetProperties()
{
    String tableName = "test_metadata_write_properties_" + randomNameSuffix();
    // Mixed-case column name "A" exercises case-insensitive column matching below
    assertQuerySucceeds("CREATE TABLE " + tableName + "(A bigint, b bigint, c bigint)");

    // Mixed-case input ARRAY['a','B'] is persisted lower-cased as "a,b"
    assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES orc_bloom_filter_columns = ARRAY['a','B']");
    assertThat(getTableProperties(tableName))
            .containsEntry("write.orc.bloom.filter.columns", "a,b");

    // Re-setting the property replaces the previous column list
    assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES orc_bloom_filter_columns = ARRAY['a']");
    assertThat(getTableProperties(tableName))
            .containsEntry("write.orc.bloom.filter.columns", "a");

    // An empty array removes the property from the table metadata
    assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES orc_bloom_filter_columns = ARRAY[]");
    assertThat(getTableProperties(tableName))
            .doesNotContainKey("write.orc.bloom.filter.columns");

    // Clean up so the table does not leak into other tests sharing the query runner
    assertUpdate("DROP TABLE " + tableName);
}

/**
 * Verifies that referencing a column absent from the table schema in
 * {@code orc_bloom_filter_columns} fails with a clear error message, both at
 * CREATE TABLE time and via ALTER TABLE ... SET PROPERTIES.
 */
@Test
void testInvalidBloomFilterProperties()
{
    String tableName = "test_invalid_bloom_filter_properties_" + randomNameSuffix();
    // CREATE TABLE with an unknown bloom filter column must fail outright
    assertQueryFails(
            "CREATE TABLE " + tableName + "(x int) WITH (orc_bloom_filter_columns = ARRAY['missing_column'])",
            "\\QOrc bloom filter columns [missing_column] not present in schema");

    // Same validation must apply when the property is set on an existing table
    assertQuerySucceeds("CREATE TABLE " + tableName + "(x array(integer))");
    assertQueryFails(
            "ALTER TABLE " + tableName + " SET PROPERTIES orc_bloom_filter_columns = ARRAY['missing_column']",
            "\\QOrc bloom filter columns [missing_column] not present in schema");

    // Clean up so the table does not leak into other tests sharing the query runner
    assertUpdate("DROP TABLE " + tableName);
}

/**
 * Setting {@code orc_bloom_filter_columns} on a table stored as Parquet must be
 * rejected, since the property applies only to the ORC storage format.
 */
@Test
void testInvalidOrcBloomFilterPropertiesOnParquet()
{
    // TestTable drops the table automatically when the try block exits
    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_orc_bloom_filter", "(x int) WITH (format = 'PARQUET')")) {
        String setPropertiesSql = "ALTER TABLE " + table.getName() + " SET PROPERTIES orc_bloom_filter_columns = ARRAY['x']";
        assertQueryFails(
                setPropertiesSql,
                "Cannot specify orc_bloom_filter_columns table property for storage format: PARQUET");
    }
}

/**
 * Reads the raw Iceberg table properties exposed through the {@code $properties}
 * metadata table and returns them as a key-to-value map.
 */
private Map<String, String> getTableProperties(String tableName)
{
    String propertiesQuery = "SELECT key, value FROM \"" + tableName + "$properties\"";
    return computeActual(propertiesQuery).getMaterializedRows().stream()
            .collect(toImmutableMap(
                    materializedRow -> (String) materializedRow.getField(0),
                    materializedRow -> (String) materializedRow.getField(1)));
}
}

0 comments on commit d2436a1

Please sign in to comment.