Skip to content

Commit

Permalink
[Spark] Fix path handling in DeltaTableCreationSuite (#3662)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Fix a number of tests in `DeltaTableCreationSuite` that did not
correctly handle paths containing special characters. Enable
special-character injection in the `DeltaTableCreationIdColumnMappingSuite`
and `DeltaTableCreationNameColumnMappingSuite` suites.

## How was this patch tested?

Test-only PR.

## Does this PR introduce _any_ user-facing changes?

No.
  • Loading branch information
cstavr authored Sep 10, 2024
1 parent 559ca40 commit 81afbe3
Showing 1 changed file with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ trait DeltaTableCreationTests
test("create datasource table with a non-existing location") {
withTempPath { dir =>
withTable("t") {
spark.sql(s"CREATE TABLE t(a int, b int) USING delta LOCATION '${dir.toURI}'")
spark.sql(s"CREATE TABLE t(a int, b int) USING delta LOCATION '${dir.getAbsolutePath}'")

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
Expand All @@ -1225,17 +1225,21 @@ trait DeltaTableCreationTests
withTempPath { dir =>
withTable("t1") {
spark.sql(
s"CREATE TABLE t1(a int, b int) USING delta PARTITIONED BY(a) LOCATION '${dir.toURI}'")
s"""
|CREATE TABLE t1(a int, b int) USING delta PARTITIONED BY(a)
|LOCATION '${dir.getAbsolutePath}'
|""".stripMargin)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

Seq((1, 2)).toDF("a", "b")
.write.format("delta").mode("append").save(table.location.toString)
val read = spark.read.format("delta").load(table.location.toString)
.write.format("delta").mode("append").save(table.location.getPath)
val read = spark.read.format("delta").load(table.location.getPath)
checkAnswer(read, Seq(Row(1, 2)))

val deltaLog = loadDeltaLog(table.location.toString)
val deltaLog = loadDeltaLog(table.location.getPath)
assert(deltaLog.update().version > 0)
assertPartitionWithValueExists("a", "1", deltaLog)
}
}
Expand All @@ -1252,7 +1256,7 @@ trait DeltaTableCreationTests
s"""
|CREATE TABLE t
|USING delta
|LOCATION '${dir.toURI}'
|LOCATION '${dir.getAbsolutePath}'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
Expand All @@ -1262,6 +1266,7 @@ trait DeltaTableCreationTests

// Query the data and the metadata directly via the DeltaLog
val deltaLog = getDeltaLog(table)
assert(deltaLog.update().version >= 0)

assertEqual(deltaLog.snapshot.schema, new StructType()
.add("a", "integer").add("b", "integer")
Expand Down Expand Up @@ -1290,7 +1295,7 @@ trait DeltaTableCreationTests
|CREATE TABLE t1
|USING delta
|PARTITIONED BY(a, b)
|LOCATION '${dir.toURI}'
|LOCATION '${dir.getAbsolutePath}'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
Expand Down Expand Up @@ -1380,12 +1385,12 @@ trait DeltaTableCreationTests
dir.delete()
Seq((3, 4)).toDF("a", "b")
.write.format("delta")
.save(dir.toString)
.save(dir.getAbsolutePath)
val ex = intercept[AnalysisException](spark.sql(
s"""
|CREATE TABLE t
|USING delta
|LOCATION '${dir.toURI}'
|LOCATION '${dir.getAbsolutePath}'
|AS SELECT 1 as a, 2 as b
""".stripMargin))
assert(ex.getMessage.contains("Cannot create table"))
Expand All @@ -1395,14 +1400,12 @@ trait DeltaTableCreationTests
withTable("t") {
withTempDir { dir =>
dir.delete()
Seq((3, 4)).toDF("a", "b")
.write.format("parquet")
.save(dir.toString)
Seq((3, 4)).toDF("a", "b").write.format("parquet").save(dir.getCanonicalPath)
val ex = intercept[AnalysisException](spark.sql(
s"""
|CREATE TABLE t
|USING delta
|LOCATION '${dir.toURI}'
|LOCATION '${dir.getAbsolutePath}'
|AS SELECT 1 as a, 2 as b
""".stripMargin))
assert(ex.getMessage.contains("Cannot create table"))
Expand Down Expand Up @@ -1474,13 +1477,14 @@ trait DeltaTableCreationTests
|CREATE TABLE t(a string, `$specialChars` string)
|USING delta
|PARTITIONED BY(`$specialChars`)
|LOCATION '${dir.toURI}'
|LOCATION '${dir.getAbsolutePath}'
""".stripMargin)

assert(dir.listFiles().forall(_.toString.contains("_delta_log")))
spark.sql(s"INSERT INTO TABLE t SELECT 1, 2")

val deltaLog = loadDeltaLog(dir.toString)
val deltaLog = loadDeltaLog(dir.getAbsolutePath)
assert(deltaLog.update().version > 0)
assertPartitionWithValueExists(specialChars, "2", deltaLog)

checkAnswer(spark.table("t"), Row("1", "2") :: Nil)
Expand Down Expand Up @@ -2428,8 +2432,6 @@ trait DeltaTableCreationColumnMappingSuiteBase extends DeltaColumnMappingSelecte
class DeltaTableCreationIdColumnMappingSuite extends DeltaTableCreationSuite
with DeltaColumnMappingEnableIdMode {

override val defaultTempDirPrefix = "spark"

override protected def getTableProperties(tableName: String): Map[String, String] = {
// ignore comparing column mapping properties
dropColumnMappingConfigurations(super.getTableProperties(tableName))
Expand All @@ -2438,7 +2440,6 @@ class DeltaTableCreationIdColumnMappingSuite extends DeltaTableCreationSuite

class DeltaTableCreationNameColumnMappingSuite extends DeltaTableCreationSuite
with DeltaColumnMappingEnableNameMode {
override val defaultTempDirPrefix = "spark"

override protected def getTableProperties(tableName: String): Map[String, String] = {
// ignore comparing column mapping properties
Expand Down

0 comments on commit 81afbe3

Please sign in to comment.