[spark] PaimonSplitScan supports column pruning and filter push down (#…
ulysses-you authored Sep 25, 2024
1 parent 1b09de9 commit 3e87195
Showing 13 changed files with 341 additions and 119 deletions.
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.types.RowType;

import java.util.List;
import java.util.Map;

/**
 * A table that holds some known data splits. For now, it is only used internally by the Spark engine.
*/
public class KnownSplitsTable implements ReadonlyTable {
private final InnerTable origin;
private final DataSplit[] splits;

KnownSplitsTable(InnerTable origin, DataSplit[] splits) {
this.origin = origin;
this.splits = splits;
}

public static KnownSplitsTable create(InnerTable origin, DataSplit[] splits) {
return new KnownSplitsTable(origin, splits);
}

public DataSplit[] splits() {
return splits;
}

@Override
public String name() {
return origin.name();
}

@Override
public RowType rowType() {
return origin.rowType();
}

@Override
public List<String> primaryKeys() {
return origin.primaryKeys();
}

@Override
public List<String> partitionKeys() {
return origin.partitionKeys();
}

@Override
public Map<String, String> options() {
return origin.options();
}

@Override
public InnerTableRead newRead() {
return origin.newRead();
}

// ===== unused methods ==========================================

@Override
public InnerTableScan newScan() {
throw new UnsupportedOperationException();
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
throw new UnsupportedOperationException();
}
}
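
KnownSplitsTable above delegates all metadata calls (name, rowType, primaryKeys, partitionKeys, options, newRead) to the origin table while pinning a fixed array of splits. A minimal Scala sketch of that wrapper idea, using stand-in types rather than the Paimon API (Table, OriginTable and KnownSplits here are illustrative only):

object KnownSplitsSketch {
  trait Table { def name: String; def planSplits(): Seq[String] }

  final case class OriginTable(name: String) extends Table {
    // A normal table would plan its splits from current snapshot/manifests.
    def planSplits(): Seq[String] = Seq("split-0", "split-1", "split-2")
  }

  // Keeps the origin table's metadata but pins an explicit set of splits,
  // so a later scan reads exactly those splits instead of planning new ones.
  final case class KnownSplits(origin: Table, splits: Seq[String]) extends Table {
    def name: String = origin.name
    def planSplits(): Seq[String] = splits
  }

  def main(args: Array[String]): Unit = {
    val pinned = KnownSplits(OriginTable("orders"), Seq("split-1"))
    println(pinned.name)         // orders
    println(pinned.planSplits()) // List(split-1)
  }
}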
@@ -29,6 +29,6 @@ case class PaimonScan(
     requiredSchema: StructType,
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
-    pushDownLimit: Option[Int],
+    override val pushDownLimit: Option[Int],
     disableBucketedScan: Boolean = false)
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
@@ -34,7 +34,7 @@ case class PaimonScan(
     requiredSchema: StructType,
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
-    pushDownLimit: Option[Int],
+    override val pushDownLimit: Option[Int],
     disableBucketedScan: Boolean = false)
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
   with SupportsRuntimeFiltering {
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark

import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.ReadBuilder
import org.apache.paimon.types.RowType

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType

trait ColumnPruningAndPushDown extends Scan {
def table: Table
def requiredSchema: StructType
def filters: Seq[Predicate]
def pushDownLimit: Option[Int] = None

val tableRowType: RowType = table.rowType
val tableSchema: StructType = SparkTypeUtils.fromPaimonRowType(tableRowType)

final def partitionType: StructType = {
SparkTypeUtils.toSparkPartitionType(table)
}

private[paimon] val (requiredTableFields, metadataFields) = {
val nameToField = tableSchema.map(field => (field.name, field)).toMap
val _tableFields = requiredSchema.flatMap(field => nameToField.get(field.name))
val _metadataFields =
requiredSchema
.filterNot(field => tableSchema.fieldNames.contains(field.name))
.filter(field => PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.contains(field.name))
(_tableFields, _metadataFields)
}

lazy val readBuilder: ReadBuilder = {
val _readBuilder = table.newReadBuilder()
val projection =
requiredTableFields.map(field => tableSchema.fieldNames.indexOf(field.name)).toArray
_readBuilder.withProjection(projection)
if (filters.nonEmpty) {
val pushedPredicate = PredicateBuilder.and(filters: _*)
_readBuilder.withFilter(pushedPredicate)
}
pushDownLimit.foreach(_readBuilder.withLimit)
_readBuilder
}

final def metadataColumns: Seq[PaimonMetadataColumn] = {
metadataFields.map(field => PaimonMetadataColumn.get(field.name, partitionType))
}

override def readSchema(): StructType = {
StructType(requiredTableFields ++ metadataFields)
}
}
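
The core of the column pruning in this trait is the projection computed in readBuilder: required field names are translated into positions within the full table schema before being handed to the reader, so unneeded columns are never read. A minimal, self-contained sketch of that mapping, with a hypothetical four-column table (not Paimon code):

object ProjectionSketch {
  // Map required field names to their positions in the full table schema;
  // only those positions are handed to the reader.
  def projection(tableFieldNames: Seq[String], requiredFieldNames: Seq[String]): Array[Int] =
    requiredFieldNames.filter(tableFieldNames.contains).map(tableFieldNames.indexOf).toArray

  def main(args: Array[String]): Unit = {
    // Hypothetical table with four columns; the scan only needs two of them.
    val tableFields = Seq("id", "name", "dt", "price")
    println(projection(tableFields, Seq("price", "id")).mkString(",")) // prints 3,0
  }
}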
@@ -20,14 +20,12 @@ package org.apache.paimon.spark

 import org.apache.paimon.{stats, CoreOptions}
 import org.apache.paimon.annotation.VisibleForTesting
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.spark.metric.SparkMetricRegistry
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.sources.PaimonMicroBatchStream
 import org.apache.paimon.spark.statistics.StatisticsHelper
 import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
-import org.apache.paimon.table.source.{InnerTableScan, ReadBuilder, Split}
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.source.{InnerTableScan, Split}

 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
@@ -48,22 +46,9 @@ abstract class PaimonBaseScan(
   extends Scan
   with SupportsReportStatistics
   with ScanHelper
+  with ColumnPruningAndPushDown
   with StatisticsHelper {

-  val tableRowType: RowType = table.rowType
-
-  private lazy val tableSchema = SparkTypeUtils.fromPaimonRowType(tableRowType)
-
-  private[paimon] val (requiredTableFields, metadataFields) = {
-    val nameToField = tableSchema.map(field => (field.name, field)).toMap
-    val _tableFields = requiredSchema.flatMap(field => nameToField.get(field.name))
-    val _metadataFields =
-      requiredSchema
-        .filterNot(field => tableSchema.fieldNames.contains(field.name))
-        .filter(field => PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.contains(field.name))
-    (_tableFields, _metadataFields)
-  }
-
   protected var runtimeFilters: Array[Filter] = Array.empty

   protected var inputPartitions: Seq[PaimonInputPartition] = _
@@ -79,21 +64,6 @@
     StructType(tableSchema.filter(field => fieldNames.contains(field.name)))
   }

-  lazy val readBuilder: ReadBuilder = {
-    val _readBuilder = table.newReadBuilder()
-
-    val projection =
-      requiredTableFields.map(field => tableSchema.fieldNames.indexOf(field.name)).toArray
-    _readBuilder.withProjection(projection)
-    if (filters.nonEmpty) {
-      val pushedPredicate = PredicateBuilder.and(filters: _*)
-      _readBuilder.withFilter(pushedPredicate)
-    }
-    pushDownLimit.foreach(_readBuilder.withLimit)
-
-    _readBuilder
-  }
-
   @VisibleForTesting
   def getOriginSplits: Array[Split] = {
     readBuilder
@@ -113,17 +83,7 @@
     inputPartitions
   }

-  final def partitionType: StructType = {
-    SparkTypeUtils.toSparkPartitionType(table)
-  }
-
-  override def readSchema(): StructType = {
-    StructType(requiredTableFields ++ metadataFields)
-  }
-
   override def toBatch: Batch = {
-    val metadataColumns =
-      metadataFields.map(field => PaimonMetadataColumn.get(field.name, partitionType))
     PaimonBatch(lazyInputPartitions, readBuilder, metadataColumns)
   }

@@ -36,7 +36,7 @@ case class PaimonScan(
     requiredSchema: StructType,
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
-    pushDownLimit: Option[Int],
+    override val pushDownLimit: Option[Int],
     bucketedScanDisabled: Boolean = false)
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
   with SupportsRuntimeFiltering
@@ -19,35 +19,50 @@
 package org.apache.paimon.spark

 import org.apache.paimon.CoreOptions
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.Table
+import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.table.{KnownSplitsTable, Table}
 import org.apache.paimon.table.source.{DataSplit, Split}

 import org.apache.spark.sql.connector.read.{Batch, Scan}
 import org.apache.spark.sql.types.StructType

+class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonBaseScanBuilder(table) {
+  override def build(): Scan = {
+    PaimonSplitScan(table, table.splits(), requiredSchema, pushed.map(_._2))
+  }
+}
+
 /** For internal use only. */
 case class PaimonSplitScan(
     table: Table,
     dataSplits: Array[DataSplit],
-    metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
-  extends Scan
+    requiredSchema: StructType,
+    filters: Seq[Predicate])
+  extends ColumnPruningAndPushDown
   with ScanHelper {

   override val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())

-  override def readSchema(): StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
-
   override def toBatch: Batch = {
     PaimonBatch(
       getInputPartitions(dataSplits.asInstanceOf[Array[Split]]),
-      table.newReadBuilder,
+      readBuilder,
       metadataColumns)
   }
+
+  override def description(): String = {
+    val pushedFiltersStr = if (filters.nonEmpty) {
+      ", PushedFilters: [" + filters.mkString(",") + "]"
+    } else {
+      ""
+    }
+    s"PaimonSplitScan: [${table.name}]" + pushedFiltersStr
+  }
 }

 object PaimonSplitScan {
   def apply(table: Table, dataSplits: Array[DataSplit]): PaimonSplitScan = {
-    new PaimonSplitScan(table, dataSplits)
+    val requiredSchema = SparkTypeUtils.fromPaimonRowType(table.rowType)
+    new PaimonSplitScan(table, dataSplits, requiredSchema, Seq.empty)
   }
 }
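
Filter push down in this scan works by folding all pushed predicates into a single conjunction (PredicateBuilder.and in the ColumnPruningAndPushDown trait) that the reader evaluates per record, so rows are dropped before they reach Spark. A small stand-alone sketch of that folding, using plain Scala functions in place of Paimon Predicate objects:

object PushDownSketch {
  type Pred[A] = A => Boolean

  // Combine all pushed filters into one conjunction, mirroring
  // PredicateBuilder.and(filters: _*), and apply it per row.
  def and[A](preds: Seq[Pred[A]]): Pred[A] = row => preds.forall(_(row))

  final case class Row(id: Int, price: Double)

  def main(args: Array[String]): Unit = {
    val pushed: Seq[Pred[Row]] = Seq(_.id > 10, _.price < 100.0)
    val rows = Seq(Row(5, 50.0), Row(20, 50.0), Row(30, 500.0))
    println(rows.filter(and(pushed))) // List(Row(20,50.0))
  }
}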
@@ -21,7 +21,7 @@ package org.apache.paimon.spark
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
+import org.apache.paimon.table.{DataTable, FileStoreTable, KnownSplitsTable, Table}
 import org.apache.paimon.utils.StringUtils

 import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, TableCatalog}
@@ -91,7 +91,12 @@ case class SparkTable(table: Table)
   }

   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-    new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap))
+    table match {
+      case t: KnownSplitsTable =>
+        new PaimonSplitScanBuilder(t)
+      case _ =>
+        new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap))
+    }
   }

   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, BasePredicate, EqualTo, Expression, Literal, Or, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, BasePredicate, Expression, Literal, UnsafeProjection}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.plans.logical._