Skip to content

Commit

Permalink
Extend streaming tests with coordinated commits (1/2)
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Feb 2, 2025
1 parent 990c8e8 commit 2f9460c
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ object CoordinatedCommitsUtils extends DeltaLogging {
protocol: Protocol,
failIfImplUnavailable: Boolean): Option[CommitCoordinatorClient] = {
metadata.coordinatedCommitsCoordinatorName.flatMap { commitCoordinatorStr =>
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature))
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature),
"coordinated commits table feature is not supported")
val coordinatorConf = metadata.coordinatedCommitsCoordinatorConf
val coordinatorOpt = CommitCoordinatorProvider.getCommitCoordinatorClientOpt(
commitCoordinatorStr, coordinatorConf, spark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class CheckpointsSuite
// `FakeGCSFileSystemValidatingCheckpoint`.
// The default one is `HDFSLogStore` which requires a `FileContext` but we don't have one.
super.sparkConf.set("spark.delta.logStore.gs.impl", classOf[LocalLogStore].getName)
.set(DeltaSQLConf.TEST_DV_NAME_PREFIX.key, "test%dv%prefix-")
}

test("checkpoint metadata - checkpoint schema above the configured threshold are not" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta

import java.util.Date

import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
Expand Down Expand Up @@ -226,9 +227,9 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl))

val currentTime = new Date().getTime
modifyDeltaTimestamp(deltaLog, 0, currentTime - 100000)
modifyDeltaTimestamp(deltaLog, 1, currentTime)
modifyDeltaTimestamp(deltaLog, 2, currentTime + 100000)
modifyCommitTimestamp(deltaLog, 0, currentTime - 100000)
modifyCommitTimestamp(deltaLog, 1, currentTime)
modifyCommitTimestamp(deltaLog, 2, currentTime + 100000)

val readDf = sql(s"SELECT * FROM table_changes('$tbl', 0, now())")
checkCDCAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,23 @@

package org.apache.spark.sql.delta

import java.io.File
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import scala.language.implicitConversions

import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
import org.apache.spark.sql.delta.actions.AddCDCFile
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.tables._
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkConf, SparkThrowable}
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
Expand All @@ -48,16 +47,6 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
override protected def sparkConf: SparkConf = super.sparkConf
.set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")

/** Modify timestamp for a delta commit, used to test timestamp querying */
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
file.setLastModified(time)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
crc.setLastModified(time)
}
}

/**
* Create two tests for maxFilesPerTrigger and maxBytesPerTrigger
*/
Expand Down Expand Up @@ -198,11 +187,11 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
// version 0
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
modifyDeltaTimestamp(deltaLog, 0, 1000)
modifyCommitTimestamp(deltaLog, 0, 1000)

// version 1
Seq(-1).toDF("id").write.mode("append").delta(inputDir.toString)
modifyDeltaTimestamp(deltaLog, 1, 2000)
modifyCommitTimestamp(deltaLog, 1, 2000)

val deltaTable = io.delta.tables.DeltaTable.forPath(inputDir.getAbsolutePath)
val startTs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
Expand Down Expand Up @@ -231,7 +220,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
// version 0
Seq(1, 2, 3, 4, 5, 6).toDF("id").write.delta(inputDir.toString)
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
modifyDeltaTimestamp(deltaLog, 0, 1000)
modifyCommitTimestamp(deltaLog, 0, 1000)

val df1 = spark.readStream
.option(DeltaOptions.CDC_READ_OPTION, "true")
Expand Down Expand Up @@ -278,7 +267,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
val inputPath = inputDir.getAbsolutePath
val deltaLog = DeltaLog.forTable(spark, inputPath)
modifyDeltaTimestamp(deltaLog, 0, 1000)
modifyCommitTimestamp(deltaLog, 0, 1000)

val deltaTable = io.delta.tables.DeltaTable.forPath(inputPath)

Expand Down Expand Up @@ -1065,6 +1054,21 @@ class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite
}

class DeltaCDCStreamSuite extends DeltaCDCStreamSuiteBase
class DeltaCDCStreamWithCoordinatedCommitsBatch1Suite
extends DeltaCDCStreamSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
}

class DeltaCDCStreamWithCoordinatedCommitsBatch10Suite
extends DeltaCDCStreamSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
}

class DeltaCDCStreamWithCoordinatedCommitsBatch100Suite
extends DeltaCDCStreamSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
}

abstract class DeltaCDCStreamColumnMappingSuiteBase extends DeltaCDCStreamSuite
with ColumnMappingStreamingBlockedWorkflowSuiteBase with DeltaColumnMappingSelectedTestMixin {

Expand Down
99 changes: 36 additions & 63 deletions spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.Date
import scala.collection.JavaConverters._

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.DeltaTestUtils.{modifyCommitTimestamp, BOOLEAN_DOMAIN}
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -108,16 +108,6 @@ abstract class DeltaCDCSuiteBase
schemaMode: Option[DeltaBatchCDFSchemaMode] = Some(BatchCDFSchemaLegacy),
readerOptions: Map[String, String] = Map.empty): DataFrame

/** Modify timestamp for a delta commit, used to test timestamp querying */
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
file.setLastModified(time)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
crc.setLastModified(time)
}
}

/** Create table utility method */
def ctas(srcTbl: String, dstTbl: String, disableCDC: Boolean = false): Unit = {
val readDf = cdcRead(new TableName(srcTbl), StartingVersion("0"), EndingVersion("1"))
Expand Down Expand Up @@ -252,14 +242,14 @@ abstract class DeltaCDCSuiteBase

// modify timestamps
// version 0
modifyDeltaTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 0, 0)
val tsAfterV0 = dateFormat.format(new Date(1))

// version 1
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 1, 1000)
val tsAfterV1 = dateFormat.format(new Date(1001))

modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val readDf = cdcRead(
new TablePath(tempDir.getAbsolutePath),
Expand All @@ -278,9 +268,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 10000)
modifyDeltaTimestamp(deltaLog, 2, 20000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 10000)
modifyCommitTimestamp(deltaLog, 2, 20000)

val ts0 = dateFormat.format(new Date(2000))
val readDf = cdcRead(
Expand All @@ -299,9 +289,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val ts0 = dateFormat.format(new Date(0))
val readDf = cdcRead(
Expand All @@ -320,9 +310,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 4000)
modifyDeltaTimestamp(deltaLog, 1, 8000)
modifyDeltaTimestamp(deltaLog, 2, 12000)
modifyCommitTimestamp(deltaLog, 0, 4000)
modifyCommitTimestamp(deltaLog, 1, 8000)
modifyCommitTimestamp(deltaLog, 2, 12000)

val ts0 = dateFormat.format(new Date(1000))
val ts1 = dateFormat.format(new Date(3000))
Expand All @@ -341,9 +331,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(1000))
val ts1 = dateFormat.format(new Date(3000))
Expand All @@ -363,9 +353,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(3000))
val ts1 = dateFormat.format(new Date(5000))
Expand All @@ -385,9 +375,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(3000))
val ts1 = dateFormat.format(new Date(1000))
Expand All @@ -406,9 +396,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 4000)
modifyDeltaTimestamp(deltaLog, 2, 8000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 4000)
modifyCommitTimestamp(deltaLog, 2, 8000)

val ts0 = dateFormat.format(new Date(5000))
val ts1 = dateFormat.format(new Date(3000))
Expand Down Expand Up @@ -449,7 +439,7 @@ abstract class DeltaCDCSuiteBase
// Set commit time during Daylight savings time change.
val restoreDate = "2022-11-06 01:42:44"
val timestamp = dateFormat.parse(s"$restoreDate -0800").getTime
modifyDeltaTimestamp(deltaLog, 0, timestamp)
modifyCommitTimestamp(deltaLog, 0, timestamp)

// Verify DST is respected.
val e = intercept[Exception] {
Expand Down Expand Up @@ -558,9 +548,9 @@ abstract class DeltaCDCSuiteBase
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)

modifyDeltaTimestamp(deltaLog, 0, 0)
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val ts0 = dateFormat.format(new Date(2000))
val ts1 = dateFormat.format(new Date(1))
Expand Down Expand Up @@ -795,13 +785,13 @@ abstract class DeltaCDCSuiteBase

// modify timestamps
// version 0
modifyDeltaTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 0, 0)

// version 1
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 1, 1000)

// version 2
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 2, 2000)
val tsStart = dateFormat.format(new Date(3000))
val tsEnd = dateFormat.format(new Date(4000))

Expand All @@ -825,13 +815,13 @@ abstract class DeltaCDCSuiteBase

// modify timestamps
// version 0
modifyDeltaTimestamp(deltaLog, 0, 0)
modifyCommitTimestamp(deltaLog, 0, 0)

// version 1
modifyDeltaTimestamp(deltaLog, 1, 1000)
modifyCommitTimestamp(deltaLog, 1, 1000)

// version 2
modifyDeltaTimestamp(deltaLog, 2, 2000)
modifyCommitTimestamp(deltaLog, 2, 2000)

val tsStart = dateFormat.format(new Date(0))
val tsEnd = dateFormat.format(new Date(4000))
Expand Down Expand Up @@ -1107,23 +1097,6 @@ class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite
}

class DeltaCDCScalaSuiteWithCoordinatedCommitsBatch10 extends DeltaCDCScalaSuite
with CoordinatedCommitsBaseSuite {

/** Modify timestamp for a delta commit, used to test timestamp querying */
override def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
val fileProvider = DeltaCommitFileProvider(deltaLog.snapshot)
val file = new File(fileProvider.deltaFile(version).toUri)
InCommitTimestampTestUtils.overwriteICTInDeltaFile(
deltaLog,
new Path(file.getPath),
Some(time))
file.setLastModified(time)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(time))
crc.setLastModified(time)
}
}

with CoordinatedCommitsBaseSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
}
Loading

0 comments on commit 2f9460c

Please sign in to comment.