Skip to content

Commit

Permalink
commits
Browse files Browse the repository at this point in the history
  • Loading branch information
junlee-db committed Sep 5, 2024
1 parent 7dfc6d9 commit cc581c2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.delta.coordinatedcommits
import java.util.Optional
import java.util.concurrent.atomic.AtomicInteger

import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaTestUtilsBase}
import org.apache.spark.sql.delta.DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol}
Expand Down Expand Up @@ -324,6 +326,32 @@ trait CoordinatedCommitsBaseSuite

final def coordinatedCommitsEnabledInTests: Boolean = coordinatedCommitsBackfillBatchSize.nonEmpty

// In case some tests reuse the table path/name after a DROP TABLE, this method can be used
// to clean the table data in the commit coordinator. Note that this must be called BEFORE
// the table actually gets dropped, while its location can still be resolved.
def deleteTableFromCommitCoordinator(tableName: String): Unit = {
  val cc = CommitCoordinatorProvider.getCommitCoordinatorClient(
    defaultCommitsCoordinatorName, defaultCommitsCoordinatorConf, spark)
  assert(
    cc.isInstanceOf[TrackingCommitCoordinatorClient],
    s"Please implement delete/drop method for coordinator: ${cc.getClass.getName}")
  // Resolve the table location. If the table does not exist or is broken, DESCRIBE DETAIL
  // throws; in that case there is nothing to clean up in the coordinator, so we get None.
  val locationOpt = try {
    Some(
      spark.sql(s"describe detail $tableName")
        .select("location")
        .first
        .getAs[String](0))
  } catch {
    case NonFatal(_) => None
  }
  locationOpt.foreach { location =>
    // The coordinator tracks tables by their _delta_log path, not the table root.
    cc.asInstanceOf[TrackingCommitCoordinatorClient]
      .delegatingCommitCoordinatorClient
      .asInstanceOf[InMemoryCommitCoordinator]
      .dropTable(new Path(s"$location/_delta_log"))
  }
}

override protected def sparkConf: SparkConf = {
if (coordinatedCommitsBackfillBatchSize.nonEmpty) {
val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.DeltaOperations.{CLUSTERING_PARAMETER_KEY, ZORDER_PARAMETER_KEY}
import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.hooks.UpdateCatalog
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
Expand All @@ -32,7 +33,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession {
trait ClusteredTableTestUtilsBase
extends SparkFunSuite
with SharedSparkSession
with CoordinatedCommitsBaseSuite {
import testImplicits._

/**
Expand Down Expand Up @@ -161,6 +165,23 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
}
}

/**
 * Purges the commit coordinator's state for `table` when coordinated commits are
 * enabled for this test suite; a no-op otherwise. DROP/REPLACE TABLE does not notify
 * the commit coordinator, so tests must perform this cleanup explicitly.
 */
protected def deleteTableFromCommitCoordinatorIfNeeded(table: String): Unit =
  if (coordinatedCommitsEnabledInTests) {
    deleteTableFromCommitCoordinator(table)
  }

/**
 * Overrides the base `withTable` so that, after `f` runs, each table's
 * commit-coordinator state is removed before the table itself is dropped.
 * Cleanup runs in a safe-finally block, so it happens even when `f` throws.
 */
override def withTable(tableNames: String*)(f: => Unit): Unit = {
  Utils.tryWithSafeFinally(f) {
    for (name <- tableNames) {
      // Coordinator cleanup must precede the DROP, while the table is still resolvable.
      deleteTableFromCommitCoordinatorIfNeeded(name)
      spark.sql(s"DROP TABLE IF EXISTS $name")
    }
  }
}

def withClusteredTable[T](
table: String,
schema: String,
Expand All @@ -170,6 +191,7 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
createOrReplaceClusteredTable("CREATE", table, schema, clusterBy, tableProperties, location)

Utils.tryWithSafeFinally(f) {
deleteTableFromCommitCoordinatorIfNeeded(table)
spark.sql(s"DROP TABLE IF EXISTS $table")
}
}
Expand Down

0 comments on commit cc581c2

Please sign in to comment.