Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Allow setting arbitrary properties when creating/updating the table #4107

Merged
merged 4 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,32 +191,41 @@ public class TableConfig<T> {
///////////////////////////

/**
* Validates that the given properties have the delta prefix in the key name, and they are in the
* set of valid properties. The caller should get the validated configurations and store the case
* of the property name defined in TableConfig.
* Validates that the given new properties that the txn is trying to update in table. Properties
* that have `delta.` prefix in the key name should be in valid list and are editable. The caller
* is expected to store the returned properties in the table metadata after further validation
* from a protocol point of view. The returned properties will have the key's case normalized as
* defined in its {@link TableConfig}.
vkorukanti marked this conversation as resolved.
Show resolved Hide resolved
*
* @param configurations the properties to validate
* @param newProperties the properties to validate
* @throws InvalidConfigurationValueException if any of the properties are invalid
* @throws UnknownConfigurationException if any of the properties are unknown
*/
public static Map<String, String> validateProperties(Map<String, String> configurations) {
Map<String, String> validatedConfigurations = new HashMap<>();
for (Map.Entry<String, String> kv : configurations.entrySet()) {
public static Map<String, String> validateDeltaProperties(Map<String, String> newProperties) {
Map<String, String> validatedProperties = new HashMap<>();
for (Map.Entry<String, String> kv : newProperties.entrySet()) {
String key = kv.getKey().toLowerCase(Locale.ROOT);
String value = kv.getValue();
if (key.startsWith("delta.") && VALID_PROPERTIES.containsKey(key)) {

if (key.startsWith("delta.")) {
// If it is a delta table property, make sure it is a supported property and editable
if (!VALID_PROPERTIES.containsKey(key)) {
throw DeltaErrors.unknownConfigurationException(kv.getKey());
}

TableConfig<?> tableConfig = VALID_PROPERTIES.get(key);
if (tableConfig.editable) {
tableConfig.validate(value);
validatedConfigurations.put(tableConfig.getKey(), value);
} else {
if (!tableConfig.editable) {
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
throw DeltaErrors.cannotModifyTableProperty(kv.getKey());
}

tableConfig.validate(value);
validatedProperties.put(tableConfig.getKey(), value);
} else {
throw DeltaErrors.unknownConfigurationException(kv.getKey());
// allow unknown properties to be set
validatedProperties.put(key, value);
}
}
return validatedConfigurations;
return validatedProperties;
}

private static void addConfig(HashMap<String, TableConfig<?>> configs, TableConfig<?> config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public Transaction build(Engine engine) {
Protocol protocol = snapshot.getProtocol();
if (tableProperties.isPresent()) {
Map<String, String> validatedProperties =
TableConfig.validateProperties(tableProperties.get());
TableConfig.validateDeltaProperties(tableProperties.get());
Map<String, String> newProperties =
metadata.filterOutUnchangedProperties(validatedProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
class TableConfigSuite extends AnyFunSuite {

test("check TableConfig.editable is true") {
TableConfig.validateProperties(
TableConfig.validateDeltaProperties(
Map(
TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week",
TableConfig.CHECKPOINT_INTERVAL.getKey -> "20",
Expand All @@ -36,7 +36,7 @@ class TableConfigSuite extends AnyFunSuite {

test("check TableConfig.MAX_COLUMN_ID.editable is false") {
val e = intercept[KernelException] {
TableConfig.validateProperties(
TableConfig.validateDeltaProperties(
Map(
TableConfig.TOMBSTONE_RETENTION.getKey -> "interval 2 week",
TableConfig.CHECKPOINT_INTERVAL.getKey -> "20",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,29 +292,6 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("create table - invalid properties - expect failure") {
withTempDirAndEngine { (tablePath, engine) =>
val ex1 = intercept[UnknownConfigurationException] {
createTxn(
engine, tablePath, isNewTable = true, testSchema, Seq.empty, Map("invalid key" -> "10"))
}
assert(ex1.getMessage.contains("Unknown configuration was specified: invalid key"))

val ex2 = intercept[InvalidConfigurationValueException] {
createTxn(
engine,
tablePath,
isNewTable = true,
testSchema, Seq.empty, Map(TableConfig.CHECKPOINT_INTERVAL.getKey -> "-1"))
}
assert(
ex2.getMessage.contains(
String.format(
"Invalid value for table property '%s': '%s'. %s",
TableConfig.CHECKPOINT_INTERVAL.getKey, "-1", "needs to be a positive integer.")))
}
}

test("create partitioned table - partition column is not part of the schema") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults

import io.delta.kernel.Table
import io.delta.kernel.exceptions.UnknownConfigurationException
import io.delta.kernel.internal.SnapshotImpl
import io.delta.kernel.utils.CloseableIterable.emptyIterable

import scala.collection.immutable.Seq

/**
* Suite to set or get table properties.
* TODO: for now we just have the support for `set`. API `get` will be added in the next PRs.
*/
class TablePropertiesSuite extends DeltaTableWriteSuiteBase {
test("create/update table - allow arbitrary properties") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath

// create table with arbitrary properties and check if they are set
createUpdateTableWithProps(
tablePath,
createTable = true,
props = Map("my key" -> "10", "my key2" -> "20")
)
assertHasProp(tablePath, expProps = Map("my key" -> "10", "my key2" -> "20"))

// update table by modifying the arbitrary properties and check if they are updated
createUpdateTableWithProps(tablePath, props = Map("my key" -> "30"))
assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20"))

// update table without any new properties and check if the existing properties are retained
createUpdateTableWithProps(tablePath)
assertHasProp(tablePath, expProps = Map("my key" -> "30", "my key2" -> "20"))

// update table by adding new arbitrary properties and check if they are set
createUpdateTableWithProps(tablePath, props = Map("new key3" -> "str"))
assertHasProp(
tablePath,
expProps = Map("my key" -> "30", "my key2" -> "20", "new key3" -> "str"))
}
}

test("create/update table - disallow unknown delta.* properties to Kernel") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
val ex1 = intercept[UnknownConfigurationException] {
createUpdateTableWithProps(tablePath, createTable = true, Map("delta.unknown" -> "str"))
}
assert(ex1.getMessage.contains("Unknown configuration was specified: delta.unknown"))

// Try updating in an existing table
createUpdateTableWithProps(tablePath, createTable = true)
val ex2 = intercept[UnknownConfigurationException] {
createUpdateTableWithProps(tablePath, props = Map("Delta.unknown" -> "str"))
}
assert(ex2.getMessage.contains("Unknown configuration was specified: Delta.unknown"))
}
}

test("create/update table - delta configs are stored with same case as defined in TableConfig") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
createUpdateTableWithProps(tablePath,
createTable = true,
Map("delta.CHECKPOINTINTERVAL" -> "20"))
assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "20"))

// Try updating in an existing table
createUpdateTableWithProps(
tablePath,
props = Map("DELTA.CHECKPOINTINTERVAL" -> "30"))
assertHasProp(tablePath, expProps = Map("delta.checkpointInterval" -> "30"))
}
}

def createUpdateTableWithProps(
tablePath: String,
createTable: Boolean = false,
props: Map[String, String] = null): Unit = {
createTxn(defaultEngine, tablePath, createTable, testSchema, Seq.empty, props)
.commit(defaultEngine, emptyIterable())
}

// TODO: this will be replaced with get API in the next PRs.
def assertHasProp(tablePath: String, expProps: Map[String, String]): Unit = {
val snapshot = Table.forPath(defaultEngine, tablePath)
.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
expProps.foreach { case (key, value) =>
assert(snapshot.getMetadata.getConfiguration.get(key) === value, key)
}
}
}
Loading