From 385827516f770941c04794abf49715705948a759 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 12 Feb 2025 15:48:20 +0800 Subject: [PATCH] [core] Fix that AggregateMergeFunction handles multiple sequence fields mistakenly --- .../aggregate/AggregateMergeFunction.java | 5 +++-- .../paimon/flink/BatchFileStoreITCase.java | 20 +++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java index ca380d2778e2..9af786ae6c2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java @@ -26,6 +26,7 @@ import org.apache.paimon.mergetree.compact.MergeFunctionFactory; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; @@ -149,8 +150,8 @@ public MergeFunction create(@Nullable int[][] projection) { private String getAggFuncName(String fieldName, List sequenceFields) { if (sequenceFields.contains(fieldName)) { - // no agg for sequence fields, use last_non_null_value to do cover - return FieldLastNonNullValueAggFactory.NAME; + // no agg for sequence fields, use last_value to do cover + return FieldLastValueAggFactory.NAME; } if (primaryKeys.contains(fieldName)) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 486bfcb69bb0..e92a8a30b4f8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -682,6 +682,26 @@ public void testIncrementTagQueryWithRescaleBucket() throws Exception { } } + @Test + public void testAggregationWithNullSequenceField() { + sql( + "CREATE TABLE test (" + + " pk INT PRIMARY KEY NOT ENFORCED," + + " v STRING," + + " s0 INT," + + " s1 INT" + + ") WITH (" + + " 'merge-engine' = 'aggregation'," + + " 'sequence.field' = 's0,s1')"); + + sql( + "INSERT INTO test VALUES (1, 'A1', CAST (NULL AS INT), 1), (1, 'A2', 1, CAST (NULL AS INT))"); + assertThat(sql("SELECT * FROM test")).containsExactly(Row.of(1, "A2", 1, null)); + + sql("INSERT INTO test VALUES (1, 'A3', 1, 0)"); + assertThat(sql("SELECT * FROM test")).containsExactly(Row.of(1, "A3", 1, 0)); + } + private void validateCount1PushDown(String sql) { Transformation transformation = AbstractTestBase.translate(tEnv, sql); while (!transformation.getInputs().isEmpty()) {