
Commit 684102e
[Spark] Allow data with missing columns to be written when 'merge-schema' is enabled (apache#5059)
YannByron authored and 李鹏程 committed Feb 14, 2025
1 parent fb9f6cf commit 684102e
Showing 3 changed files with 69 additions and 3 deletions.
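In short: with write.merge-schema enabled, a DataFrame that omits some of the table's columns can still be appended; the absent columns are filled with null. A minimal usage sketch, mirroring the test added below (the table location and data are illustrative):

// Table T has schema (a INT, b STRING, c DOUBLE, d TIMESTAMP) after earlier
// merge-schema writes; this frame omits columns b and d entirely.
val df = Seq((4, 45.6d), (5, 56.7d)).toDF("a", "c")

df.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true") // without this option the write fails
  .save(location) // `location` is the table's filesystem path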
WriteIntoPaimonTable.scala
@@ -29,17 +29,18 @@ import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.functions.{col, lit}

import scala.collection.JavaConverters._

/** Used to write a [[DataFrame]] into a paimon table. */
case class WriteIntoPaimonTable(
override val originTable: FileStoreTable,
saveMode: SaveMode,
data: DataFrame,
_data: DataFrame,
options: Options)
extends RunnableCommand
with PaimonCommand
@@ -49,10 +50,25 @@ case class WriteIntoPaimonTable(
private lazy val mergeSchema = options.get(SparkConnectorOptions.MERGE_SCHEMA)

override def run(sparkSession: SparkSession): Seq[Row] = {
var data = _data
if (mergeSchema) {
val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)
val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
mergeAndCommitSchema(dataSchema, allowExplicitCast)

// For the case where some columns are absent in the data, we still allow the write once write.merge-schema is true.
val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
val resolve = sparkSession.sessionState.conf.resolver
val cols = newTableSchema.map {
field =>
dataSchema.find(f => resolve(f.name, field.name)) match {
case Some(f) => col(f.name)
case _ => lit(null).as(field.name)
}
}
data = data.select(cols: _*)
}
}

val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
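The new block aligns the incoming frame to the merged table schema: it selects each column the frame already has and substitutes a null literal for each absent one. A standalone sketch of the same technique (the helper name is hypothetical; the commit uses the session's resolver, approximated here with a case-insensitive comparison, and the cast is added so the sketch is self-contained, whereas the commit leaves the null untyped for the write path to cast):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: pad a DataFrame out to `target`, keeping columns the
// frame already has and filling the rest with typed nulls.
def alignToSchema(df: DataFrame, target: StructType): DataFrame = {
  val resolve = (a: String, b: String) => a.equalsIgnoreCase(b)
  val cols = target.map {
    field =>
      df.schema.find(f => resolve(f.name, field.name)) match {
        case Some(f) => col(f.name)
        case _ => lit(null).cast(field.dataType).as(field.name)
      }
  }
  df.select(cols: _*)
}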
PaimonUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
import org.apache.spark.sql.internal.connector.PredicateUtils
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.PartitioningUtils
import org.apache.spark.util.{Utils => SparkUtils}

@@ -121,4 +121,8 @@ object PaimonUtils {
partitionColumnNames: Seq[String]): Unit = {
PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames)
}

def sameType(left: DataType, right: DataType): Boolean = {
left.sameType(right)
}
}
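This wrapper exists because DataType.sameType is package-private to Spark; placing the helper in org.apache.spark.sql lets Paimon code outside that package call it. Assuming standard Spark semantics, sameType compares two types while ignoring nullability, which is exactly what the null-padded columns need:

import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// sameType ignores nullability, so a schema whose column became nullable
// (e.g. after null padding) still matches the table's declared type.
val left = StructType(Seq(StructField("c", DoubleType, nullable = true)))
val right = StructType(Seq(StructField("c", DoubleType, nullable = false)))
assert(PaimonUtils.sameType(left, right)) // true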
DataFrameWriteTest.scala
@@ -506,4 +506,50 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
}
}
}

test("Paimon Schema Evolution: some columns is absent in the coming data") {

spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
|""".stripMargin)

val paimonTable = loadTable("T")
val location = paimonTable.location().toString

val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b")
df1.write.format("paimon").mode("append").save(location)
checkAnswer(
spark.sql("SELECT * FROM T ORDER BY a, b"),
Row(1, "2023-08-01") :: Row(2, "2023-08-02") :: Nil)

// Case 1: two additional fields: DoubleType and TimestampType
val ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")
val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts))
.toDF("a", "b", "c", "d")
df2.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)

// Case 2: columns b and d are absent in the coming data
val df3 = Seq((4, 45.6d), (5, 56.7d))
.toDF("a", "c")
df3.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)
val expected3 = Seq(
  Row(1, "2023-08-01", null, null),
  Row(1, "2023-08-01", 12.3d, ts),
  Row(2, "2023-08-02", null, null),
  Row(3, "2023-08-03", 34.5d, ts),
  Row(4, null, 45.6d, null),
  Row(5, null, 56.7d, null))
checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3)
}
}
