Commit a882cf9

block alter command from overriding or unsetting coordinated commits properties

yumingxuanguo-db committed Sep 3, 2024
1 parent cdd39dd commit a882cf9
Showing 9 changed files with 514 additions and 234 deletions.
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -279,6 +279,12 @@
],
"sqlState" : "42613"
},
"DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS" : {
"message" : [
"ALTER cannot unset coordinated commits configurations. To downgrade a table from coordinated commits, please try again using `ALTER TABLE [table-name] DROP FEATURE 'coordinatedCommits-preview'`."
],
"sqlState" : "42616"
},
"DELTA_CANNOT_UPDATE_ARRAY_FIELD" : {
"message" : [
"Cannot update %1$s field %2$s type: update the element by updating %2$s.element"
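For context, a minimal sketch of the statement this new error class blocks, and the downgrade path its message points to (assuming a SparkSession `spark`, a coordinated-commits-enabled table `t`, and the property key read from `DeltaConfigs` as elsewhere in this change):

// Directly unsetting a coordinated commits property now fails with
// DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS (SQLSTATE 42616).
spark.sql(s"ALTER TABLE t UNSET TBLPROPERTIES " +
  s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}')")

// The supported downgrade path is DROP FEATURE, as the error message suggests.
spark.sql("ALTER TABLE t DROP FEATURE 'coordinatedCommits-preview'")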
@@ -61,7 +61,8 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
}

val properties = Seq(TestRemovableWriterFeature.TABLE_PROP_KEY)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
AlterTableUnsetPropertiesDeltaCommand(
table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -77,7 +78,8 @@ case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: DeltaTableV2)
}

val properties = Seq(TestRemovableWriterWithHistoryTruncationFeature.TABLE_PROP_KEY)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
AlterTableUnsetPropertiesDeltaCommand(
table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -95,7 +97,8 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
}

val properties = Seq(TestRemovableReaderWriterFeature.TABLE_PROP_KEY)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
AlterTableUnsetPropertiesDeltaCommand(
table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -107,7 +110,8 @@ case class TestLegacyWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
if (TestRemovableLegacyWriterFeature.validateRemoval(table.initialSnapshot)) return false

val properties = Seq(TestRemovableLegacyWriterFeature.TABLE_PROP_KEY)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
AlterTableUnsetPropertiesDeltaCommand(
table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -119,7 +123,8 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false

val properties = Seq(TestRemovableLegacyReaderWriterFeature.TABLE_PROP_KEY)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
AlterTableUnsetPropertiesDeltaCommand(
table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
true
}
}
@@ -251,7 +256,11 @@ case class CoordinatedCommitsPreDowngradeCommand(table: DeltaTableV2)
traceRemovalNeeded = true
try {
AlterTableUnsetPropertiesDeltaCommand(
table, CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS, ifExists = true).run(table.spark)
table,
CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS,
ifExists = true,
fromDropFeatureCommand = true
).run(table.spark)
} catch {
case NonFatal(e) =>
exceptionOpt = Some(e)
@@ -304,7 +313,8 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)

val startTimeNs = System.nanoTime()
val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
AlterTableUnsetPropertiesDeltaCommand(
table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
val numFilesRewritten = rewriteFilesIfNeeded()
val metadataRemoved = removeMetadataIfNeeded()

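Each pre-downgrade command above now makes the same call with its own property keys; condensed, the shared pattern is (a sketch, with `table` and `properties` as in the surrounding hunks):

// Unset the feature's table properties, marking the call as coming from
// DROP FEATURE so the new coordinated commits unset guard does not fire.
AlterTableUnsetPropertiesDeltaCommand(
  table, properties, ifExists = true, fromDropFeatureCommand = true
).run(table.spark)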
@@ -135,8 +135,8 @@ case class CreateDeltaTableCommand(

val tableLocation = getDeltaTablePath(tableWithLocation)
val deltaLog = DeltaLog.forTable(sparkSession, tableLocation)
CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurations(
sparkSession, deltaLog, query, tableWithLocation.properties)
CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommand(
sparkSession, deltaLog.tableExists, query, tableWithLocation.properties)

recordDeltaOperation(deltaLog, "delta.ddl.createTable") {
val result = handleCommit(sparkSession, deltaLog, tableWithLocation)
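The validation helper's dependency is narrowed here: the caller resolves `tableExists` up front instead of handing over the whole `DeltaLog`. A condensed before/after view of the call site (arguments as in the hunk above):

// Before: the helper received the log and read deltaLog.tableExists itself.
CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurations(
  sparkSession, deltaLog, query, tableWithLocation.properties)

// After: only the boolean crosses the boundary, which keeps the validation
// logic independent of a live DeltaLog.
CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommand(
  sparkSession, deltaLog.tableExists, query, tableWithLocation.properties)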
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.backfill.RowTrackingBackfillCommand
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema
@@ -161,6 +162,8 @@ case class AlterTableSetPropertiesDeltaCommand(
true
}.toMap

CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
metadata.configuration, filteredConfs)
val newMetadata = metadata.copy(
description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
configuration = metadata.configuration ++ filteredConfs)
@@ -193,7 +196,8 @@
case class AlterTableUnsetPropertiesDeltaCommand(
table: DeltaTableV2,
propKeys: Seq[String],
ifExists: Boolean)
ifExists: Boolean,
fromDropFeatureCommand: Boolean = false)
extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData {

override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -223,6 +227,10 @@ case class AlterTableUnsetPropertiesDeltaCommand(
}
}

if (!fromDropFeatureCommand) {
CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
metadata.configuration, normalizedKeys)
}
val newConfiguration = metadata.configuration.filterNot {
case (key, _) => normalizedKeys.contains(key)
}
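From the user's side, the SET guard added above means that once a table has a commit coordinator, ALTER cannot repoint it; a hedged sketch (table name and coordinator value are illustrative):

// Throws DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS: the table already
// has coordinated commits configurations, so the override is rejected.
spark.sql(s"ALTER TABLE t SET TBLPROPERTIES " +
  s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'other-coordinator')")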
@@ -445,16 +445,63 @@ object CoordinatedCommitsUtils extends DeltaLogging {
}
}

/**
* Validates the Coordinated Commits configurations in explicit command overrides for
* `AlterTableSetPropertiesDeltaCommand`.
*
* If the table already has Coordinated Commits configurations present, then we do not allow
* users to override them via `ALTER TABLE t SET TBLPROPERTIES ...`. Users must downgrade the
* table and then upgrade it with the new Coordinated Commits configurations.
*/
def validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
existingConfs: Map[String, String],
propertyOverrides: Map[String, String]): Unit = {
val existingCoordinatedCommitsConfs = extractCoordinatedCommitsConfigurations(existingConfs)
val coordinatedCommitsOverrides = extractCoordinatedCommitsConfigurations(propertyOverrides)
if (coordinatedCommitsOverrides.nonEmpty) {
if (existingCoordinatedCommitsConfs.nonEmpty) {
throw new DeltaIllegalArgumentException(
"DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS",
Array("ALTER"))
}
verifyContainsOnlyCoordinatorNameAndConf(
coordinatedCommitsOverrides, command = "ALTER", fromDefault = false)
}
}

/**
* Validates the configurations to unset for `AlterTableUnsetPropertiesDeltaCommand`.
*
* If the table already has Coordinated Commits configurations present, then we do not allow users
* to unset them via `ALTER TABLE t UNSET TBLPROPERTIES ...`. Users can only downgrade the table
* via `ALTER TABLE t DROP FEATURE ...`.
*/
def validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
existingConfs: Map[String, String],
propKeysToUnset: Seq[String]): Unit = {
// If the table does not have any Coordinated Commits configurations, then we do not check the
// properties to unset. This is because unsetting non-existent entries would either be caught
// earlier (without `IF EXISTS`) or simply be a no-op (with `IF EXISTS`). Thus, we ignore them
// instead of throwing an exception.
if (extractCoordinatedCommitsConfigurations(existingConfs).nonEmpty) {
if (propKeysToUnset.exists(TABLE_PROPERTY_KEYS.contains)) {
throw new DeltaIllegalArgumentException(
"DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS",
Array.empty)
}
}
}

/**
* Validates the Coordinated Commits configurations in explicit command overrides and default
* SparkSession properties. See `validateCoordinatedCommitsConfigurationsImpl` for details.
* SparkSession properties for `CreateDeltaTableCommand`.
* See `validateConfigurationsForCreateDeltaTableCommandImpl` for details.
*/
def validateCoordinatedCommitsConfigurations(
def validateConfigurationsForCreateDeltaTableCommand(
spark: SparkSession,
deltaLog: DeltaLog,
tableExists: Boolean,
query: Option[LogicalPlan],
catalogTableProperties: Map[String, String]): Unit = {
val tableExists = deltaLog.tableExists
val (command, propertyOverrides) = query match {
// For CLONE, we cannot use the properties from the catalog table, because they are already
// the result of merging the source table properties with the overrides, but we do not
@@ -464,7 +511,7 @@ object CoordinatedCommitsUtils extends DeltaLogging {
cmd.tablePropertyOverrides)
case _ => (if (tableExists) "REPLACE" else "CREATE", catalogTableProperties)
}
validateCoordinatedCommitsConfigurationsImpl(
validateConfigurationsForCreateDeltaTableCommandImpl(
spark, propertyOverrides, tableExists, command)
}

@@ -476,7 +523,7 @@ object CoordinatedCommitsUtils extends DeltaLogging {
* the Coordinator Name and Coordinator Conf, and no Table Conf. Default configurations are
* checked similarly if none of the three properties is present in explicit overrides.
*/
private[delta] def validateCoordinatedCommitsConfigurationsImpl(
private[delta] def validateConfigurationsForCreateDeltaTableCommandImpl(
spark: SparkSession,
propertyOverrides: Map[String, String],
tableExists: Boolean,
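A minimal sketch of the two new validators in isolation (values are illustrative; the real checks compare keys against `TABLE_PROPERTY_KEYS`):

val ccName = DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key
val ccConf = DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key
val existing = Map(ccName -> "in-memory", ccConf -> "{}")

// Throws DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS: the table already
// has coordinated commits configurations, so overrides are rejected.
CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
  existing, Map(ccName -> "some-other-coordinator"))

// Throws DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS: unsetting any of the
// coordinated commits keys is likewise rejected.
CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
  existing, Seq(ccName))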
@@ -469,7 +469,8 @@ class DeltaTableFeatureSuite
// Add coordinated commits table feature to the table
CommitCoordinatorProvider.registerBuilder(InMemoryCommitCoordinatorBuilder(batchSize = 100))
val tblProperties1 =
Seq(s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory'")
Seq(s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory'",
s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}'")
sql(buildTablePropertyModifyingCommand(
"ALTER", targetTableName = table, sourceTableName = table, tblProperties1))

@@ -92,7 +92,8 @@ class CoordinatedCommitsEnablementSuite
val log = DeltaLog.forTable(spark, tablePath)
validateCoordinatedCommitsCompleteEnablement(log.snapshot, expectEnabled = false)
sql(s"ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES " + // Enable CC
s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory')")
s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " +
s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}')")
Seq(1).toDF().write.format("delta").mode("overwrite").save(tablePath) // commit 3
validateCoordinatedCommitsCompleteEnablement(log.update(), expectEnabled = true)
}
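Both test updates follow from `verifyContainsOnlyCoordinatorNameAndConf` now running against ALTER overrides: setting the coordinator name without its conf presumably no longer validates, which is why the suites supply both keys together, e.g.:

// Name-only override: expected to fail the new ALTER validation.
sql(s"ALTER TABLE t SET TBLPROPERTIES " +
  s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory')")

// Name plus conf together, as the updated tests do: passes validation.
sql(s"ALTER TABLE t SET TBLPROPERTIES " +
  s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory', " +
  s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}')")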