[Delta] Block alter command from overriding or unsetting coordinated commits properties #3573

Merged
spark/src/main/resources/error/delta-error-classes.json (6 additions, 0 deletions)

@@ -279,6 +279,12 @@
],
"sqlState" : "42613"
},
"DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS" : {
"message" : [
"ALTER cannot unset coordinated commits configurations. To downgrade a table from coordinated commits, please try again using `ALTER TABLE [table-name] DROP FEATURE 'coordinatedCommits-preview'`."
],
"sqlState" : "42616"
},
"DELTA_CANNOT_UPDATE_ARRAY_FIELD" : {
"message" : [
"Cannot update %1$s field %2$s type: update the element by updating %2$s.element"
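In user-facing terms, the new error class blocks the direct unset path and points at the supported downgrade. A hedged sketch of both, assuming a table `t` that already has coordinated commits enabled (the table name and session setup are illustrative; the property key constant and the DROP FEATURE syntax are taken from this PR's diff and error message):

```scala
// Illustrative only: `t` already has a commit coordinator configured.
// Directly unsetting the coordinator property is now rejected with
// DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS:
spark.sql(
  s"ALTER TABLE t UNSET TBLPROPERTIES " +
  s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}')")

// The sanctioned downgrade path quoted in the error message still works:
spark.sql("ALTER TABLE t DROP FEATURE 'coordinatedCommits-preview'")
```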
@@ -61,7 +61,8 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
}

val properties = Seq(TestRemovableWriterFeature.TABLE_PROP_KEY)
-    AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
+    AlterTableUnsetPropertiesDeltaCommand(
+      table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -77,7 +78,8 @@ case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: Delt
}

val properties = Seq(TestRemovableWriterWithHistoryTruncationFeature.TABLE_PROP_KEY)
-    AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
+    AlterTableUnsetPropertiesDeltaCommand(
+      table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -95,7 +97,8 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
}

val properties = Seq(TestRemovableReaderWriterFeature.TABLE_PROP_KEY)
-    AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
+    AlterTableUnsetPropertiesDeltaCommand(
+      table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -107,7 +110,8 @@ case class TestLegacyWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
if (TestRemovableLegacyWriterFeature.validateRemoval(table.initialSnapshot)) return false

val properties = Seq(TestRemovableLegacyWriterFeature.TABLE_PROP_KEY)
-    AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
+    AlterTableUnsetPropertiesDeltaCommand(
+      table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -119,7 +123,8 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false

val properties = Seq(TestRemovableLegacyReaderWriterFeature.TABLE_PROP_KEY)
-    AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
+    AlterTableUnsetPropertiesDeltaCommand(
+      table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -251,7 +256,11 @@ case class CoordinatedCommitsPreDowngradeCommand(table: DeltaTableV2)
traceRemovalNeeded = true
try {
AlterTableUnsetPropertiesDeltaCommand(
-        table, CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS, ifExists = true).run(table.spark)
+        table,
+        CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS,
+        ifExists = true,
+        fromDropFeatureCommand = true
+      ).run(table.spark)
} catch {
case NonFatal(e) =>
exceptionOpt = Some(e)
@@ -304,7 +313,8 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)

val startTimeNs = System.nanoTime()
val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key)
-    AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
+    AlterTableUnsetPropertiesDeltaCommand(
+      table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
val numFilesRewritten = rewriteFilesIfNeeded()
val metadataRemoved = removeMetadataIfNeeded()

@@ -135,8 +135,8 @@ case class CreateDeltaTableCommand(

val tableLocation = getDeltaTablePath(tableWithLocation)
val deltaLog = DeltaLog.forTable(sparkSession, tableLocation)
-    CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurations(
-      sparkSession, deltaLog, query, tableWithLocation.properties)
+    CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommand(
+      sparkSession, deltaLog.tableExists, query, tableWithLocation.properties)

recordDeltaOperation(deltaLog, "delta.ddl.createTable") {
val result = handleCommit(sparkSession, deltaLog, tableWithLocation)
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.backfill.RowTrackingBackfillCommand
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
+import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema
@@ -161,6 +162,8 @@ case class AlterTableSetPropertiesDeltaCommand(
true
}.toMap

+    CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
+      metadata.configuration, filteredConfs)
val newMetadata = metadata.copy(
description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
configuration = metadata.configuration ++ filteredConfs)
@@ -193,7 +196,8 @@ case class AlterTableSetPropertiesDeltaCommand(
case class AlterTableUnsetPropertiesDeltaCommand(
table: DeltaTableV2,
propKeys: Seq[String],
-    ifExists: Boolean)
+    ifExists: Boolean,
+    fromDropFeatureCommand: Boolean = false)
extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData {

override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -223,6 +227,10 @@ case class AlterTableUnsetPropertiesDeltaCommand(
}
}

+    if (!fromDropFeatureCommand) {
+      CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
+        metadata.configuration, normalizedKeys)
+    }
val newConfiguration = metadata.configuration.filterNot {
case (key, _) => normalizedKeys.contains(key)
}
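Together, the changes in this file gate both directions of ALTER: SET overrides are always validated, while UNSET validation is skipped only when the command object is built internally by the DROP FEATURE path (`fromDropFeatureCommand = true`). A hedged sketch of the resulting behavior on a table that already has a coordinator (the table, coordinator name, and `spark`/`table` references are illustrative):

```scala
// Rejected with DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS: the coordinator
// cannot be swapped in place; the table must be downgraded and re-upgraded.
spark.sql(
  s"ALTER TABLE t SET TBLPROPERTIES " +
  s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'other-coordinator')")

// Allowed: the same unset, when issued internally by DROP FEATURE, bypasses the
// guard because the command is constructed with fromDropFeatureCommand = true.
AlterTableUnsetPropertiesDeltaCommand(
  table, CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS, ifExists = true,
  fromDropFeatureCommand = true).run(spark)
```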
@@ -445,16 +445,63 @@ object CoordinatedCommitsUtils extends DeltaLogging {
}
}

+  /**
+   * Validates the Coordinated Commits configurations in explicit command overrides for
+   * `AlterTableSetPropertiesDeltaCommand`.
+   *
+   * If the table already has Coordinated Commits configurations present, then we do not allow
+   * users to override them via `ALTER TABLE t SET TBLPROPERTIES ...`. Users must downgrade the
+   * table and then upgrade it with the new Coordinated Commits configurations.
+   */
+  def validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
+      existingConfs: Map[String, String],
+      propertyOverrides: Map[String, String]): Unit = {
+    val existingCoordinatedCommitsConfs = extractCoordinatedCommitsConfigurations(existingConfs)
+    val coordinatedCommitsOverrides = extractCoordinatedCommitsConfigurations(propertyOverrides)
+    if (coordinatedCommitsOverrides.nonEmpty) {
+      if (existingCoordinatedCommitsConfs.nonEmpty) {
+        throw new DeltaIllegalArgumentException(
+          "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS",
+          Array("ALTER"))
+      }
+      verifyContainsOnlyCoordinatorNameAndConf(
+        coordinatedCommitsOverrides, command = "ALTER", fromDefault = false)
+    }
+  }
+
+  /**
+   * Validates the configurations to unset for `AlterTableUnsetPropertiesDeltaCommand`.
+   *
+   * If the table already has Coordinated Commits configurations present, then we do not allow
+   * users to unset them via `ALTER TABLE t UNSET TBLPROPERTIES ...`. Users can only downgrade
+   * the table via `ALTER TABLE t DROP FEATURE ...`.
+   */
+  def validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
+      existingConfs: Map[String, String],
+      propKeysToUnset: Seq[String]): Unit = {
+    // If the table does not have any Coordinated Commits configurations, then we do not check
+    // the properties to unset. This is because unsetting non-existent entries would either be
+    // caught earlier (without `IF EXISTS`) or simply be a no-op (with `IF EXISTS`). Thus, we
+    // ignore them instead of throwing an exception.
+    if (extractCoordinatedCommitsConfigurations(existingConfs).nonEmpty) {
+      if (propKeysToUnset.exists(TABLE_PROPERTY_KEYS.contains)) {
+        throw new DeltaIllegalArgumentException(
+          "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS",
+          Array.empty)
+      }
+    }
+  }

/**
* Validates the Coordinated Commits configurations in explicit command overrides and default
-   * SparkSession properties. See `validateCoordinatedCommitsConfigurationsImpl` for details.
+   * SparkSession properties for `CreateDeltaTableCommand`.
+   * See `validateConfigurationsForCreateDeltaTableCommandImpl` for details.
*/
-  def validateCoordinatedCommitsConfigurations(
+  def validateConfigurationsForCreateDeltaTableCommand(
spark: SparkSession,
-      deltaLog: DeltaLog,
+      tableExists: Boolean,
query: Option[LogicalPlan],
catalogTableProperties: Map[String, String]): Unit = {
-    val tableExists = deltaLog.tableExists
val (command, propertyOverrides) = query match {
// For CLONE, we cannot use the properties from the catalog table, because they are already
// the result of merging the source table properties with the overrides, but we do not
@@ -464,7 +511,7 @@ object CoordinatedCommitsUtils extends DeltaLogging {
cmd.tablePropertyOverrides)
case _ => (if (tableExists) "REPLACE" else "CREATE", catalogTableProperties)
}
-    validateCoordinatedCommitsConfigurationsImpl(
+    validateConfigurationsForCreateDeltaTableCommandImpl(
spark, propertyOverrides, tableExists, command)
}

@@ -476,7 +523,7 @@
* the Coordinator Name and Coordinator Conf, and no Table Conf. Default configurations are
* checked similarly if none of the three properties is present in explicit overrides.
*/
-  private[delta] def validateCoordinatedCommitsConfigurationsImpl(
+  private[delta] def validateConfigurationsForCreateDeltaTableCommandImpl(
spark: SparkSession,
propertyOverrides: Map[String, String],
tableExists: Boolean,
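For a quick sanity check of the two new helpers outside of any command, a minimal sketch in a spark-shell, using the key constant from `DeltaConfigs` (the coordinator names and literal maps are illustrative):

```scala
import org.apache.spark.sql.delta.DeltaConfigs
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils

val coordinatorKey = DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key
val existing = Map(coordinatorKey -> "in-memory")

// Throws DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS: the table already has
// a coordinator, so any coordinated commits override is rejected.
CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
  existing, Map(coordinatorKey -> "other-coordinator"))

// Throws DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS: a live coordinator key
// cannot be unset directly.
CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
  existing, Seq(coordinatorKey))

// No-op: with no coordinated commits configurations present, unset keys are ignored.
CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
  Map.empty[String, String], Seq(coordinatorKey))
```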
@@ -469,7 +469,8 @@ class DeltaTableFeatureSuite
// Add coordinated commits table feature to the table
CommitCoordinatorProvider.registerBuilder(InMemoryCommitCoordinatorBuilder(batchSize = 100))
val tblProperties1 =
Seq(s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory'")
Seq(s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory'",
s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}'")
sql(buildTablePropertyModifyingCommand(
"ALTER", targetTableName = table, sourceTableName = table, tblProperties1))

@@ -92,7 +92,8 @@ class CoordinatedCommitsEnablementSuite
val log = DeltaLog.forTable(spark, tablePath)
validateCoordinatedCommitsCompleteEnablement(log.snapshot, expectEnabled = false)
sql(s"ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES " + // Enable CC
s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory')")
s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " +
s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}')")
Seq(1).toDF().write.format("delta").mode("overwrite").save(tablePath) // commit 3
validateCoordinatedCommitsCompleteEnablement(log.update(), expectEnabled = true)
}
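Both test updates follow from the new SET validation: explicit coordinated commits overrides must contain exactly the coordinator name and its conf (per `verifyContainsOnlyCoordinatorNameAndConf` above), so enabling the feature through ALTER now has to set the two properties together. A sketch of the now-required form, reusing the suite's `$tablePath` and coordinator:

```scala
// Coordinator name alone no longer passes validation; the conf must accompany it.
spark.sql(
  s"""ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES (
     |  '${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory',
     |  '${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}'
     |)""".stripMargin)
```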