diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index 3536ed565d..279a45c1b2 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -279,6 +279,12 @@ ], "sqlState" : "42613" }, + "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS" : { + "message" : [ + "ALTER cannot unset coordinated commits configurations. To downgrade a table from coordinated commits, please try again using `ALTER TABLE [table-name] DROP FEATURE 'coordinatedCommits-preview'`." + ], + "sqlState" : "42616" + }, "DELTA_CANNOT_UPDATE_ARRAY_FIELD" : { "message" : [ "Cannot update %1$s field %2$s type: update the element by updating %2$s.element" diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index bc1f024209..5c0e506396 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -61,7 +61,8 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2) } val properties = Seq(TestRemovableWriterFeature.TABLE_PROP_KEY) - AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + AlterTableUnsetPropertiesDeltaCommand( + table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) true } } @@ -77,7 +78,8 @@ case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: Delt } val properties = Seq(TestRemovableWriterWithHistoryTruncationFeature.TABLE_PROP_KEY) - AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + AlterTableUnsetPropertiesDeltaCommand( + table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) true } } @@ -95,7 +97,8 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2) } val properties = Seq(TestRemovableReaderWriterFeature.TABLE_PROP_KEY) - AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + AlterTableUnsetPropertiesDeltaCommand( + table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) true } } @@ -107,7 +110,8 @@ case class TestLegacyWriterFeaturePreDowngradeCommand(table: DeltaTableV2) if (TestRemovableLegacyWriterFeature.validateRemoval(table.initialSnapshot)) return false val properties = Seq(TestRemovableLegacyWriterFeature.TABLE_PROP_KEY) - AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + AlterTableUnsetPropertiesDeltaCommand( + table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) true } } @@ -119,7 +123,8 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2) if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false val properties = Seq(TestRemovableLegacyReaderWriterFeature.TABLE_PROP_KEY) - AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + AlterTableUnsetPropertiesDeltaCommand( + table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) true } } @@ -251,7 +256,11 @@ case class CoordinatedCommitsPreDowngradeCommand(table: DeltaTableV2) traceRemovalNeeded = true try { AlterTableUnsetPropertiesDeltaCommand( - table, CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS, ifExists = true).run(table.spark) + table, + CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS, + ifExists = true, + fromDropFeatureCommand = true + ).run(table.spark) } catch { case NonFatal(e) => exceptionOpt = Some(e) @@ -304,7 +313,8 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) val startTimeNs = System.nanoTime() val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key) - AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + AlterTableUnsetPropertiesDeltaCommand( + table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) val numFilesRewritten = rewriteFilesIfNeeded() val metadataRemoved = removeMetadataIfNeeded() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index dfe3d89628..f71deb5146 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -135,8 +135,8 @@ case class CreateDeltaTableCommand( val tableLocation = getDeltaTablePath(tableWithLocation) val deltaLog = DeltaLog.forTable(sparkSession, tableLocation) - CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurations( - sparkSession, deltaLog, query, tableWithLocation.properties) + CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommand( + sparkSession, deltaLog.tableExists, query, tableWithLocation.properties) recordDeltaOperation(deltaLog, "delta.ddl.createTable") { val result = handleCommit(sparkSession, deltaLog, tableWithLocation) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index fa36f17738..ee671af7b0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.backfill.RowTrackingBackfillCommand import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints} +import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema @@ -161,6 +162,8 @@ case class AlterTableSetPropertiesDeltaCommand( true }.toMap + CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand( + metadata.configuration, filteredConfs) val newMetadata = metadata.copy( description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description), configuration = metadata.configuration ++ filteredConfs) @@ -193,7 +196,8 @@ case class AlterTableSetPropertiesDeltaCommand( case class AlterTableUnsetPropertiesDeltaCommand( table: DeltaTableV2, propKeys: Seq[String], - ifExists: Boolean) + ifExists: Boolean, + fromDropFeatureCommand: Boolean = false) extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData { override def run(sparkSession: SparkSession): Seq[Row] = { @@ -223,6 +227,10 @@ case class AlterTableUnsetPropertiesDeltaCommand( } } + if (!fromDropFeatureCommand) { + CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + metadata.configuration, normalizedKeys) + } val newConfiguration = metadata.configuration.filterNot { case (key, _) => normalizedKeys.contains(key) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala index 0b045d4915..1593b5a616 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala @@ -445,16 +445,63 @@ object CoordinatedCommitsUtils extends DeltaLogging { } } + /** + * Validates the Coordinated Commits configurations in explicit command overrides for + * `AlterTableSetPropertiesDeltaCommand`. + * + * If the table already has Coordinated Commits configurations present, then we do not allow + * users to override them via `ALTER TABLE t SET TBLPROPERTIES ...`. Users must downgrade the + * table and then upgrade it with the new Coordinated Commits configurations. + */ + def validateConfigurationsForAlterTableSetPropertiesDeltaCommand( + existingConfs: Map[String, String], + propertyOverrides: Map[String, String]): Unit = { + val existingCoordinatedCommitsConfs = extractCoordinatedCommitsConfigurations(existingConfs) + val coordinatedCommitsOverrides = extractCoordinatedCommitsConfigurations(propertyOverrides) + if (coordinatedCommitsOverrides.nonEmpty) { + if (existingCoordinatedCommitsConfs.nonEmpty) { + throw new DeltaIllegalArgumentException( + "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", + Array("ALTER")) + } + verifyContainsOnlyCoordinatorNameAndConf( + coordinatedCommitsOverrides, command = "ALTER", fromDefault = false) + } + } + + /** + * Validates the configurations to unset for `AlterTableUnsetPropertiesDeltaCommand`. + * + * If the table already has Coordinated Commits configurations present, then we do not allow users + * to unset them via `ALTER TABLE t UNSET TBLPROPERTIES ...`. Users could only downgrade the table + * via `ALTER TABLE t DROP FEATURE ...`. + */ + def validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + existingConfs: Map[String, String], + propKeysToUnset: Seq[String]): Unit = { + // If the table does not have any Coordinated Commits configurations, then we do not check the + // properties to unset. This is because unsetting non-existent entries would either be caught + // earlier (without `IF EXISTS`) or simply be a no-op (with `IF EXISTS`). Thus, we ignore them + // instead of throwing an exception. + if (extractCoordinatedCommitsConfigurations(existingConfs).nonEmpty) { + if (propKeysToUnset.exists(TABLE_PROPERTY_KEYS.contains)) { + throw new DeltaIllegalArgumentException( + "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS", + Array.empty) + } + } + } + /** * Validates the Coordinated Commits configurations in explicit command overrides and default - * SparkSession properties. See `validateCoordinatedCommitsConfigurationsImpl` for details. + * SparkSession properties for `CreateDeltaTableCommand`. + * See `validateConfigurationsForCreateDeltaTableCommandImpl` for details. */ - def validateCoordinatedCommitsConfigurations( + def validateConfigurationsForCreateDeltaTableCommand( spark: SparkSession, - deltaLog: DeltaLog, + tableExists: Boolean, query: Option[LogicalPlan], catalogTableProperties: Map[String, String]): Unit = { - val tableExists = deltaLog.tableExists val (command, propertyOverrides) = query match { // For CLONE, we cannot use the properties from the catalog table, because they are already // the result of merging the source table properties with the overrides, but we do not @@ -464,7 +511,7 @@ object CoordinatedCommitsUtils extends DeltaLogging { cmd.tablePropertyOverrides) case _ => (if (tableExists) "REPLACE" else "CREATE", catalogTableProperties) } - validateCoordinatedCommitsConfigurationsImpl( + validateConfigurationsForCreateDeltaTableCommandImpl( spark, propertyOverrides, tableExists, command) } @@ -476,7 +523,7 @@ object CoordinatedCommitsUtils extends DeltaLogging { * the Coordinator Name and Coordinator Conf, and no Table Conf. Default configurations are * checked similarly if non of the three properties is present in explicit overrides. */ - private[delta] def validateCoordinatedCommitsConfigurationsImpl( + private[delta] def validateConfigurationsForCreateDeltaTableCommandImpl( spark: SparkSession, propertyOverrides: Map[String, String], tableExists: Boolean, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala index 261aa06c6e..9d7c826f81 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala @@ -469,7 +469,8 @@ class DeltaTableFeatureSuite // Add coordinated commits table feature to the table CommitCoordinatorProvider.registerBuilder(InMemoryCommitCoordinatorBuilder(batchSize = 100)) val tblProperties1 = - Seq(s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory'") + Seq(s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory'", + s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}'") sql(buildTablePropertyModifyingCommand( "ALTER", targetTableName = table, sourceTableName = table, tblProperties1)) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsEnablementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsEnablementSuite.scala index 9efd838c44..1461b60fb3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsEnablementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsEnablementSuite.scala @@ -92,7 +92,8 @@ class CoordinatedCommitsEnablementSuite val log = DeltaLog.forTable(spark, tablePath) validateCoordinatedCommitsCompleteEnablement(log.snapshot, expectEnabled = false) sql(s"ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES " + // Enable CC - s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory')") + s"('${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + + s"'${DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}')") Seq(1).toDF().write.format("delta").mode("overwrite").save(tablePath) // commit 3 validateCoordinatedCommitsCompleteEnablement(log.update(), expectEnabled = true) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala index 818bbfe832..ae52064625 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.delta.LogSegment import org.apache.spark.sql.delta.Snapshot import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaExceptionTestUtils import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import org.apache.spark.sql.delta.test.DeltaTestImplicits._ @@ -46,7 +47,6 @@ import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetComm import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.scalatest.Tag import org.apache.spark.SparkConf import org.apache.spark.sql.{QueryTest, Row, SparkSession} @@ -58,7 +58,8 @@ class CoordinatedCommitsSuite with DeltaSQLTestUtils with SharedSparkSession with DeltaSQLCommandTest - with CoordinatedCommitsTestUtils { + with CoordinatedCommitsTestUtils + with DeltaExceptionTestUtils { import testImplicits._ @@ -1341,7 +1342,7 @@ class CoordinatedCommitsSuite } ///////////////////////////////////////////////////////////////////////////////////////////// - // Test coordinated-commits with DeltaLog.getChangeLogFile API starts // + // Test coordinated-commits with DeltaLog.getChangeLogFile API starts // ///////////////////////////////////////////////////////////////////////////////////////////// /** @@ -1567,227 +1568,46 @@ class CoordinatedCommitsSuite } ///////////////////////////////////////////////////////////////////////////////////////////// - // Test coordinated-commits with DeltaLog.getChangeLogFile API ENDS // + // Test coordinated-commits with DeltaLog.getChangeLogFile API ENDS // ///////////////////////////////////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////////////////////////////////// - // Test CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl STARTS // - ///////////////////////////////////////////////////////////////////////////////////////////// + test("During ALTER, overriding Coordinated Commits configurations throws an exception.") { + CommitCoordinatorProvider.registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) + CommitCoordinatorProvider.registerBuilder(InMemoryCommitCoordinatorBuilder(1)) - def gridTest[A](testNamePrefix: String, testTags: Tag*)(params: Seq[A])( - testFun: A => Unit): Unit = { - for (param <- params) { - test(testNamePrefix + s" ($param)", testTags: _*)(testFun(param)) + withTempDir { tempDir => + sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta TBLPROPERTIES" + + s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + + s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") + val e = interceptWithUnwrapping[DeltaIllegalArgumentException] { + sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}` SET TBLPROPERTIES" + + s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory', " + + s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") + } + checkError( + exception = e, + errorClass = "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", + sqlState = "42616", + parameters = Map("Command" -> "ALTER")) } } - private val cNameKey = COORDINATED_COMMITS_COORDINATOR_NAME.key - private val cConfKey = COORDINATED_COMMITS_COORDINATOR_CONF.key - private val tableConfKey = COORDINATED_COMMITS_TABLE_CONF.key - private val cName = cNameKey -> "some-cc-name" - private val cConf = cConfKey -> "some-cc-conf" - private val tableConf = tableConfKey -> "some-table-conf" - - private val cNameDefaultKey = COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey - private val cConfDefaultKey = COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey - private val tableConfDefaultKey = COORDINATED_COMMITS_TABLE_CONF.defaultTablePropertyKey - private val cNameDefault = cNameDefaultKey -> "some-cc-name" - private val cConfDefault = cConfDefaultKey -> "some-cc-conf" - private val tableConfDefault = tableConfDefaultKey -> "some-table-conf" - - private val command = "CLONE" - - private val errCannotOverride = new DeltaIllegalArgumentException( - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", Array(command)) - - private def errMissingConfInCommand(key: String) = new DeltaIllegalArgumentException( - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", Array(command, key)) - - private def errMissingConfInSession(key: String) = new DeltaIllegalArgumentException( - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_SESSION", Array(command, key)) - - private def errTableConfInCommand = new DeltaIllegalArgumentException( - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", Array(command, tableConfKey)) - - private def errTableConfInSession = new DeltaIllegalArgumentException( - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_SESSION", - Array(command, tableConfDefaultKey, tableConfDefaultKey)) - - private def testValidation( - tableExists: Boolean, - propertyOverrides: Map[String, String], - defaultConfs: Seq[(String, String)], - errorOpt: Option[DeltaIllegalArgumentException]): Unit = { - withoutCoordinatedCommitsDefaultTableProperties { - withSQLConf(defaultConfs: _*) { - if (errorOpt.isDefined) { - val e = intercept[DeltaIllegalArgumentException] { - CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl( - spark, propertyOverrides, tableExists, command) - } - assert(e.getMessage.contains(errorOpt.get.getMessage)) - } else { - CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl( - spark, propertyOverrides, tableExists, command) - } + test("During ALTER, unsetting Coordinated Commits configurations throws an exception.") { + CommitCoordinatorProvider.registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) + withTempDir { tempDir => + sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta TBLPROPERTIES" + + s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + + s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") + val e = interceptWithUnwrapping[DeltaIllegalArgumentException] { + sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}` UNSET TBLPROPERTIES" + + s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}', " + + s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}')") } + checkError( + exception = e, + errorClass = "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS", + sqlState = "42616", + parameters = Map[String, String]()) } } - - // tableExists: True - // | False - // - // propertyOverrides: Map.empty - // | Map(cName) - // | Map(cName, cConf) - // | Map(cName, cConf, tableConf) - // | Map(tableConf) - // - // defaultConf: Seq.empty - // | Seq(cNameDefault) - // | Seq(cNameDefault, cConfDefault) - // | Seq(cNameDefault, cConfDefault, tableConfDefault) - // | Seq(tableConfDefault) - // - // errorOpt: None - // | Some(errCannotOverride) - // | Some(errMissingConfInCommand(cConfKey)) - // | Some(errMissingConfInSession(cConfKey)) - // | Some(errTableConfInCommand) - // | Some(errTableConfInSession) - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "passes for existing target tables with no explicit Coordinated Commits Configurations.") ( - Seq( - Seq.empty, - // Not having any explicit Coordinated Commits configurations, but having an illegal - // combination of Coordinated Commits configurations in default: pass. - // This is because we don't consider default configurations when the table exists. - Seq(cNameDefault), - Seq(cNameDefault, cConfDefault), - Seq(cNameDefault, cConfDefault, tableConfDefault), - Seq(tableConfDefault) - ) - ) { defaultConfs: Seq[(String, String)] => - testValidation( - tableExists = true, - propertyOverrides = Map.empty, - defaultConfs, - errorOpt = None) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "fails for existing target tables with any explicit Coordinated Commits Configurations.") ( - Seq( - (Map(cName), Seq.empty), - (Map(cName), Seq(cNameDefault)), - (Map(cName), Seq(cNameDefault, cConfDefault)), - (Map(cName), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(cName), Seq(tableConfDefault)), - - (Map(cName, cConf), Seq.empty), - (Map(cName, cConf), Seq(cNameDefault)), - (Map(cName, cConf), Seq(cNameDefault, cConfDefault)), - (Map(cName, cConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(cName, cConf), Seq(tableConfDefault)), - - (Map(cName, cConf, tableConf), Seq.empty), - (Map(cName, cConf, tableConf), Seq(cNameDefault)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(cName, cConf, tableConf), Seq(tableConfDefault)), - - (Map(tableConf), Seq.empty), - (Map(tableConf), Seq(cNameDefault)), - (Map(tableConf), Seq(cNameDefault, cConfDefault)), - (Map(tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(tableConf), Seq(tableConfDefault)) - ) - ) { case ( - propertyOverrides: Map[String, String], - defaultConfs: Seq[(String, String)]) => - testValidation( - tableExists = true, - propertyOverrides, - defaultConfs, - errorOpt = Some(errCannotOverride)) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "works correctly for new target tables with default Coordinated Commits Configurations.") ( - Seq( - (Seq.empty, None), - (Seq(cNameDefault), Some(errMissingConfInSession(cConfDefaultKey))), - (Seq(cNameDefault, cConfDefault), None), - (Seq(cNameDefault, cConfDefault, tableConfDefault), Some(errTableConfInSession)), - (Seq(tableConfDefault), Some(errTableConfInSession)) - ) - ) { case ( - defaultConfs: Seq[(String, String)], - errorOpt: Option[DeltaIllegalArgumentException]) => - testValidation( - tableExists = false, - propertyOverrides = Map.empty, - defaultConfs, - errorOpt) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "fails for new target tables with any illegal explicit Coordinated Commits Configurations.") ( - Seq( - (Map(cName), Seq.empty, Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(cNameDefault), Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(cNameDefault, cConfDefault), Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(cNameDefault, cConfDefault, tableConfDefault), - Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(tableConfDefault), Some(errMissingConfInCommand(cConfKey))), - - (Map(cName, cConf, tableConf), Seq.empty, Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(cNameDefault), Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault), Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault), - Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(tableConfDefault), Some(errTableConfInCommand)), - - (Map(tableConf), Seq.empty, Some(errTableConfInCommand)), - (Map(tableConf), Seq(cNameDefault), Some(errTableConfInCommand)), - (Map(tableConf), Seq(cNameDefault, cConfDefault), Some(errTableConfInCommand)), - (Map(tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault), - Some(errTableConfInCommand)), - (Map(tableConf), Seq(tableConfDefault), Some(errTableConfInCommand)) - ) - ) { case ( - propertyOverrides: Map[String, String], - defaultConfs: Seq[(String, String)], - errorOpt: Option[DeltaIllegalArgumentException]) => - testValidation( - tableExists = false, - propertyOverrides, - defaultConfs, - errorOpt) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "passes for new target tables with legal explicit Coordinated Commits Configurations.") ( - Seq( - // Having exactly Coordinator Name and Coordinator Conf explicitly, but having an illegal - // combination of Coordinated Commits configurations in default: pass. - // This is because we don't consider default configurations when explicit ones are provided. - Seq.empty, - Seq(cNameDefault), - Seq(cNameDefault, cConfDefault), - Seq(cNameDefault, cConfDefault, tableConfDefault), - Seq(tableConfDefault) - ) - ) { defaultConfs: Seq[(String, String)] => - testValidation( - tableExists = false, - propertyOverrides = Map(cName, cConf), - defaultConfs, - errorOpt = None) - } - - ///////////////////////////////////////////////////////////////////////////////////////////// - // Test CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl ENDS // - ///////////////////////////////////////////////////////////////////////////////////////////// } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtilsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtilsSuite.scala new file mode 100644 index 0000000000..081e2ee7c9 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtilsSuite.scala @@ -0,0 +1,387 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.coordinatedcommits + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.delta.DeltaConfigs.{COORDINATED_COMMITS_COORDINATOR_CONF, COORDINATED_COMMITS_COORDINATOR_NAME, COORDINATED_COMMITS_TABLE_CONF} +import org.apache.spark.sql.delta.DeltaIllegalArgumentException +import org.scalatest.Tag + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +class CoordinatedCommitsUtilsSuite extends QueryTest + with SharedSparkSession + with CoordinatedCommitsTestUtils { + + ///////////////////////////////////////////////////////////////////////////////////////////// + // Test CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl STARTS // + ///////////////////////////////////////////////////////////////////////////////////////////// + + def gridTest[A](testNamePrefix: String, testTags: Tag*)(params: Seq[A])( + testFun: A => Unit): Unit = { + for (param <- params) { + test(testNamePrefix + s" ($param)", testTags: _*)(testFun(param)) + } + } + + private val cNameKey = COORDINATED_COMMITS_COORDINATOR_NAME.key + private val cConfKey = COORDINATED_COMMITS_COORDINATOR_CONF.key + private val tableConfKey = COORDINATED_COMMITS_TABLE_CONF.key + private val cName = cNameKey -> "some-cc-name" + private val cConf = cConfKey -> "some-cc-conf" + private val tableConf = tableConfKey -> "some-table-conf" + + private val cNameDefaultKey = COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey + private val cConfDefaultKey = COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey + private val tableConfDefaultKey = COORDINATED_COMMITS_TABLE_CONF.defaultTablePropertyKey + private val cNameDefault = cNameDefaultKey -> "some-cc-name" + private val cConfDefault = cConfDefaultKey -> "some-cc-conf" + private val tableConfDefault = tableConfDefaultKey -> "some-table-conf" + + private val command = "CLONE" + + private def errCannotOverride = new DeltaIllegalArgumentException( + "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", Array(command)) + + private def errMissingConfInCommand(key: String) = new DeltaIllegalArgumentException( + "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", Array(command, key)) + + private def errMissingConfInSession(key: String) = new DeltaIllegalArgumentException( + "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_SESSION", Array(command, key)) + + private def errTableConfInCommand = new DeltaIllegalArgumentException( + "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", Array(command, tableConfKey)) + + private def errTableConfInSession = new DeltaIllegalArgumentException( + "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_SESSION", + Array(command, tableConfDefaultKey, tableConfDefaultKey)) + + private def testValidationForCreateDeltaTableCommand( + tableExists: Boolean, + propertyOverrides: Map[String, String], + defaultConfs: Seq[(String, String)], + errorOpt: Option[DeltaIllegalArgumentException]): Unit = { + withoutCoordinatedCommitsDefaultTableProperties { + withSQLConf(defaultConfs: _*) { + if (errorOpt.isDefined) { + val e = intercept[DeltaIllegalArgumentException] { + CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommandImpl( + spark, propertyOverrides, tableExists, command) + } + checkError( + exception = e, + errorClass = errorOpt.get.getErrorClass, + sqlState = errorOpt.get.getSqlState, + parameters = errorOpt.get.getMessageParameters.asScala.toMap) + } else { + CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommandImpl( + spark, propertyOverrides, tableExists, command) + } + } + } + } + + // tableExists: True + // | False + // + // propertyOverrides: Map.empty + // | Map(cName) + // | Map(cName, cConf) + // | Map(cName, cConf, tableConf) + // | Map(tableConf) + // + // defaultConf: Seq.empty + // | Seq(cNameDefault) + // | Seq(cNameDefault, cConfDefault) + // | Seq(cNameDefault, cConfDefault, tableConfDefault) + // | Seq(tableConfDefault) + // + // errorOpt: None + // | Some(errCannotOverride) + // | Some(errMissingConfInCommand(cConfKey)) + // | Some(errMissingConfInSession(cConfKey)) + // | Some(errTableConfInCommand) + // | Some(errTableConfInSession) + + gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + + "passes for existing target tables with no explicit Coordinated Commits Configurations.") ( + Seq( + Seq.empty, + // Not having any explicit Coordinated Commits configurations, but having an illegal + // combination of Coordinated Commits configurations in default: pass. + // This is because we don't consider default configurations when the table exists. + Seq(cNameDefault), + Seq(cNameDefault, cConfDefault), + Seq(cNameDefault, cConfDefault, tableConfDefault), + Seq(tableConfDefault) + ) + ) { defaultConfs: Seq[(String, String)] => + testValidationForCreateDeltaTableCommand( + tableExists = true, + propertyOverrides = Map.empty, + defaultConfs, + errorOpt = None) + } + + gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + + "fails for existing target tables with any explicit Coordinated Commits Configurations.") ( + Seq( + (Map(cName), Seq.empty), + (Map(cName), Seq(cNameDefault)), + (Map(cName), Seq(cNameDefault, cConfDefault)), + (Map(cName), Seq(cNameDefault, cConfDefault, tableConfDefault)), + (Map(cName), Seq(tableConfDefault)), + + (Map(cName, cConf), Seq.empty), + (Map(cName, cConf), Seq(cNameDefault)), + (Map(cName, cConf), Seq(cNameDefault, cConfDefault)), + (Map(cName, cConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), + (Map(cName, cConf), Seq(tableConfDefault)), + + (Map(cName, cConf, tableConf), Seq.empty), + (Map(cName, cConf, tableConf), Seq(cNameDefault)), + (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault)), + (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), + (Map(cName, cConf, tableConf), Seq(tableConfDefault)), + + (Map(tableConf), Seq.empty), + (Map(tableConf), Seq(cNameDefault)), + (Map(tableConf), Seq(cNameDefault, cConfDefault)), + (Map(tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), + (Map(tableConf), Seq(tableConfDefault)) + ) + ) { case ( + propertyOverrides: Map[String, String], + defaultConfs: Seq[(String, String)]) => + testValidationForCreateDeltaTableCommand( + tableExists = true, + propertyOverrides, + defaultConfs, + errorOpt = Some(errCannotOverride)) + } + + gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + + "works correctly for new target tables with default Coordinated Commits Configurations.") ( + Seq( + (Seq.empty, None), + (Seq(cNameDefault), Some(errMissingConfInSession(cConfDefaultKey))), + (Seq(cNameDefault, cConfDefault), None), + (Seq(cNameDefault, cConfDefault, tableConfDefault), Some(errTableConfInSession)), + (Seq(tableConfDefault), Some(errTableConfInSession)) + ) + ) { case ( + defaultConfs: Seq[(String, String)], + errorOpt: Option[DeltaIllegalArgumentException]) => + testValidationForCreateDeltaTableCommand( + tableExists = false, + propertyOverrides = Map.empty, + defaultConfs, + errorOpt) + } + + gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + + "fails for new target tables with any illegal explicit Coordinated Commits Configurations.") ( + Seq( + (Map(cName), Seq.empty, Some(errMissingConfInCommand(cConfKey))), + (Map(cName), Seq(cNameDefault), Some(errMissingConfInCommand(cConfKey))), + (Map(cName), Seq(cNameDefault, cConfDefault), Some(errMissingConfInCommand(cConfKey))), + (Map(cName), Seq(cNameDefault, cConfDefault, tableConfDefault), + Some(errMissingConfInCommand(cConfKey))), + (Map(cName), Seq(tableConfDefault), Some(errMissingConfInCommand(cConfKey))), + + (Map(cName, cConf, tableConf), Seq.empty, Some(errTableConfInCommand)), + (Map(cName, cConf, tableConf), Seq(cNameDefault), Some(errTableConfInCommand)), + (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault), Some(errTableConfInCommand)), + (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault), + Some(errTableConfInCommand)), + (Map(cName, cConf, tableConf), Seq(tableConfDefault), Some(errTableConfInCommand)), + + (Map(tableConf), Seq.empty, Some(errTableConfInCommand)), + (Map(tableConf), Seq(cNameDefault), Some(errTableConfInCommand)), + (Map(tableConf), Seq(cNameDefault, cConfDefault), Some(errTableConfInCommand)), + (Map(tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault), + Some(errTableConfInCommand)), + (Map(tableConf), Seq(tableConfDefault), Some(errTableConfInCommand)) + ) + ) { case ( + propertyOverrides: Map[String, String], + defaultConfs: Seq[(String, String)], + errorOpt: Option[DeltaIllegalArgumentException]) => + testValidationForCreateDeltaTableCommand( + tableExists = false, + propertyOverrides, + defaultConfs, + errorOpt) + } + + gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + + "passes for new target tables with legal explicit Coordinated Commits Configurations.") ( + Seq( + // Having exactly Coordinator Name and Coordinator Conf explicitly, but having an illegal + // combination of Coordinated Commits configurations in default: pass. + // This is because we don't consider default configurations when explicit ones are provided. + Seq.empty, + Seq(cNameDefault), + Seq(cNameDefault, cConfDefault), + Seq(cNameDefault, cConfDefault, tableConfDefault), + Seq(tableConfDefault) + ) + ) { defaultConfs: Seq[(String, String)] => + testValidationForCreateDeltaTableCommand( + tableExists = false, + propertyOverrides = Map(cName, cConf), + defaultConfs, + errorOpt = None) + } + + private def testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( + existingConfs: Map[String, String], + propertyOverrides: Map[String, String], + errorOpt: Option[DeltaIllegalArgumentException]): Unit = { + if (errorOpt.isDefined) { + val e = intercept[DeltaIllegalArgumentException] { + CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand( + existingConfs, propertyOverrides) + } + checkError( + exception = e, + errorClass = errorOpt.get.getErrorClass, + sqlState = errorOpt.get.getSqlState, + parameters = errorOpt.get.getMessageParameters.asScala.toMap) + } else { + CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand( + existingConfs, propertyOverrides) + } + } + + gridTest("During ALTER, `validateConfigurationsForAlterTableSetPropertiesDeltaCommand` " + + "works correctly for tables without Coordinated Commits configurations.") { + Seq( + (Map.empty, None), + (Map(cName), Some(new DeltaIllegalArgumentException( + "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", Array("ALTER", cConfKey)))), + (Map(cName, cConf), None), + (Map(cName, cConf, tableConf), Some(new DeltaIllegalArgumentException( + "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", Array("ALTER", tableConfKey)))), + (Map(tableConf), Some(new DeltaIllegalArgumentException( + "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", Array("ALTER", tableConfKey)))) + ) + } { case ( + propertyOverrides: Map[String, String], + errorOpt: Option[DeltaIllegalArgumentException]) => + testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( + existingConfs = Map.empty, + propertyOverrides, + errorOpt) + } + + test("During ALTER, `validateConfigurationsForAlterTableSetPropertiesDeltaCommand` " + + "passes with no overrides for tables with Coordinated Commits configurations.") { + testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( + existingConfs = Map(cName, cConf, tableConf), + propertyOverrides = Map.empty, + errorOpt = None) + } + + gridTest("During ALTER, `validateConfigurationsForAlterTableSetPropertiesDeltaCommand` " + + "fails with overrides for tables with Coordinated Commits configurations.") ( + Seq( + Map(cName), + Map(cName, cConf), + Map(cName, cConf, tableConf), + Map(tableConf) + ) + ) { propertyOverrides: Map[String, String] => + testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( + existingConfs = Map(cName, cConf, tableConf), + propertyOverrides, + errorOpt = Some(new DeltaIllegalArgumentException( + "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", Array("ALTER")))) + } + + private def errCannotUnset = new DeltaIllegalArgumentException( + "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS", Array.empty) + + private def testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + existingConfs: Map[String, String], + propKeysToUnset: Seq[String], + errorOpt: Option[DeltaIllegalArgumentException]): Unit = { + if (errorOpt.isDefined) { + val e = intercept[DeltaIllegalArgumentException] { + CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + existingConfs, propKeysToUnset) + } + checkError( + exception = e, + errorClass = errorOpt.get.getErrorClass, + sqlState = errorOpt.get.getSqlState, + parameters = errorOpt.get.getMessageParameters.asScala.toMap) + } else { + CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + existingConfs, propKeysToUnset) + } + } + + gridTest("During ALTER, `validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand` " + + "fails with overrides for tables with Coordinated Commits configurations.") { + Seq( + Seq(cNameKey), + Seq(cNameKey, cConfKey), + Seq(cNameKey, cConfKey, tableConfKey), + Seq(tableConfKey) + ) + } { propKeysToUnset: Seq[String] => + testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + existingConfs = Map(cName, cConf, tableConf), + propKeysToUnset, + errorOpt = Some(errCannotUnset)) + } + + gridTest("During ALTER, `validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand` " + + "passes with no overrides for tables with or without Coordinated Commits configurations.") { + Seq( + Map.empty, + Map(cName, cConf, tableConf) + ) + } { case existingConfs: Map[String, String] => + testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + existingConfs, + propKeysToUnset = Seq.empty, + errorOpt = None) + } + + gridTest("During ALTER, `validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand` " + + "passes with overrides for tables without Coordinated Commits configurations.") { + Seq( + Seq(cNameKey), + Seq(cNameKey, cConfKey), + Seq(cNameKey, cConfKey, tableConfKey), + Seq(tableConfKey) + ) + } { propKeysToUnset: Seq[String] => + testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( + existingConfs = Map.empty, + propKeysToUnset, + errorOpt = None) + } + + ///////////////////////////////////////////////////////////////////////////////////////////// + // Test CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl ENDS // + ///////////////////////////////////////////////////////////////////////////////////////////// +}