Skip to content

Commit

Permalink
[core] Check that all fields with aggregate functions in partial-upda…
Browse files Browse the repository at this point in the history
…te should be protected by sequence-group
  • Loading branch information
yuzelin committed Feb 7, 2025
1 parent 50d2316 commit 9a4cdc5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,9 @@ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
String aggFuncName = getAggFuncName(options, fieldName);
if (aggFuncName != null) {
checkArgument(
!fieldSeqComparators.isEmpty(),
"Must use sequence group for aggregation functions.");
fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)),
"Must use sequence group for aggregation functions but not found for field %s.",
fieldName);
fieldAggregators.put(
i,
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
Expand All @@ -31,6 +32,7 @@
import org.junit.jupiter.api.Test;

import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -820,14 +822,39 @@ public void testMultiSequenceFieldsPartialUpdateWithAggregationProjectPushDown()

@Test
public void testAggregationWithoutSequenceGroup() {
Options options = new Options();
options.set("fields.f1.aggregate-function", "listagg");
RowType rowType = RowType.of(DataTypes.INT(), DataTypes.INT());
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT()
},
new String[] {"pk", "f0", "g0", "f1", "g1"});

Options options1 = new Options();
options1.set("fields.f0.aggregate-function", "listagg");
options1.set("fields.f1.aggregate-function", "listagg");
assertThatThrownBy(
() ->
PartialUpdateMergeFunction.factory(
options, rowType, ImmutableList.of("f0")))
.hasMessageContaining("Must use sequence group for aggregation functions");
options1, rowType, ImmutableList.of("pk")))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Must use sequence group for aggregation functions but not found for field f0."));

Options options2 = new Options(options1.toMap());
options2.set("fields.g0.sequence-group", "f0");
assertThatThrownBy(
() ->
PartialUpdateMergeFunction.factory(
options2, rowType, ImmutableList.of("pk")))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Must use sequence group for aggregation functions but not found for field f1."));
}

private void add(MergeFunction<KeyValue> function, Integer... f) {
Expand Down

0 comments on commit 9a4cdc5

Please sign in to comment.