Skip to content

Commit

Permalink
[Kernel] Allow setting arbitrary properties when creating/updating th…
Browse files Browse the repository at this point in the history
…e table (#4107)

## Description
Currently, Kernel doesn't allow setting arbitrary table properties other
than the `delta.*` properties that Kernel understands. We have valid use
cases where we need to allow storing properties with arbitrary key
names.

As part of this, we also don't want to set any `delta.*` properties that
Kernel doesn't understand or supports yet.

## How was this patch tested?
UTs

## Does this PR introduce _any_ user-facing changes?
Now the connectors can property with any name.


Resolves #3149
  • Loading branch information
vkorukanti authored Jan 31, 2025
1 parent f32539d commit c01bb7c
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,32 +191,41 @@ public class TableConfig<T> {
///////////////////////////

/**
* Validates that the given properties have the delta prefix in the key name, and they are in the
* set of valid properties. The caller should get the validated configurations and store the case
* of the property name defined in TableConfig.
* Validates that the given new properties that the txn is trying to update in table. Properties
* that have `delta.` prefix in the key name should be in valid list and are editable. The caller
* is expected to store the returned properties in the table metadata after further validation
* from a protocol point of view. The returned properties will have the key's case normalized as
* defined in its {@link TableConfig}.
*
* @param configurations the properties to validate
* @param newProperties the properties to validate
* @throws InvalidConfigurationValueException if any of the properties are invalid
* @throws UnknownConfigurationException if any of the properties are unknown
*/
public static Map<String, String> validateProperties(Map<String, String> configurations) {
Map<String, String> validatedConfigurations = new HashMap<>();
for (Map.Entry<String, String> kv : configurations.entrySet()) {
public static Map<String, String> validateDeltaProperties(Map<String, String> newProperties) {
Map<String, String> validatedProperties = new HashMap<>();
for (Map.Entry<String, String> kv : newProperties.entrySet()) {
String key = kv.getKey().toLowerCase(Locale.ROOT);
String value = kv.getValue();
if (key.startsWith("delta.") && VALID_PROPERTIES.containsKey(key)) {

if (key.startsWith("delta.")) {
// If it is a delta table property, make sure it is a supported property and editable
if (!VALID_PROPERTIES.containsKey(key)) {
throw DeltaErrors.unknownConfigurationException(kv.getKey());
}

TableConfig<?> tableConfig = VALID_PROPERTIES.get(key);
if (tableConfig.editable) {
tableConfig.validate(value);
validatedConfigurations.put(tableConfig.getKey(), value);
} else {
if (!tableConfig.editable) {
throw DeltaErrors.cannotModifyTableProperty(kv.getKey());
}

tableConfig.validate(value);
validatedProperties.put(tableConfig.getKey(), value);
} else {
throw DeltaErrors.unknownConfigurationException(kv.getKey());
// allow unknown properties to be set
validatedProperties.put(key, value);
}
}
return validatedConfigurations;
return validatedProperties;
}

private static void addConfig(HashMap<String, TableConfig<?>> configs, TableConfig<?> config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public Transaction build(Engine engine) {
Protocol protocol = snapshot.getProtocol();
if (tableProperties.isPresent()) {
Map<String, String> validatedProperties =
TableConfig.validateProperties(tableProperties.get());
TableConfig.validateDeltaProperties(tableProperties.get());
Map<String, String> newProperties =
metadata.filterOutUnchangedProperties(validatedProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
class TableConfigSuite extends AnyFunSuite {

test("check TableConfig.editable is true") {
TableConfig.validateProperties(
TableConfig.validateDeltaProperties(
Map(
TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week",
TableConfig.CHECKPOINT_INTERVAL.getKey -> "20",
Expand All @@ -36,7 +36,7 @@ class TableConfigSuite extends AnyFunSuite {

test("check TableConfig.MAX_COLUMN_ID.editable is false") {
val e = intercept[KernelException] {
TableConfig.validateProperties(
TableConfig.validateDeltaProperties(
Map(
TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week",
TableConfig.CHECKPOINT_INTERVAL.getKey -> "20",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,29 +292,6 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("create table - invalid properties - expect failure") {
withTempDirAndEngine { (tablePath, engine) =>
val ex1 = intercept[UnknownConfigurationException] {
createTxn(
engine, tablePath, isNewTable = true, testSchema, Seq.empty, Map("invalid key" -> "10"))
}
assert(ex1.getMessage.contains("Unknown configuration was specified: invalid key"))

val ex2 = intercept[InvalidConfigurationValueException] {
createTxn(
engine,
tablePath,
isNewTable = true,
testSchema, Seq.empty, Map(TableConfig.CHECKPOINT_INTERVAL.getKey -> "-1"))
}
assert(
ex2.getMessage.contains(
String.format(
"Invalid value for table property '%s': '%s'. %s",
TableConfig.CHECKPOINT_INTERVAL.getKey, "-1", "needs to be a positive integer.")))
}
}

test("create partitioned table - partition column is not part of the schema") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults

import io.delta.kernel.Table
import io.delta.kernel.exceptions.UnknownConfigurationException
import io.delta.kernel.internal.SnapshotImpl
import io.delta.kernel.utils.CloseableIterable.emptyIterable

import scala.collection.immutable.Seq

/**
* Suite to set or get table properties.
* TODO: for now we just have the support for `set`. API `get` will be added in the next PRs.
*/
class TablePropertiesSuite extends DeltaTableWriteSuiteBase {
test("create/update table - allow arbitrary properties") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath

// create table with arbitrary properties and check if they are set
createUpdateTableWithProps(
tablePath,
createTable = true,
props = Map("my key" -> "10", "my key2" -> "20")
)
assertHasProp(tablePath, expProps = Map("my key" -> "10", "my key2" -> "20"))

// update table by modifying the arbitrary properties and check if they are updated
createUpdateTableWithProps(tablePath, props = Map("my key" -> "30"))
assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20"))

// update table without any new properties and check if the existing properties are retained
createUpdateTableWithProps(tablePath)
assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20"))

// update table by adding new arbitrary properties and check if they are set
createUpdateTableWithProps(tablePath, props = Map("new key3" -> "str"))
assertHasProp(
tablePath,
expProps = Map("my key" -> "30", "my key2" -> "20", "new key3" -> "str"))
}
}

test("create/update table - disallow unknown delta.* properties to Kernel") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
val ex1 = intercept[UnknownConfigurationException] {
createUpdateTableWithProps(tablePath, createTable = true, Map("delta.unknown" -> "str"))
}
assert(ex1.getMessage.contains("Unknown configuration was specified: delta.unknown"))

// Try updating in an existing table
createUpdateTableWithProps(tablePath, createTable = true)
val ex2 = intercept[UnknownConfigurationException] {
createUpdateTableWithProps(tablePath, props = Map("Delta.unknown" -> "str"))
}
assert(ex2.getMessage.contains("Unknown configuration was specified: Delta.unknown"))
}
}

test("create/update table - delta configs are stored with same case as defined in TableConfig") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
createUpdateTableWithProps(tablePath,
createTable = true,
Map("delta.CHECKPOINTINTERVAL" -> "20"))
assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "20"))

// Try updating in an existing table
createUpdateTableWithProps(
tablePath,
props = Map("DELTA.CHECKPOINTINTERVAL" -> "30"))
assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "30"))
}
}

def createUpdateTableWithProps(
tablePath: String,
createTable: Boolean = false,
props: Map[String, String] = null): Unit = {
createTxn(defaultEngine, tablePath, createTable, testSchema, Seq.empty, props)
.commit(defaultEngine, emptyIterable())
}

// TODO: this will be replaced with get API in the next PRs.
def assertHasProp(tablePath: String, expProps: Map[String, String]): Unit = {
val snapshot = Table.forPath(defaultEngine, tablePath)
.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
expProps.foreach { case (key, value) =>
assert(snapshot.getMetadata.getConfiguration.get(key) === value, key)
}
}
}

0 comments on commit c01bb7c

Please sign in to comment.