[Spark] Add configurable expansions to partition-like skipping coverage (#4104)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Currently, partition-like skipping is limited to a set of whitelisted
expressions referencing only Liquid clustering columns. This PR adds
configs that can be used to relax these restrictions (allowing an
arbitrary expression referencing any column).
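
As a usage sketch (not part of this PR's diff): assuming an active `spark` session and Delta's standard `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends, the two new escape hatches could be enabled like this. Both configs are marked internal in the diff below.

```scala
// Hedged usage sketch; the "spark.databricks.delta." prefix is assumed from
// DeltaSQLConf.buildConf. Both configs are internal escape hatches.
spark.conf.set(
  "spark.databricks.delta.skipping.partitionLikeDataSkipping.limitToClusteringColumns",
  "false")
spark.conf.set(
  "spark.databricks.delta.skipping.partitionLikeDataSkipping.additionalSupportedExpressions",
  "org.apache.spark.sql.catalyst.expressions.RegExpExtract," +
    "org.apache.spark.sql.catalyst.expressions.JsonToStructs")
```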

## How was this patch tested?
See test changes.

## Does this PR introduce _any_ user-facing changes?
No.
chirag-s-db authored Jan 30, 2025
1 parent 75d6b8e commit 57029e1
Showing 3 changed files with 81 additions and 2 deletions.
@@ -1452,6 +1452,24 @@ trait DeltaSQLConfBase {
.intConf
.createWithDefault(100)

val DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_CLUSTERING_COLUMNS_ONLY =
buildConf("skipping.partitionLikeDataSkipping.limitToClusteringColumns")
.internal()
.doc("Limits partition-like data skipping to filters referencing only clustering columns" +
"In general, clustering columns will be most likely to produce files with the same" +
"min-max values, though this restriction might exclude filters on columns highly " +
"correlated with the clustering columns.")
.booleanConf
.createWithDefault(true)

val DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_ADDITIONAL_SUPPORTED_EXPRESSIONS =
buildConf("skipping.partitionLikeDataSkipping.additionalSupportedExpressions")
.internal()
.doc("Comma-separated list of the canonical class names of additional expressions for which" +
"partition-like data skipping can be safely applied.")
.stringConf
.createOptional

/**
* The below confs have a special prefix `spark.databricks.io` because this is the conf value
* already used by Databricks' data skipping implementation. There's no benefit to making OSS
@@ -220,6 +220,13 @@ trait DataSkippingReaderBase

private def useStats = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING)

private lazy val limitPartitionLikeFiltersToClusteringColumns = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_CLUSTERING_COLUMNS_ONLY)
private lazy val additionalPartitionLikeFilterSupportedExpressions =
spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_ADDITIONAL_SUPPORTED_EXPRESSIONS)
.toSet.flatMap((exprs: String) => exprs.split(","))

/** Returns a DataFrame expression to obtain a list of files with parsed statistics. */
private def withStatsInternal0: DataFrame = {
allFiles.withColumn("stats", from_json(col("stats"), statsSchema))
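
For context, a minimal sketch (plain Scala, invented values) of the Option-to-Set parsing used for the allowlist conf in the hunk above: an unset conf yields an empty set, so the new fallback matches nothing by default.

```scala
// Mirrors the .toSet.flatMap((exprs: String) => exprs.split(",")) pattern above.
val confValue: Option[String] =
  Some("org.apache.spark.sql.catalyst.expressions.RegExpExtract," +
    "org.apache.spark.sql.catalyst.expressions.JsonToStructs")
val supported: Set[String] = confValue.toSet.flatMap((s: String) => s.split(","))
// supported now holds both class names; None would yield Set.empty[String].
```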
@@ -681,7 +688,8 @@ trait DataSkippingReaderBase

// Don't attempt partition-like skipping on any unknown expressions: there's no way to
// guarantee it's safe to do so.
- case _ => false
+ case _ => additionalPartitionLikeFilterSupportedExpressions.contains(
+   expr.getClass.getCanonicalName)
}

/**
@@ -700,6 +708,7 @@ trait DataSkippingReaderBase
* CAST(a AS DATE) = '2024-09-11' -> CAST(parsed_stats[minValues][a] AS DATE) = '2024-09-11'
*
* @param expr The expression to rewrite.
* @param clusteringColumnPaths The logical paths to the clustering columns in the table.
* @return If the expression is safe to rewrite, return the rewritten expression and a
* set of referenced attributes (with both the logical path to the column and the
* column type).
@@ -718,7 +727,8 @@ trait DataSkippingReaderBase
// to have the same min-max values).
case SkippingEligibleColumn(c, SkippingEligibleDataType(dt))
if dt != TimestampType && dt != TimestampNTZType &&
- clusteringColumnPaths.exists(SchemaUtils.areLogicalNamesEqual(_, c.reverse)) =>
+ (!limitPartitionLikeFiltersToClusteringColumns ||
+   clusteringColumnPaths.exists(SchemaUtils.areLogicalNamesEqual(_, c.reverse))) =>
// Only rewrite the expression if all stats are collected for this column.
val minStatsCol = StatsColumn(MIN, c, dt)
val maxStatsCol = StatsColumn(MAX, c, dt)
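
To illustrate the new fallback case in the hunk above, a hedged sketch: the literal arguments are invented, and `RegExpExtract` is constructed with Spark's three-argument subject/regexp/idx form.

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, RegExpExtract}

val allowlist = Set("org.apache.spark.sql.catalyst.expressions.RegExpExtract")
val expr = RegExpExtract(Literal("1971-03-02"), Literal("([0-9]{4}).*"), Literal(1))
// The `case _` fallback succeeds only when the expression's canonical class
// name appears in the configured allowlist:
allowlist.contains(expr.getClass.getCanonicalName) // true
```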
@@ -375,6 +375,57 @@ trait PartitionLikeDataSkippingSuiteBase
minNumFilesToApply = 1)
}
}

test("partition-like skipping can reference non-clustering columns via config") {
withSQLConf(
DeltaSQLConf.DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_CLUSTERING_COLUMNS_ONLY.key ->
"false") {
validateExpectedScanMetrics(
tableName = testTableName,
query = s"SELECT * FROM $testTableName WHERE CAST(e AS STRING) = '1'",
expectedNumFiles = 12,
expectedNumPartitionLikeDataFilters = 1,
allPredicatesUsed = true,
minNumFilesToApply = 1L)
}
}

test("partition-like skipping whitelist can be expanded via config") {
// Single additional supported expression.
withSQLConf(
DeltaSQLConf.DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_ADDITIONAL_SUPPORTED_EXPRESSIONS.key ->
"org.apache.spark.sql.catalyst.expressions.RegExpExtract") {
val query = s"SELECT * FROM $testTableName " +
"WHERE REGEXP_EXTRACT(s.b, '([0-9][0-9][0-9][0-9]).*') = '1971'"
validateExpectedScanMetrics(
tableName = testTableName,
query = query,
expectedNumFiles = 12,
expectedNumPartitionLikeDataFilters = 1,
allPredicatesUsed = true,
minNumFilesToApply = 1L)
}

// Multiple additional supported expressions.
DeltaLog.clearCache() // Clear cache to avoid stale config reads.
withSQLConf(
DeltaSQLConf.DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_ADDITIONAL_SUPPORTED_EXPRESSIONS.key ->
("org.apache.spark.sql.catalyst.expressions.RegExpExtract," +
"org.apache.spark.sql.catalyst.expressions.JsonToStructs")) {
val query = s"""
|SELECT * FROM $testTableName
|WHERE (REGEXP_EXTRACT(s.b, '([0-9][0-9][0-9][0-9]).*') = '1971' OR
|FROM_JSON(CONCAT('{"date":"', STRING(c), '"}'), 'date STRING')['date'] = '1972-03-02')
|""".stripMargin
validateExpectedScanMetrics(
tableName = testTableName,
query = query,
expectedNumFiles = 13,
expectedNumPartitionLikeDataFilters = 1,
allPredicatesUsed = true,
minNumFilesToApply = 1L)
}
}
}

class PartitionLikeDataSkippingSuite extends PartitionLikeDataSkippingSuiteBase