From c01bb7ce83bec9e4dfc93e957bb8794b83d1b253 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Thu, 30 Jan 2025 16:12:33 -0800 Subject: [PATCH] [Kernel] Allow setting arbitrary properties when creating/updating the table (#4107) ## Description Currently, Kernel doesn't allow setting arbitrary table properties other than the `delta.*` properties that Kernel understands. We have valid use cases where we need to allow storing properties with arbitrary key names. As part of this, we also don't want to set any `delta.*` properties that Kernel doesn't understand or supports yet. ## How was this patch tested? UTs ## Does this PR introduce _any_ user-facing changes? Now the connectors can property with any name. Resolves #3149 --- .../io/delta/kernel/internal/TableConfig.java | 37 +++--- .../internal/TransactionBuilderImpl.java | 2 +- .../kernel/internal/TableConfigSuite.scala | 4 +- .../defaults/DeltaTableWritesSuite.scala | 23 ---- .../defaults/TablePropertiesSuite.scala | 107 ++++++++++++++++++ 5 files changed, 133 insertions(+), 40 deletions(-) create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index aa82b284560..bcd0512b1aa 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -191,32 +191,41 @@ public class TableConfig { /////////////////////////// /** - * Validates that the given properties have the delta prefix in the key name, and they are in the - * set of valid properties. The caller should get the validated configurations and store the case - * of the property name defined in TableConfig. + * Validates that the given new properties that the txn is trying to update in table. Properties + * that have `delta.` prefix in the key name should be in valid list and are editable. The caller + * is expected to store the returned properties in the table metadata after further validation + * from a protocol point of view. The returned properties will have the key's case normalized as + * defined in its {@link TableConfig}. * - * @param configurations the properties to validate + * @param newProperties the properties to validate * @throws InvalidConfigurationValueException if any of the properties are invalid * @throws UnknownConfigurationException if any of the properties are unknown */ - public static Map validateProperties(Map configurations) { - Map validatedConfigurations = new HashMap<>(); - for (Map.Entry kv : configurations.entrySet()) { + public static Map validateDeltaProperties(Map newProperties) { + Map validatedProperties = new HashMap<>(); + for (Map.Entry kv : newProperties.entrySet()) { String key = kv.getKey().toLowerCase(Locale.ROOT); String value = kv.getValue(); - if (key.startsWith("delta.") && VALID_PROPERTIES.containsKey(key)) { + + if (key.startsWith("delta.")) { + // If it is a delta table property, make sure it is a supported property and editable + if (!VALID_PROPERTIES.containsKey(key)) { + throw DeltaErrors.unknownConfigurationException(kv.getKey()); + } + TableConfig tableConfig = VALID_PROPERTIES.get(key); - if (tableConfig.editable) { - tableConfig.validate(value); - validatedConfigurations.put(tableConfig.getKey(), value); - } else { + if (!tableConfig.editable) { throw DeltaErrors.cannotModifyTableProperty(kv.getKey()); } + + tableConfig.validate(value); + validatedProperties.put(tableConfig.getKey(), value); } else { - throw DeltaErrors.unknownConfigurationException(kv.getKey()); + // allow unknown properties to be set + validatedProperties.put(key, value); } } - return validatedConfigurations; + return validatedProperties; } private static void addConfig(HashMap> configs, TableConfig config) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index 6639a097381..b1586f18c72 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -137,7 +137,7 @@ public Transaction build(Engine engine) { Protocol protocol = snapshot.getProtocol(); if (tableProperties.isPresent()) { Map validatedProperties = - TableConfig.validateProperties(tableProperties.get()); + TableConfig.validateDeltaProperties(tableProperties.get()); Map newProperties = metadata.filterOutUnchangedProperties(validatedProperties); diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala index 65ac2fe4d05..9587395a64b 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ class TableConfigSuite extends AnyFunSuite { test("check TableConfig.editable is true") { - TableConfig.validateProperties( + TableConfig.validateDeltaProperties( Map( TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week", TableConfig.CHECKPOINT_INTERVAL.getKey -> "20", @@ -36,7 +36,7 @@ class TableConfigSuite extends AnyFunSuite { test("check TableConfig.MAX_COLUMN_ID.editable is false") { val e = intercept[KernelException] { - TableConfig.validateProperties( + TableConfig.validateDeltaProperties( Map( TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week", TableConfig.CHECKPOINT_INTERVAL.getKey -> "20", diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 5907fb32108..83b77ef208b 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -292,29 +292,6 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("create table - invalid properties - expect failure") { - withTempDirAndEngine { (tablePath, engine) => - val ex1 = intercept[UnknownConfigurationException] { - createTxn( - engine, tablePath, isNewTable = true, testSchema, Seq.empty, Map("invalid key" -> "10")) - } - assert(ex1.getMessage.contains("Unknown configuration was specified: invalid key")) - - val ex2 = intercept[InvalidConfigurationValueException] { - createTxn( - engine, - tablePath, - isNewTable = true, - testSchema, Seq.empty, Map(TableConfig.CHECKPOINT_INTERVAL.getKey -> "-1")) - } - assert( - ex2.getMessage.contains( - String.format( - "Invalid value for table property '%s': '%s'. %s", - TableConfig.CHECKPOINT_INTERVAL.getKey, "-1", "needs to be a positive integer."))) - } - } - test("create partitioned table - partition column is not part of the schema") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala new file mode 100644 index 00000000000..1523a882104 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala @@ -0,0 +1,107 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults + +import io.delta.kernel.Table +import io.delta.kernel.exceptions.UnknownConfigurationException +import io.delta.kernel.internal.SnapshotImpl +import io.delta.kernel.utils.CloseableIterable.emptyIterable + +import scala.collection.immutable.Seq + +/** + * Suite to set or get table properties. + * TODO: for now we just have the support for `set`. API `get` will be added in the next PRs. + */ +class TablePropertiesSuite extends DeltaTableWriteSuiteBase { + test("create/update table - allow arbitrary properties") { + withTempDir { tempFile => + val tablePath = tempFile.getAbsolutePath + + // create table with arbitrary properties and check if they are set + createUpdateTableWithProps( + tablePath, + createTable = true, + props = Map("my key" -> "10", "my key2" -> "20") + ) + assertHasProp(tablePath, expProps = Map("my key" -> "10", "my key2" -> "20")) + + // update table by modifying the arbitrary properties and check if they are updated + createUpdateTableWithProps(tablePath, props = Map("my key" -> "30")) + assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20")) + + // update table without any new properties and check if the existing properties are retained + createUpdateTableWithProps(tablePath) + assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20")) + + // update table by adding new arbitrary properties and check if they are set + createUpdateTableWithProps(tablePath, props = Map("new key3" -> "str")) + assertHasProp( + tablePath, + expProps = Map("my key" -> "30", "my key2" -> "20", "new key3" -> "str")) + } + } + + test("create/update table - disallow unknown delta.* properties to Kernel") { + withTempDir { tempFile => + val tablePath = tempFile.getAbsolutePath + val ex1 = intercept[UnknownConfigurationException] { + createUpdateTableWithProps(tablePath, createTable = true, Map("delta.unknown" -> "str")) + } + assert(ex1.getMessage.contains("Unknown configuration was specified: delta.unknown")) + + // Try updating in an existing table + createUpdateTableWithProps(tablePath, createTable = true) + val ex2 = intercept[UnknownConfigurationException] { + createUpdateTableWithProps(tablePath, props = Map("Delta.unknown" -> "str")) + } + assert(ex2.getMessage.contains("Unknown configuration was specified: Delta.unknown")) + } + } + + test("create/update table - delta configs are stored with same case as defined in TableConfig") { + withTempDir { tempFile => + val tablePath = tempFile.getAbsolutePath + createUpdateTableWithProps(tablePath, + createTable = true, + Map("delta.CHECKPOINTINTERVAL" -> "20")) + assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "20")) + + // Try updating in an existing table + createUpdateTableWithProps( + tablePath, + props = Map("DELTA.CHECKPOINTINTERVAL" -> "30")) + assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "30")) + } + } + + def createUpdateTableWithProps( + tablePath: String, + createTable: Boolean = false, + props: Map[String, String] = null): Unit = { + createTxn(defaultEngine, tablePath, createTable, testSchema, Seq.empty, props) + .commit(defaultEngine, emptyIterable()) + } + + // TODO: this will be replaced with get API in the next PRs. + def assertHasProp(tablePath: String, expProps: Map[String, String]): Unit = { + val snapshot = Table.forPath(defaultEngine, tablePath) + .getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl] + expProps.foreach { case (key, value) => + assert(snapshot.getMetadata.getConfiguration.get(key) === value, key) + } + } +}