From dbd129d3c7c6a7c39210ab47efc8c1275c7d7ce6 Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Fri, 17 Jan 2025 10:59:37 +0800 Subject: [PATCH] [spark] Introduce SparkV2FilterConverter (#4915) --- .../sql/SparkV2FilterConverterTest.scala | 21 ++ .../sql/SparkV2FilterConverterTest.scala | 21 ++ .../sql/SparkV2FilterConverterTest.scala | 21 ++ .../sql/SparkV2FilterConverterTest.scala | 21 ++ .../paimon/spark/SparkFilterConverter.java | 4 +- .../paimon/spark/SparkV2FilterConverter.scala | 236 ++++++++++++++++++ .../org/apache/spark/sql/PaimonUtils.scala | 16 +- .../paimon/spark/PaimonSparkTestBase.scala | 1 + .../sql/SparkV2FilterConverterTestBase.scala | 224 +++++++++++++++++ 9 files changed, 560 insertions(+), 5 deletions(-) create mode 100644 paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala create mode 100644 paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala create mode 100644 paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala create mode 100644 paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala new file mode 100644 index 000000000000..21c4c8a495ed --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala new file mode 100644 index 000000000000..21c4c8a495ed --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala new file mode 100644 index 000000000000..21c4c8a495ed --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {} diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala new file mode 100644 index 000000000000..21c4c8a495ed --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java index 6b9375e5563c..2050c937c6a3 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java @@ -105,10 +105,10 @@ public Predicate convert(Filter filter) { return builder.equal(index, literal); } else if (filter instanceof EqualNullSafe) { EqualNullSafe eq = (EqualNullSafe) filter; + int index = fieldIndex(eq.attribute()); if (eq.value() == null) { - return builder.isNull(fieldIndex(eq.attribute())); + return builder.isNull(index); } else { - int index = fieldIndex(eq.attribute()); Object literal = convertLiteral(index, eq.value()); return builder.equal(index, literal); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala new file mode 100644 index 000000000000..11ef302672e1 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark + +import org.apache.paimon.data.{BinaryString, Decimal, Timestamp} +import org.apache.paimon.predicate.{Predicate, PredicateBuilder} +import org.apache.paimon.types.{DataTypeRoot, DecimalType, RowType} +import org.apache.paimon.types.DataTypeRoot._ + +import org.apache.spark.sql.connector.expressions.{Literal, NamedReference} +import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate} + +import scala.collection.JavaConverters._ + +/** Conversion from [[SparkPredicate]] to [[Predicate]]. 
*/ +case class SparkV2FilterConverter(rowType: RowType) { + + import org.apache.paimon.spark.SparkV2FilterConverter._ + + val builder = new PredicateBuilder(rowType) + + def convert(sparkPredicate: SparkPredicate): Predicate = { + sparkPredicate.name() match { + case EQUAL_TO => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + // TODO deal with isNaN + val index = fieldIndex(fieldName) + builder.equal(index, convertLiteral(index, literal)) + } + + case EQUAL_NULL_SAFE => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + if (literal == null) { + builder.isNull(index) + } else { + builder.equal(index, convertLiteral(index, literal)) + } + } + + case GREATER_THAN => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + builder.greaterThan(index, convertLiteral(index, literal)) + } + + case GREATER_THAN_OR_EQUAL => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + builder.greaterOrEqual(index, convertLiteral(index, literal)) + } + + case LESS_THAN => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + builder.lessThan(index, convertLiteral(index, literal)) + } + + case LESS_THAN_OR_EQUAL => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + builder.lessOrEqual(index, convertLiteral(index, literal)) + } + + case IN => + MultiPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literals)) => + val index = fieldIndex(fieldName) + literals.map(convertLiteral(index, _)).toList.asJava + builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava) + } + + case IS_NULL => + UnaryPredicate.unapply(sparkPredicate) match { + case Some(fieldName) => + builder.isNull(fieldIndex(fieldName)) + } + + case IS_NOT_NULL => + UnaryPredicate.unapply(sparkPredicate) match { + case Some(fieldName) => + builder.isNotNull(fieldIndex(fieldName)) + } + + case AND => + val and = sparkPredicate.asInstanceOf[And] + PredicateBuilder.and(convert(and.left), convert(and.right())) + + case OR => + val or = sparkPredicate.asInstanceOf[Or] + PredicateBuilder.or(convert(or.left), convert(or.right())) + + case NOT => + val not = sparkPredicate.asInstanceOf[Not] + val negate = convert(not.child()).negate() + if (negate.isPresent) { + negate.get() + } else { + throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.") + } + + case STRING_START_WITH => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + builder.startsWith(index, convertLiteral(index, literal)) + } + + case STRING_END_WITH => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + builder.endsWith(index, convertLiteral(index, literal)) + } + + case STRING_CONTAINS => + BinaryPredicate.unapply(sparkPredicate) match { + case Some((fieldName, literal)) => + val index = fieldIndex(fieldName) + builder.contains(index, convertLiteral(index, literal)) + } + + // TODO: AlwaysTrue, AlwaysFalse + case _ => throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.") + } + } + + private object UnaryPredicate { + def unapply(sparkPredicate: SparkPredicate): 
Option[String] = { + sparkPredicate.children() match { + case Array(n: NamedReference) => Some(toFieldName(n)) + case _ => None + } + } + } + + private object BinaryPredicate { + def unapply(sparkPredicate: SparkPredicate): Option[(String, Any)] = { + sparkPredicate.children() match { + case Array(l: NamedReference, r: Literal[_]) => Some((toFieldName(l), r.value)) + case Array(l: Literal[_], r: NamedReference) => Some((toFieldName(r), l.value)) + case _ => None + } + } + } + + private object MultiPredicate { + def unapply(sparkPredicate: SparkPredicate): Option[(String, Array[Any])] = { + sparkPredicate.children() match { + case Array(first: NamedReference, rest @ _*) + if rest.nonEmpty && rest.forall(_.isInstanceOf[Literal[_]]) => + Some(toFieldName(first), rest.map(_.asInstanceOf[Literal[_]].value).toArray) + case _ => None + } + } + } + + private def fieldIndex(fieldName: String): Int = { + val index = rowType.getFieldIndex(fieldName) + // TODO: support nested field + if (index == -1) { + throw new UnsupportedOperationException(s"Nested field '$fieldName' is unsupported.") + } + index + } + + private def convertLiteral(index: Int, value: Any): AnyRef = { + if (value == null) { + return null + } + + val dataType = rowType.getTypeAt(index) + dataType.getTypeRoot match { + case BOOLEAN | BIGINT | DOUBLE | TINYINT | SMALLINT | INTEGER | FLOAT | DATE => + value.asInstanceOf[AnyRef] + case DataTypeRoot.VARCHAR => + BinaryString.fromString(value.toString) + case DataTypeRoot.DECIMAL => + val decimalType = dataType.asInstanceOf[DecimalType] + val precision = decimalType.getPrecision + val scale = decimalType.getScale + Decimal.fromBigDecimal( + value.asInstanceOf[org.apache.spark.sql.types.Decimal].toJavaBigDecimal, + precision, + scale) + case DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE | DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE => + Timestamp.fromMicros(value.asInstanceOf[Long]) + case _ => + throw new UnsupportedOperationException( + s"Convert value: $value to datatype: $dataType is unsupported.") + } + } + + private def toFieldName(ref: NamedReference): String = ref.fieldNames().mkString(".") +} + +object SparkV2FilterConverter { + + private val EQUAL_TO = "=" + private val EQUAL_NULL_SAFE = "<=>" + private val GREATER_THAN = ">" + private val GREATER_THAN_OR_EQUAL = ">=" + private val LESS_THAN = "<" + private val LESS_THAN_OR_EQUAL = "<=" + private val IN = "IN" + private val IS_NULL = "IS_NULL" + private val IS_NOT_NULL = "IS_NOT_NULL" + private val AND = "AND" + private val OR = "OR" + private val NOT = "NOT" + private val STRING_START_WITH = "STARTS_WITH" + private val STRING_END_WITH = "ENDS_WITH" + private val STRING_CONTAINS = "CONTAINS" +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala index cc49e787dc81..d01a840f8ece 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala @@ -24,8 +24,10 @@ import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} +import org.apache.spark.sql.connector.expressions.FieldReference 
+import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.PartitioningUtils @@ -68,8 +70,16 @@ object PaimonUtils { DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown) } - def fieldReference(name: String): NamedReference = { - FieldReference(Seq(name)) + def translateFilterV2(predicate: Expression): Option[Predicate] = { + translateFilterV2WithMapping(predicate, None) + } + + def fieldReference(name: String): FieldReference = { + fieldReference(Seq(name)) + } + + def fieldReference(parts: Seq[String]): FieldReference = { + FieldReference(parts) } def bytesToString(size: Long): String = { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index 9a1647da8140..47f0c5a7d304 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -81,6 +81,7 @@ class PaimonSparkTestBase super.beforeAll() spark.sql(s"USE paimon") spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$dbName0") + spark.sql(s"USE paimon.$dbName0") } override protected def afterAll(): Unit = { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala new file mode 100644 index 000000000000..b9cbc29b3aa3 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.data.{BinaryString, Decimal, Timestamp} +import org.apache.paimon.predicate.PredicateBuilder +import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter} +import org.apache.paimon.types.RowType + +import org.apache.spark.SparkConf +import org.apache.spark.sql.PaimonUtils.translateFilterV2 +import org.apache.spark.sql.catalyst.plans.logical.Filter +import org.apache.spark.sql.connector.expressions.filter.Predicate + +import java.time.{LocalDate, LocalDateTime} + +import scala.collection.JavaConverters._ + +/** Test for [[SparkV2FilterConverter]]. 
*/ +abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase { + + // Add it to disable the automatic generation of the not null filter which impact test. + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.constraintPropagation.enabled", "false") + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + sql(""" + |CREATE TABLE test_tbl ( + | string_col STRING, + | byte_col BYTE, + | short_col SHORT, + | int_col INT, + | long_col LONG, + | float_col FLOAT, + | double_col DOUBLE, + | decimal_col DECIMAL(10, 5), + | boolean_col BOOLEAN, + | date_col DATE, + | binary BINARY + |) USING paimon + |""".stripMargin) + } + + override protected def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS test_tbl") + super.afterAll() + } + + lazy val rowType: RowType = loadTable("test_tbl").rowType() + + lazy val builder = new PredicateBuilder(rowType) + + lazy val converter: SparkV2FilterConverter = SparkV2FilterConverter(rowType) + + test("V2Filter: all types") { + var actual = converter.convert(v2Filter("string_col = 'hello'")) + assert(actual.equals(builder.equal(0, BinaryString.fromString("hello")))) + + actual = converter.convert(v2Filter("byte_col = 1")) + assert(actual.equals(builder.equal(1, 1.toByte))) + + actual = converter.convert(v2Filter("short_col = 1")) + assert(actual.equals(builder.equal(2, 1.toShort))) + + actual = converter.convert(v2Filter("int_col = 1")) + assert(actual.equals(builder.equal(3, 1))) + + actual = converter.convert(v2Filter("long_col = 1")) + assert(actual.equals(builder.equal(4, 1L))) + + actual = converter.convert(v2Filter("float_col = 1.0")) + assert(actual.equals(builder.equal(5, 1.0f))) + + actual = converter.convert(v2Filter("double_col = 1.0")) + assert(actual.equals(builder.equal(6, 1.0d))) + + actual = converter.convert(v2Filter("decimal_col = 12.12345")) + assert( + actual.equals( + builder.equal(7, Decimal.fromBigDecimal(new java.math.BigDecimal("12.12345"), 10, 5)))) + + actual = converter.convert(v2Filter("boolean_col = true")) + assert(actual.equals(builder.equal(8, true))) + + actual = converter.convert(v2Filter("date_col = cast('2025-01-15' as date)")) + val localDate = LocalDate.parse("2025-01-15") + val epochDay = localDate.toEpochDay.toInt + assert(actual.equals(builder.equal(9, epochDay))) + + intercept[UnsupportedOperationException] { + actual = converter.convert(v2Filter("binary = binary('b1')")) + } + } + + test("V2Filter: timestamp and timestamp_ntz") { + withTimeZone("Asia/Shanghai") { + withTable("ts_tbl", "ts_ntz_tbl") { + sql("CREATE TABLE ts_tbl (ts_col TIMESTAMP) USING paimon") + val rowType1 = loadTable("ts_tbl").rowType() + val converter1 = SparkV2FilterConverter(rowType1) + val actual1 = + converter1.convert(v2Filter("ts_col = timestamp'2025-01-15 00:00:00.123'", "ts_tbl")) + assert( + actual1.equals(new PredicateBuilder(rowType1) + .equal(0, Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-14T16:00:00.123"))))) + + // Spark support TIMESTAMP_NTZ since Spark 3.4 + if (gteqSpark3_4) { + sql("CREATE TABLE ts_ntz_tbl (ts_ntz_col TIMESTAMP_NTZ) USING paimon") + val rowType2 = loadTable("ts_ntz_tbl").rowType() + val converter2 = SparkV2FilterConverter(rowType2) + val actual2 = converter2.convert( + v2Filter("ts_ntz_col = timestamp_ntz'2025-01-15 00:00:00.123'", "ts_ntz_tbl")) + assert(actual2.equals(new PredicateBuilder(rowType2) + .equal(0, Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-15T00:00:00.123"))))) + } + } + } + } + + test("V2Filter: EqualTo") { + val 
actual = converter.convert(v2Filter("int_col = 1")) + assert(actual.equals(builder.equal(3, 1))) + } + + test("V2Filter: EqualNullSafe") { + var actual = converter.convert(v2Filter("int_col <=> 1")) + assert(actual.equals(builder.equal(3, 1))) + + actual = converter.convert(v2Filter("int_col <=> null")) + assert(actual.equals(builder.isNull(3))) + } + + test("V2Filter: GreaterThan") { + val actual = converter.convert(v2Filter("int_col > 1")) + assert(actual.equals(builder.greaterThan(3, 1))) + } + + test("V2Filter: GreaterThanOrEqual") { + val actual = converter.convert(v2Filter("int_col >= 1")) + assert(actual.equals(builder.greaterOrEqual(3, 1))) + } + + test("V2Filter: LessThan") { + val actual = converter.convert(v2Filter("int_col < 1")) + assert(actual.equals(builder.lessThan(3, 1))) + } + + test("V2Filter: LessThanOrEqual") { + val actual = converter.convert(v2Filter("int_col <= 1")) + assert(actual.equals(builder.lessOrEqual(3, 1))) + } + + test("V2Filter: In") { + val actual = converter.convert(v2Filter("int_col IN (1, 2, 3)")) + assert(actual.equals(builder.in(3, List(1, 2, 3).map(_.asInstanceOf[AnyRef]).asJava))) + } + + test("V2Filter: IsNull") { + val actual = converter.convert(v2Filter("int_col IS NULL")) + assert(actual.equals(builder.isNull(3))) + } + + test("V2Filter: IsNotNull") { + val actual = converter.convert(v2Filter("int_col IS NOT NULL")) + assert(actual.equals(builder.isNotNull(3))) + } + + test("V2Filter: And") { + val actual = converter.convert(v2Filter("int_col > 1 AND int_col < 10")) + assert(actual.equals(PredicateBuilder.and(builder.greaterThan(3, 1), builder.lessThan(3, 10)))) + } + + test("V2Filter: Or") { + val actual = converter.convert(v2Filter("int_col > 1 OR int_col < 10")) + assert(actual.equals(PredicateBuilder.or(builder.greaterThan(3, 1), builder.lessThan(3, 10)))) + } + + test("V2Filter: Not") { + val actual = converter.convert(v2Filter("NOT (int_col > 1)")) + assert(actual.equals(builder.greaterThan(3, 1).negate().get())) + } + + test("V2Filter: StartWith") { + val actual = converter.convert(v2Filter("string_col LIKE 'h%'")) + assert(actual.equals(builder.startsWith(0, BinaryString.fromString("h")))) + } + + test("V2Filter: EndWith") { + val actual = converter.convert(v2Filter("string_col LIKE '%o'")) + assert(actual.equals(builder.endsWith(0, BinaryString.fromString("o")))) + } + + test("V2Filter: Contains") { + val actual = converter.convert(v2Filter("string_col LIKE '%e%'")) + assert(actual.equals(builder.contains(0, BinaryString.fromString("e")))) + } + + private def v2Filter(str: String, tableName: String = "test_tbl"): Predicate = { + val condition = sql(s"SELECT * FROM $tableName WHERE $str").queryExecution.optimizedPlan + .collectFirst { case f: Filter => f } + .get + .condition + translateFilterV2(condition).get + } +}
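
For context beyond the patch itself, a minimal standalone sketch of how the new converter is driven (this is not part of the change): the RowType and the hand-built Spark V2 predicate below are illustrative only, and inside the connector the predicate would normally come from PaimonUtils.translateFilterV2 applied to the optimized plan's Filter condition, which is how SparkV2FilterConverterTestBase obtains it. RowType.builder() and DataTypes are assumed to follow current Paimon APIs; treat the snippet as a sketch, not as code from this PR.

import org.apache.paimon.predicate.{Predicate => PaimonPredicate}
import org.apache.paimon.spark.SparkV2FilterConverter
import org.apache.paimon.types.{DataTypes, RowType}

import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
import org.apache.spark.sql.types.IntegerType

object SparkV2FilterConverterSketch {

  def main(args: Array[String]): Unit = {
    // Illustrative schema; any Paimon RowType whose field names match the
    // references in the pushed-down predicate works the same way.
    val rowType: RowType = RowType.builder()
      .field("int_col", DataTypes.INT())
      .field("string_col", DataTypes.STRING())
      .build()

    // A Spark V2 predicate equivalent to `int_col > 1`, built by hand here.
    // In the connector it would come from translateFilterV2 on a pushed-down
    // Catalyst expression, as SparkV2FilterConverterTestBase does.
    val children: Array[Expression] =
      Array(FieldReference(Seq("int_col")), LiteralValue(1, IntegerType))
    val sparkPredicate = new SparkPredicate(">", children)

    // Convert to a Paimon predicate that can be pushed into a table scan;
    // GREATER_THAN maps to PredicateBuilder.greaterThan on field index 0.
    val paimonPredicate: PaimonPredicate =
      SparkV2FilterConverter(rowType).convert(sparkPredicate)
    println(paimonPredicate)
  }
}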