diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
index c9a7c3bc7a3..1f29313d3e3 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
@@ -165,7 +165,8 @@ object CoordinatedCommitsUtils extends DeltaLogging {
       protocol: Protocol,
       failIfImplUnavailable: Boolean): Option[CommitCoordinatorClient] = {
     metadata.coordinatedCommitsCoordinatorName.flatMap { commitCoordinatorStr =>
-      assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature))
+      assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature),
+        "coordinated commits table feature is not supported")
       val coordinatorConf = metadata.coordinatedCommitsCoordinatorConf
       val coordinatorOpt = CommitCoordinatorProvider.getCommitCoordinatorClientOpt(
         commitCoordinatorStr, coordinatorConf, spark)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index 9983beb1888..e38bd4988b5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -89,6 +89,7 @@ class CheckpointsSuite
     // `FakeGCSFileSystemValidatingCheckpoint`.
     // The default one is `HDFSLogStore` which requires a `FileContext` but we don't have one.
     super.sparkConf.set("spark.delta.logStore.gs.impl", classOf[LocalLogStore].getName)
+      .set(DeltaSQLConf.TEST_DV_NAME_PREFIX.key, "test%dv%prefix-")
   }
 
   test("checkpoint metadata - checkpoint schema above the configured threshold are not" +
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
index fd1d6c820c4..ddbc4cc5998 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
 
 import java.util.Date
 
+import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
 import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -226,9 +227,9 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
       val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl))
 
       val currentTime = new Date().getTime
-      modifyDeltaTimestamp(deltaLog, 0, currentTime - 100000)
-      modifyDeltaTimestamp(deltaLog, 1, currentTime)
-      modifyDeltaTimestamp(deltaLog, 2, currentTime + 100000)
+      modifyCommitTimestamp(deltaLog, 0, currentTime - 100000)
+      modifyCommitTimestamp(deltaLog, 1, currentTime)
+      modifyCommitTimestamp(deltaLog, 2, currentTime + 100000)
 
       val readDf = sql(s"SELECT * FROM table_changes('$tbl', 0, now())")
       checkCDCAnswer(
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
index d432eee2f45..580c9124abc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
@@ -16,24 +16,23 @@
 
 package org.apache.spark.sql.delta
 
-import java.io.File
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.util.Date
 
 import scala.language.implicitConversions
 
+import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
 import org.apache.spark.sql.delta.actions.AddCDCFile
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
 import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
+import org.apache.spark.sql.delta.util.JsonUtils
 import io.delta.tables._
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkConf, SparkThrowable}
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
@@ -48,16 +47,6 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
   override protected def sparkConf: SparkConf = super.sparkConf
     .set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")
 
-  /** Modify timestamp for a delta commit, used to test timestamp querying */
-  def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
-    val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
-    file.setLastModified(time)
-    val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
-    if (crc.exists()) {
-      crc.setLastModified(time)
-    }
-  }
-
   /**
    * Create two tests for maxFilesPerTrigger and maxBytesPerTrigger
    */
@@ -198,11 +187,11 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
       // version 0
       Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
       val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
-      modifyDeltaTimestamp(deltaLog, 0, 1000)
+      modifyCommitTimestamp(deltaLog, 0, 1000)
 
       // version 1
       Seq(-1).toDF("id").write.mode("append").delta(inputDir.toString)
-      modifyDeltaTimestamp(deltaLog, 1, 2000)
+      modifyCommitTimestamp(deltaLog, 1, 2000)
 
       val deltaTable = io.delta.tables.DeltaTable.forPath(inputDir.getAbsolutePath)
       val startTs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
@@ -231,7 +220,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
       // version 0
       Seq(1, 2, 3, 4, 5, 6).toDF("id").write.delta(inputDir.toString)
       val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
-      modifyDeltaTimestamp(deltaLog, 0, 1000)
+      modifyCommitTimestamp(deltaLog, 0, 1000)
 
       val df1 = spark.readStream
         .option(DeltaOptions.CDC_READ_OPTION, "true")
@@ -278,7 +267,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
       Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
       val inputPath = inputDir.getAbsolutePath
       val deltaLog = DeltaLog.forTable(spark, inputPath)
-      modifyDeltaTimestamp(deltaLog, 0, 1000)
+      modifyCommitTimestamp(deltaLog, 0, 1000)
 
       val deltaTable = io.delta.tables.DeltaTable.forPath(inputPath)
 
@@ -1065,6 +1054,21 @@ class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite
 }
 
 class DeltaCDCStreamSuite extends DeltaCDCStreamSuiteBase
 
+class DeltaCDCStreamWithCoordinatedCommitsBatch1Suite
+  extends DeltaCDCStreamSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class DeltaCDCStreamWithCoordinatedCommitsBatch10Suite
+  extends DeltaCDCStreamSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
+}
+
+class DeltaCDCStreamWithCoordinatedCommitsBatch100Suite
+  extends DeltaCDCStreamSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
+}
+
 abstract class DeltaCDCStreamColumnMappingSuiteBase extends DeltaCDCStreamSuite
   with ColumnMappingStreamingBlockedWorkflowSuiteBase with DeltaColumnMappingSelectedTestMixin {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
index 710174f5d10..771b8af2eaa 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
@@ -23,7 +23,7 @@ import java.util.Date
 import scala.collection.JavaConverters._
 
 // scalastyle:off import.ordering.noEmptyLine
-import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
+import org.apache.spark.sql.delta.DeltaTestUtils.{modifyCommitTimestamp, BOOLEAN_DOMAIN}
 import org.apache.spark.sql.delta.commands.cdc.CDCReader._
 import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -108,16 +108,6 @@ abstract class DeltaCDCSuiteBase
       schemaMode: Option[DeltaBatchCDFSchemaMode] = Some(BatchCDFSchemaLegacy),
       readerOptions: Map[String, String] = Map.empty): DataFrame
 
-  /** Modify timestamp for a delta commit, used to test timestamp querying */
-  def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
-    val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
-    file.setLastModified(time)
-    val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
-    if (crc.exists()) {
-      crc.setLastModified(time)
-    }
-  }
-
   /** Create table utility method */
   def ctas(srcTbl: String, dstTbl: String, disableCDC: Boolean = false): Unit = {
     val readDf = cdcRead(new TableName(srcTbl), StartingVersion("0"), EndingVersion("1"))
@@ -252,14 +242,14 @@ abstract class DeltaCDCSuiteBase
 
       // modify timestamps
       // version 0
-      modifyDeltaTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 0, 0)
       val tsAfterV0 = dateFormat.format(new Date(1))
 
       // version 1
-      modifyDeltaTimestamp(deltaLog, 1, 1000)
+      modifyCommitTimestamp(deltaLog, 1, 1000)
       val tsAfterV1 = dateFormat.format(new Date(1001))
 
-      modifyDeltaTimestamp(deltaLog, 2, 2000)
+      modifyCommitTimestamp(deltaLog, 2, 2000)
 
       val readDf = cdcRead(
         new TablePath(tempDir.getAbsolutePath),
@@ -278,9 +268,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 0)
-      modifyDeltaTimestamp(deltaLog, 1, 10000)
-      modifyDeltaTimestamp(deltaLog, 2, 20000)
+      modifyCommitTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 1, 10000)
+      modifyCommitTimestamp(deltaLog, 2, 20000)
 
       val ts0 = dateFormat.format(new Date(2000))
       val readDf = cdcRead(
@@ -299,9 +289,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 0)
-      modifyDeltaTimestamp(deltaLog, 1, 1000)
-      modifyDeltaTimestamp(deltaLog, 2, 2000)
+      modifyCommitTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 1, 1000)
+      modifyCommitTimestamp(deltaLog, 2, 2000)
 
       val ts0 = dateFormat.format(new Date(0))
       val readDf = cdcRead(
@@ -320,9 +310,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 4000)
-      modifyDeltaTimestamp(deltaLog, 1, 8000)
-      modifyDeltaTimestamp(deltaLog, 2, 12000)
+      modifyCommitTimestamp(deltaLog, 0, 4000)
+      modifyCommitTimestamp(deltaLog, 1, 8000)
+      modifyCommitTimestamp(deltaLog, 2, 12000)
 
       val ts0 = dateFormat.format(new Date(1000))
       val ts1 = dateFormat.format(new Date(3000))
@@ -341,9 +331,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 0)
-      modifyDeltaTimestamp(deltaLog, 1, 4000)
-      modifyDeltaTimestamp(deltaLog, 2, 8000)
+      modifyCommitTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 1, 4000)
+      modifyCommitTimestamp(deltaLog, 2, 8000)
 
       val ts0 = dateFormat.format(new Date(1000))
       val ts1 = dateFormat.format(new Date(3000))
@@ -363,9 +353,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 0)
-      modifyDeltaTimestamp(deltaLog, 1, 4000)
-      modifyDeltaTimestamp(deltaLog, 2, 8000)
+      modifyCommitTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 1, 4000)
+      modifyCommitTimestamp(deltaLog, 2, 8000)
 
       val ts0 = dateFormat.format(new Date(3000))
       val ts1 = dateFormat.format(new Date(5000))
@@ -385,9 +375,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 0)
-      modifyDeltaTimestamp(deltaLog, 1, 4000)
-      modifyDeltaTimestamp(deltaLog, 2, 8000)
+      modifyCommitTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 1, 4000)
+      modifyCommitTimestamp(deltaLog, 2, 8000)
 
       val ts0 = dateFormat.format(new Date(3000))
       val ts1 = dateFormat.format(new Date(1000))
@@ -406,9 +396,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 0)
-      modifyDeltaTimestamp(deltaLog, 1, 4000)
-      modifyDeltaTimestamp(deltaLog, 2, 8000)
+      modifyCommitTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 1, 4000)
+      modifyCommitTimestamp(deltaLog, 2, 8000)
 
       val ts0 = dateFormat.format(new Date(5000))
       val ts1 = dateFormat.format(new Date(3000))
@@ -449,7 +439,7 @@ abstract class DeltaCDCSuiteBase
       // Set commit time during Daylight savings time change.
       val restoreDate = "2022-11-06 01:42:44"
       val timestamp = dateFormat.parse(s"$restoreDate -0800").getTime
-      modifyDeltaTimestamp(deltaLog, 0, timestamp)
+      modifyCommitTimestamp(deltaLog, 0, timestamp)
 
       // Verify DST is respected.
       val e = intercept[Exception] {
@@ -558,9 +548,9 @@ abstract class DeltaCDCSuiteBase
       createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
 
-      modifyDeltaTimestamp(deltaLog, 0, 0)
-      modifyDeltaTimestamp(deltaLog, 1, 1000)
-      modifyDeltaTimestamp(deltaLog, 2, 2000)
+      modifyCommitTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 1, 1000)
+      modifyCommitTimestamp(deltaLog, 2, 2000)
 
       val ts0 = dateFormat.format(new Date(2000))
       val ts1 = dateFormat.format(new Date(1))
@@ -795,13 +785,13 @@ abstract class DeltaCDCSuiteBase
 
       // modify timestamps
       // version 0
-      modifyDeltaTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 0, 0)
 
       // version 1
-      modifyDeltaTimestamp(deltaLog, 1, 1000)
+      modifyCommitTimestamp(deltaLog, 1, 1000)
 
       // version 2
-      modifyDeltaTimestamp(deltaLog, 2, 2000)
+      modifyCommitTimestamp(deltaLog, 2, 2000)
 
       val tsStart = dateFormat.format(new Date(3000))
       val tsEnd = dateFormat.format(new Date(4000))
@@ -825,13 +815,13 @@ abstract class DeltaCDCSuiteBase
 
       // modify timestamps
       // version 0
-      modifyDeltaTimestamp(deltaLog, 0, 0)
+      modifyCommitTimestamp(deltaLog, 0, 0)
 
       // version 1
-      modifyDeltaTimestamp(deltaLog, 1, 1000)
+      modifyCommitTimestamp(deltaLog, 1, 1000)
 
       // version 2
-      modifyDeltaTimestamp(deltaLog, 2, 2000)
+      modifyCommitTimestamp(deltaLog, 2, 2000)
 
       val tsStart = dateFormat.format(new Date(0))
       val tsEnd = dateFormat.format(new Date(4000))
@@ -1107,23 +1097,6 @@ class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite
 }
 
 class DeltaCDCScalaSuiteWithCoordinatedCommitsBatch10 extends DeltaCDCScalaSuite
-  with CoordinatedCommitsBaseSuite {
-
-  /** Modify timestamp for a delta commit, used to test timestamp querying */
-  override def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
-    val fileProvider = DeltaCommitFileProvider(deltaLog.snapshot)
-    val file = new File(fileProvider.deltaFile(version).toUri)
-    InCommitTimestampTestUtils.overwriteICTInDeltaFile(
-      deltaLog,
-      new Path(file.getPath),
-      Some(time))
-    file.setLastModified(time)
-    val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
-    if (crc.exists()) {
-      InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(time))
-      crc.setLastModified(time)
-    }
-  }
-
+  with CoordinatedCommitsBaseSuite {
   override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
index b87ab71091d..4bfe722fbff 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
@@ -29,14 +29,14 @@ import scala.language.implicitConversions
 import com.databricks.spark.util.Log4jUsageLogger
 import org.apache.spark.sql.delta.DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
 import org.apache.spark.sql.delta.DeltaHistoryManagerSuiteShims._
-import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
+import org.apache.spark.sql.delta.DeltaTestUtils.{createTestAddFile, modifyCommitTimestamp}
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.StatsUtils
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
+import org.apache.spark.sql.delta.util.FileNames
 import org.scalatest.GivenWhenThen
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -64,31 +64,6 @@ trait DeltaTimeTravelTests extends QueryTest
 
   protected val timeFormatter = new SimpleDateFormat("yyyyMMddHHmmssSSS")
 
-  protected def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
-    val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
-    val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
-    if (isICTEnabledForNewTables) {
-      InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, filePath, Some(ts))
-      if (FileNames.isUnbackfilledDeltaFile(filePath)) {
-        // Also change the ICT in the backfilled file if it exists.
-        val backfilledFilePath = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
-        val fs = backfilledFilePath.getFileSystem(deltaLog.newDeltaHadoopConf())
-        if (fs.exists(backfilledFilePath)) {
-          InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, backfilledFilePath, Some(ts))
-        }
-      }
-      if (crc.exists()) {
-        InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(ts))
-      }
-    } else {
-      val file = new File(filePath.toUri)
-      file.setLastModified(ts)
-      if (crc.exists()) {
-        crc.setLastModified(ts)
-      }
-    }
-  }
-
   protected def versionAsOf(table: String, version: Long): String = {
     s"$table version as of $version"
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceLargeLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceLargeLogSuite.scala
index f251165bdc4..743017a3360 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceLargeLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceLargeLogSuite.scala
@@ -23,3 +23,13 @@ class DeltaSourceLargeLogSuite extends DeltaSourceSuite {
     super.sparkConf.set(DeltaSQLConf.LOG_SIZE_IN_MEMORY_THRESHOLD.key, "0")
   }
 }
+
+class DeltaSourceLargeLogWithCoordinatedCommitsBatch1Suite
+  extends DeltaSourceLargeLogSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class DeltaSourceLargeLogWithCoordinatedCommitsBatch100Suite
+  extends DeltaSourceLargeLogSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
index 1750a7580d2..a7b503b49a7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
@@ -16,14 +16,16 @@
 
 package org.apache.spark.sql.delta
 
-import java.io.{File, FileInputStream, OutputStream}
+import java.io.{File, FileInputStream, OutputStream, PrintWriter, StringWriter}
 import java.net.URI
+import java.sql.Timestamp
 import java.util.UUID
 import java.util.concurrent.TimeoutException
 
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 
+import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
 import org.apache.spark.sql.delta.actions.{AddFile, Protocol}
 import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -1034,7 +1036,8 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
 
   testQuietly("recreate the reservoir should fail the query") {
     withTempDir { inputDir =>
-      val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
+      val tablePath = new Path(inputDir.toURI)
+      val deltaLog = DeltaLog.forTable(spark, tablePath)
       withMetadata(deltaLog, StructType.fromDDL("value STRING"))
 
       val df = spark.readStream
@@ -1049,7 +1052,10 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
         StopStream,
         AssertOnQuery { _ =>
           Utils.deleteRecursively(inputDir)
-          val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
+          if (coordinatedCommitsEnabledInTests) {
+            deleteTableFromCommitCoordinator(tablePath)
+          }
+          val deltaLog = DeltaLog.forTable(spark, tablePath)
           // All Delta tables in tests use the same tableId by default. Here we pass a new tableId
           // to simulate a new table creation in production
           withMetadata(deltaLog, StructType.fromDDL("value STRING"), tableId = Some("tableId-1234"))
@@ -1447,7 +1453,16 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       val e = intercept[StreamingQueryException] {
         stream.processAllAvailable()
       }
-      assert(e.getCause.isInstanceOf[InvalidProtocolVersionException])
+      val cause = e.getCause
+      val sw = new StringWriter()
+      cause.printStackTrace(new PrintWriter(sw))
+      assert(
+        cause.isInstanceOf[InvalidProtocolVersionException] ||
+          // When coordinated commits are enabled, the following assertion error coming from
+          // CoordinatedCommitsUtils.getCommitCoordinatorClient may get hit
+          (cause.isInstanceOf[AssertionError] &&
+            e.getCause.getMessage.contains("coordinated commits table feature is not supported")),
+        s"Caused by: ${sw.toString}")
     } finally {
       stream.stop()
     }
@@ -1462,8 +1477,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
         val rangeStart = startVersion * 10
         val rangeEnd = rangeStart + 10
         spark.range(rangeStart, rangeEnd).write.format("delta").mode("append").save(location)
-        val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, startVersion).toUri)
-        file.setLastModified(ts)
+        modifyCommitTimestamp(deltaLog, startVersion, ts)
         startVersion += 1
       }
     }
@@ -2599,3 +2613,15 @@ class MonotonicallyIncreasingTimestampFS extends RawLocalFileSystem {
 object MonotonicallyIncreasingTimestampFS {
   val scheme = s"MonotonicallyIncreasingTimestampFS"
 }
+
+class DeltaSourceWithCoordinatedCommitsBatch1Suite extends DeltaSourceSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class DeltaSourceWithCoordinatedCommitsBatch10Suite extends DeltaSourceSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
+}
+
+class DeltaSourceWithCoordinatedCommitsBatch100Suite extends DeltaSourceSuite {
+  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala
index d60c09608e1..f54ce5dc9f1 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 import java.io.File
 
 import org.apache.spark.sql.delta.actions.Format
+import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 
@@ -27,7 +28,8 @@ import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.StructType
 
 trait DeltaSourceSuiteBase extends StreamTest
-  with DeltaSQLTestUtils {
+  with DeltaSQLTestUtils
+  with CoordinatedCommitsBaseSuite {
 
   /**
    * Creates 3 temporary directories for use within a function.
@@ -91,13 +93,21 @@ trait DeltaSourceSuiteBase extends StreamTest
       val baseMetadata = tableId.map { tId => txn.metadata.copy(id = tId) }.getOrElse(txn.metadata)
       // We need to fill up the missing id/physical name in column mapping mode
       // while maintaining existing metadata if there is any
-      val updatedMetadata = copyOverMetadata(
+      val updatedSchema = copyOverMetadata(
         schema, baseMetadata.schema, baseMetadata.columnMappingMode)
 
+      // Configure coordinated commits
+      val updatedConfiguration = if (coordinatedCommitsEnabledInTests) {
+        baseMetadata.configuration +
+          (DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> defaultCommitsCoordinatorName)
+      } else {
+        baseMetadata.configuration
+      }
       txn.commit(
         DeltaColumnMapping.assignColumnIdAndPhysicalName(
           baseMetadata.copy(
-            schemaString = updatedMetadata.json,
+            schemaString = updatedSchema.json,
+            configuration = updatedConfiguration,
             format = Format(format)),
           baseMetadata,
           isChangingModeOnExistingTable = false,
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
index 4ee653c6a98..4866529e03d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import io.delta.tables.{DeltaTable => IODeltaTable}
@@ -452,6 +452,29 @@ object DeltaTestUtils extends DeltaTestUtilsBase
     lteq(requirement, actual)
   }
 
+  def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
+    val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
+    val file = new File(filePath.toUri)
+    InCommitTimestampTestUtils.overwriteICTInDeltaFile(
+      deltaLog,
+      new Path(file.getPath),
+      Some(ts))
+    file.setLastModified(ts)
+    if (FileNames.isUnbackfilledDeltaFile(filePath)) {
+      // Also change the ICT in the backfilled file if it exists.
+      val backfilledFilePath = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
+      val fs = backfilledFilePath.getFileSystem(deltaLog.newDeltaHadoopConf())
+      if (fs.exists(backfilledFilePath)) {
+        InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, backfilledFilePath, Some(ts))
+      }
+    }
+    val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
+    if (crc.exists()) {
+      InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(ts))
+      crc.setLastModified(ts)
+    }
+  }
+
   def withTimeZone(zone: String)(f: => Unit): Unit = {
     val currentDefault = TimeZone.getDefault
     try {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
index 7f9f2193c32..ee530a2bd23 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
@@ -23,9 +23,8 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaTestUtilsBase}
-import org.apache.spark.sql.delta.DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME
 import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol}
-import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, JsonUtils}
 import io.delta.storage.LogStore
 import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
 import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.test.SharedSparkSession
 
 trait CoordinatedCommitsTestUtils
@@ -175,6 +175,7 @@ trait CoordinatedCommitsTestUtils
       updatedActions.getOldProtocol
     )
   }
+
 }
 
 case class TrackingInMemoryCommitCoordinatorBuilder(
@@ -334,11 +335,6 @@ trait CoordinatedCommitsBaseSuite
   // clean the table data in the commit coordinator. Note that we should call this before
   // the table actually gets DROP.
   def deleteTableFromCommitCoordinator(tableName: String): Unit = {
-    val cc = CommitCoordinatorProvider.getCommitCoordinatorClient(
-      defaultCommitsCoordinatorName, defaultCommitsCoordinatorConf, spark)
-    assert(
-      cc.isInstanceOf[TrackingCommitCoordinatorClient],
-      s"Please implement delete/drop method for coordinator: ${cc.getClass.getName}")
    val location = try {
      spark.sql(s"describe detail $tableName")
        .select("location")
@@ -349,7 +345,17 @@ trait CoordinatedCommitsBaseSuite
         // Ignore if the table does not exist/broken.
         return
     }
-    val locKey = location.stripPrefix("file:")
+    deleteTableFromCommitCoordinator(new Path(location))
+  }
+
+  def deleteTableFromCommitCoordinator(path: Path): Unit = {
+    val cc = CommitCoordinatorProvider.getCommitCoordinatorClient(
+      defaultCommitsCoordinatorName, defaultCommitsCoordinatorConf, spark)
+    assert(
+      cc.isInstanceOf[TrackingCommitCoordinatorClient],
+      s"Please implement delete/drop method for coordinator: ${cc.getClass.getName}")
+
+    val locKey = path.toString.stripPrefix("file:")
     if (locRefCount.contains(locKey)) {
       locRefCount(locKey) -= 1
     }
@@ -357,11 +363,11 @@ trait CoordinatedCommitsBaseSuite
     // names could be pointing to the same location. We should only clean up the table data in the
     // commit coordinator when the last table name pointing to the location is dropped.
     if (locRefCount.getOrElse(locKey, 0) == 0) {
-      val logPath = location + "/_delta_log"
+      val logPath = new Path(path, "_delta_log")
       cc.asInstanceOf[TrackingCommitCoordinatorClient]
         .delegatingCommitCoordinatorClient
         .asInstanceOf[InMemoryCommitCoordinator]
-        .dropTable(new Path(logPath))
+        .dropTable(logPath)
     }
     DeltaLog.clearCache()
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
index 110ecd66b74..1fb2ed20a39 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
@@ -54,6 +54,11 @@ class DeletionVectorsSuite extends QueryTest
   with DeltaExcludedBySparkVersionTestMixinShims {
   import testImplicits._
 
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(DeltaSQLConf.TEST_DV_NAME_PREFIX.key, "test%dv%prefix-")
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key, "false")
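Usage note (illustrative, not part of the patch): the suites above now call the shared DeltaTestUtils.modifyCommitTimestamp(deltaLog, version, ts) helper instead of their private modifyDeltaTimestamp copies, and opt into coordinated commits by overriding coordinatedCommitsBackfillBatchSize. A minimal sketch of how a timestamp-based test could use the helper; the table contents and timestamp values are made up, and withTempDir/spark are assumed to come from a Delta test suite mixin such as those in this patch:

    import org.apache.spark.sql.delta.DeltaLog
    import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp

    withTempDir { dir =>
      // Write version 0 and version 1 of a small test table.
      spark.range(3).write.format("delta").save(dir.getAbsolutePath)
      spark.range(3, 6).write.format("delta").mode("append").save(dir.getAbsolutePath)

      val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath)
      // Pin the commit timestamps so timestamp-based CDC / time-travel reads are deterministic.
      // The helper rewrites the in-commit timestamp (and the .crc file when present) in addition
      // to the file modification time, which is why it also works when coordinated commits leave
      // the commit in an unbackfilled delta file.
      modifyCommitTimestamp(deltaLog, 0, 1000)
      modifyCommitTimestamp(deltaLog, 1, 2000)
    }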