Skip to content

Commit

Permalink
Extend streaming tests with coordinated commits (1/2)
Browse files (browse the repository at this point in the history)
  • Loading branch information
ctring committed Jan 15, 2025
1 parent dd5d90d commit a8e4088
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ object CoordinatedCommitsUtils extends DeltaLogging {
protocol: Protocol,
failIfImplUnavailable: Boolean): Option[CommitCoordinatorClient] = {
metadata.coordinatedCommitsCoordinatorName.flatMap { commitCoordinatorStr =>
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature))
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature),
"coordinated commits table feature is not supported")
val coordinatorConf = metadata.coordinatedCommitsCoordinatorConf
val coordinatorOpt = CommitCoordinatorProvider.getCommitCoordinatorClientOpt(
commitCoordinatorStr, coordinatorConf, spark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta

import java.util.Date

import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
Expand Down Expand Up @@ -226,9 +227,9 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl))

val currentTime = new Date().getTime
modifyDeltaTimestamp(deltaLog, 0, currentTime - 100000)
modifyDeltaTimestamp(deltaLog, 1, currentTime)
modifyDeltaTimestamp(deltaLog, 2, currentTime + 100000)
modifyCommitTimestamp(deltaLog, 0, currentTime - 100000)
modifyCommitTimestamp(deltaLog, 1, currentTime)
modifyCommitTimestamp(deltaLog, 2, currentTime + 100000)

val readDf = sql(s"SELECT * FROM table_changes('$tbl', 0, now())")
checkCDCAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,23 @@

package org.apache.spark.sql.delta

import java.io.File
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import scala.language.implicitConversions

import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
import org.apache.spark.sql.delta.actions.AddCDCFile
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.tables._
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkConf, SparkThrowable}
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
Expand All @@ -48,16 +47,6 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
override protected def sparkConf: SparkConf = super.sparkConf
.set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")

/**
 * Test helper that rewrites the last-modified time of the delta file for `version`, and of
 * the matching checksum file when it exists, so that timestamp-based queries can be tested.
 *
 * @param deltaLog log whose `logPath` is used to locate both files
 * @param version  commit version whose files are touched
 * @param time     value passed to `java.io.File.setLastModified` (milliseconds since epoch)
 */
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
  val commitPath = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
  new File(commitPath.toUri).setLastModified(time)
  // The checksum file is optional; it is only touched when it is present on disk.
  val checksum = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
  if (checksum.exists()) checksum.setLastModified(time)
}

/**
* Create two tests for maxFilesPerTrigger and maxBytesPerTrigger
*/
Expand Down Expand Up @@ -198,11 +187,11 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
// version 0
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
modifyDeltaTimestamp(deltaLog, 0, 1000)
modifyCommitTimestamp(deltaLog, 0, 1000)

// version 1
Seq(-1).toDF("id").write.mode("append").delta(inputDir.toString)
modifyDeltaTimestamp(deltaLog, 1, 2000)
modifyCommitTimestamp(deltaLog, 1, 2000)

val deltaTable = io.delta.tables.DeltaTable.forPath(inputDir.getAbsolutePath)
val startTs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
Expand Down Expand Up @@ -231,7 +220,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
// version 0
Seq(1, 2, 3, 4, 5, 6).toDF("id").write.delta(inputDir.toString)
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
modifyDeltaTimestamp(deltaLog, 0, 1000)
modifyCommitTimestamp(deltaLog, 0, 1000)

val df1 = spark.readStream
.option(DeltaOptions.CDC_READ_OPTION, "true")
Expand Down Expand Up @@ -278,7 +267,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
val inputPath = inputDir.getAbsolutePath
val deltaLog = DeltaLog.forTable(spark, inputPath)
modifyDeltaTimestamp(deltaLog, 0, 1000)
modifyCommitTimestamp(deltaLog, 0, 1000)

val deltaTable = io.delta.tables.DeltaTable.forPath(inputPath)

Expand Down Expand Up @@ -1065,6 +1054,21 @@ class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite
}

/** Concrete suite that picks up the tests declared in the `DeltaCDCStreamSuiteBase` trait. */
class DeltaCDCStreamSuite extends DeltaCDCStreamSuiteBase
/**
 * Variant of [[DeltaCDCStreamSuite]] that inherits its tests and overrides
 * `coordinatedCommitsBackfillBatchSize` to `Some(1)`.
 *
 * NOTE(review): presumably this turns on coordinated commits with a backfill batch of one
 * commit -- confirm against the trait that declares `coordinatedCommitsBackfillBatchSize`.
 */
class DeltaCDCStreamWithCoordinatedCommitsBatch1Suite extends DeltaCDCStreamSuite {
  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
}

/**
 * Variant of [[DeltaCDCStreamSuite]] that inherits its tests and overrides
 * `coordinatedCommitsBackfillBatchSize` to `Some(10)`.
 *
 * NOTE(review): presumably this turns on coordinated commits with a backfill batch of ten
 * commits -- confirm against the trait that declares `coordinatedCommitsBackfillBatchSize`.
 */
class DeltaCDCStreamWithCoordinatedCommitsBatch10Suite extends DeltaCDCStreamSuite {
  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
}

/**
 * Variant of [[DeltaCDCStreamSuite]] that inherits its tests and overrides
 * `coordinatedCommitsBackfillBatchSize` to `Some(100)`.
 *
 * NOTE(review): presumably this turns on coordinated commits with a backfill batch of one
 * hundred commits -- confirm against the trait that declares
 * `coordinatedCommitsBackfillBatchSize`.
 */
class DeltaCDCStreamWithCoordinatedCommitsBatch100Suite extends DeltaCDCStreamSuite {
  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
}

abstract class DeltaCDCStreamColumnMappingSuiteBase extends DeltaCDCStreamSuite
with ColumnMappingStreamingBlockedWorkflowSuiteBase with DeltaColumnMappingSelectedTestMixin {

Expand Down
99 changes: 36 additions & 63 deletions spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.Date
import scala.collection.JavaConverters._

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.DeltaTestUtils.{modifyCommitTimestamp, BOOLEAN_DOMAIN}
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -108,16 +108,6 @@ abstract class DeltaCDCSuiteBase
schemaMode: Option[DeltaBatchCDFSchemaMode] = Some(BatchCDFSchemaLegacy),
readerOptions: Map[String, String] = Map.empty): DataFrame

/**
 * Sets the file modification time of the commit file for `version` to `time`; the checksum
 * file for the same version gets the same modification time if it is present. Used by tests
 * that query by timestamp.
 *
 * @param deltaLog log whose `logPath` holds the commit and checksum files
 * @param version  commit version to adjust
 * @param time     new modification time, as accepted by `java.io.File.setLastModified`
 */
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
  val commitFile = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
  commitFile.setLastModified(time)
  // A version may have no checksum file; skip it silently in that case.
  Option(new File(FileNames.checksumFile(deltaLog.logPath, version).toUri))
    .filter(_.exists())
    .foreach(_.setLastModified(time))
}

/** Create table utility method */
def ctas(srcTbl: String, dstTbl: String, disableCDC: Boolean = false): Unit = {
val readDf = cdcRead(new TableName(srcTbl), StartingVersion("0"), EndingVersion("1"))
Expand Down Expand Up @@ -252,14 +242,14 @@ abstract class DeltaCDCSuiteBase

// modify timestamps
// version 0
modifyDeltaTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 0, 0)
val tsAfterV0 = dateFormat.format(new Date(1))

// version 1
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 1, 1000)
val tsAfterV1 = dateFormat.format(new Date(1001))

modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val readDf = cdcRead(
new TablePath(tempDir.getAbsolutePath),
Expand All @@ -278,9 +268,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 10000)
modifyDeltaTimestamp(deltaLog, 2, 20000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 10000)
modifyCommitTimestamp(deltaLog, 2, 20000)

val ts0 = dateFormat.format(new Date(2000))
val readDf = cdcRead(
Expand All @@ -299,9 +289,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val ts0 = dateFormat.format(new Date(0))
val readDf = cdcRead(
Expand All @@ -320,9 +310,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 4000)
modifyDeltaTimestamp(deltaLog, 1, 8000)
modifyDeltaTimestamp(deltaLog, 2, 12000)
modifyCommitTimestamp(deltaLog, 0, 4000)
modifyCommitTimestamp(deltaLog, 1, 8000)
modifyCommitTimestamp(deltaLog, 2, 12000)

val ts0 = dateFormat.format(new Date(1000))
val ts1 = dateFormat.format(new Date(3000))
Expand All @@ -341,9 +331,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(1000))
val ts1 = dateFormat.format(new Date(3000))
Expand All @@ -363,9 +353,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(3000))
val ts1 = dateFormat.format(new Date(5000))
Expand All @@ -385,9 +375,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(3000))
val ts1 = dateFormat.format(new Date(1000))
Expand All @@ -406,9 +396,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(5000))
val ts1 = dateFormat.format(new Date(3000))
Expand Down Expand Up @@ -449,7 +439,7 @@ abstract class DeltaCDCSuiteBase
// Set commit time during Daylight savings time change.
val restoreDate = "2022-11-06 01:42:44"
val timestamp = dateFormat.parse(s"$restoreDate -0800").getTime
modifyDeltaTimestamp(deltaLog, 0, timestamp)
modifyCommitTimestamp(deltaLog, 0, timestamp)

// Verify DST is respected.
val e = intercept[Exception] {
Expand Down Expand Up @@ -558,9 +548,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val ts0 = dateFormat.format(new Date(2000))
val ts1 = dateFormat.format(new Date(1))
Expand Down Expand Up @@ -795,13 +785,13 @@ abstract class DeltaCDCSuiteBase

// modify timestamps
// version 0
modifyDeltaTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 0, 0)

// version 1
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 1, 1000)

// version 2
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 2, 2000)
val tsStart = dateFormat.format(new Date(3000))
val tsEnd = dateFormat.format(new Date(4000))

Expand All @@ -825,13 +815,13 @@ abstract class DeltaCDCSuiteBase

// modify timestamps
// version 0
modifyDeltaTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 0, 0)

// version 1
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 1, 1000)

// version 2
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val tsStart = dateFormat.format(new Date(0))
val tsEnd = dateFormat.format(new Date(4000))
Expand Down Expand Up @@ -1107,23 +1097,6 @@ class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite
}

class DeltaCDCScalaSuiteWithCoordinatedCommitsBatch10 extends DeltaCDCScalaSuite
with CoordinatedCommitsBaseSuite {

/** Modify timestamp for a delta commit, used to test timestamp querying */
override def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
val fileProvider = DeltaCommitFileProvider(deltaLog.snapshot)
val file = new File(fileProvider.deltaFile(version).toUri)
InCommitTimestampTestUtils.overwriteICTInDeltaFile(
deltaLog,
new Path(file.getPath),
Some(time))
file.setLastModified(time)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(time))
crc.setLastModified(time)
}
}

with CoordinatedCommitsBaseSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import scala.language.implicitConversions
import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
import org.apache.spark.sql.delta.DeltaHistoryManagerSuiteShims._
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.DeltaTestUtils.{createTestAddFile, modifyCommitTimestamp}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.StatsUtils
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
import org.apache.spark.sql.delta.util.FileNames
import org.scalatest.GivenWhenThen

import org.apache.spark.{SparkConf, SparkException}
Expand Down Expand Up @@ -64,31 +64,6 @@ trait DeltaTimeTravelTests extends QueryTest

protected val timeFormatter = new SimpleDateFormat("yyyyMMddHHmmssSSS")

/**
 * Test helper that changes the timestamp associated with the commit at `version`.
 *
 * When `isICTEnabledForNewTables` is true, the ICT value stored in the commit file (and in
 * the checksum file, if present) is overwritten through `InCommitTimestampTestUtils`.
 * Otherwise the file modification times of the commit file and checksum file are set instead.
 *
 * @param deltaLog log whose commit is modified; `deltaLog.update()` is called to resolve the
 *                 commit file path
 * @param version  commit version to modify
 * @param ts       new timestamp; passed to `File.setLastModified` on the non-ICT path
 */
protected def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
// Resolve the commit file through the file provider of a freshly updated snapshot.
val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (isICTEnabledForNewTables) {
// ICT path: rewrite the timestamp stored inside the files rather than the file mtime.
InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, filePath, Some(ts))
if (FileNames.isUnbackfilledDeltaFile(filePath)) {
// Also change the ICT in the backfilled file if it exists.
val backfilledFilePath = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
val fs = backfilledFilePath.getFileSystem(deltaLog.newDeltaHadoopConf())
if (fs.exists(backfilledFilePath)) {
InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, backfilledFilePath, Some(ts))
}
}
// The checksum file is optional; only rewrite it when it is on disk.
if (crc.exists()) {
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(ts))
}
} else {
// Non-ICT path: only the file modification times are changed.
val file = new File(filePath.toUri)
file.setLastModified(ts)
if (crc.exists()) {
crc.setLastModified(ts)
}
}
}

/**
 * Builds the text `<table> version as of <version>` for use in time-travel test queries.
 *
 * @param table   table reference, inserted verbatim
 * @param version version number appended after `version as of`
 * @return the concatenated clause
 */
protected def versionAsOf(table: String, version: Long): String =
  s"$table version as of $version"
Expand Down
Loading

0 comments on commit a8e4088

Please sign in to comment.