Skip to content

Commit

Permalink
[core] Introduce nested-update agg to support aggregate nested table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Dec 26, 2023
1 parent accd4da commit 53b8534
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 7 deletions.
69 changes: 69 additions & 0 deletions docs/content/concepts/primary-key-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,75 @@ Current supported aggregate functions and data types are:
The first_not_null_value function selects the first non-null value in a data set.
It supports all data types.

* `nested-update`:
The nested-update function collects multiple rows of a primary-key table into one single row
(a so-called 'nested table'). Use `fields.<field-name>.nested-keys=pk0;pk1;...` to specify the
primary keys of the nested table. It supports the `ARRAY<ROW>` data type.

An example:

{{< tabs "nested-update-example" >}}

{{< tab "Flink" >}}

```sql
-- orders table
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY NOT ENFORCED,
user_name STRING,
address STRING
);

-- sub orders that have the same order_id
-- belong to the same order
CREATE TABLE sub_orders (
order_id BIGINT,
sub_order_id INT,
product_name STRING,
price BIGINT,
PRIMARY KEY (order_id, sub_order_id) NOT ENFORCED
);

-- wide table
CREATE TABLE order_wide (
order_id BIGINT PRIMARY KEY NOT ENFORCED,
user_name STRING,
address STRING,
sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>
) WITH (
'merge-engine' = 'aggregation',
'fields.sub_orders.aggregate-function' = 'nested-update',
'fields.sub_orders.nested-keys' = 'sub_order_id'
);

-- widen
INSERT INTO order_wide

SELECT
order_id,
user_name,
address,
CAST (NULL AS ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>)
FROM orders

UNION ALL

SELECT
order_id,
CAST (NULL AS STRING),
CAST (NULL AS STRING),
ARRAY[ROW(sub_order_id, product_name, price)]
FROM sub_orders;

-- query using UNNEST
SELECT order_id, user_name, address, sub_order_id, product_name, price
FROM order_wide, UNNEST(sub_orders) AS so(sub_order_id, product_name, price)
```

{{< /tab >}}

{{< /tabs >}}

Only `sum` and `product` support retraction (`UPDATE_BEFORE` and `DELETE`); other aggregate functions do not support retraction.
If you want some functions to ignore retraction messages, you can configure:
`'fields.${field_name}.ignore-retract'='true'`.
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class CoreOptions implements Serializable {

/** Suffix of the per-field option key, e.g. {@code fields.<field-name>.ignore-retract}. */
public static final String IGNORE_RETRACT = "ignore-retract";

/**
 * Suffix of the per-field option key {@code fields.<field-name>.nested-keys}, which names the
 * key fields of a nested table aggregated with {@code nested-update}.
 */
public static final String NESTED_KEYS = "nested-keys";

public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
Expand Down Expand Up @@ -1093,6 +1095,14 @@ public boolean fieldAggIgnoreRetract(String fieldName) {
.defaultValue(false));
}

/**
 * Looks up the nested keys configured for a field that uses the {@code nested-update}
 * aggregate function.
 *
 * <p>The option key is {@code FIELDS_PREFIX + "." + fieldName + "." + NESTED_KEYS}; its value
 * is parsed as a list of strings and the option declares no default value.
 *
 * @param fieldName name of the aggregated field
 * @return the configured nested keys; callers should be prepared for {@code null} when the
 *     option is not set (NOTE(review): inferred from the null checks at the call sites)
 */
public List<String> fieldNestedUpdateAggNestedKeys(String fieldName) {
    String optionKey = FIELDS_PREFIX + "." + fieldName + "." + NESTED_KEYS;
    ConfigOption<List<String>> nestedKeysOption =
            key(optionKey).stringType().asList().noDefaultValue();
    return options.get(nestedKeysOption);
}

/**
 * Reads the {@code FILE_COMPRESSION} option.
 *
 * @return the value stored in the options for {@code FILE_COMPRESSION}
 */
public String fileCompression() {
    String compression = options.get(FILE_COMPRESSION);
    return compression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldNestedUpdateAgg;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.DataType;
Expand All @@ -44,6 +45,7 @@

import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
Expand Down Expand Up @@ -373,10 +375,21 @@ private Map<Integer, FieldAggregator> createFieldAggregators(
boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);

if (strAggFunc != null) {
fieldAggregators.put(
i,
FieldAggregator.createFieldAggregator(
fieldType, strAggFunc, ignoreRetract, isPrimaryKey));
if (FieldAggregator.isNestedUpdateAgg(strAggFunc)) {
List<String> nestedKeys = options.fieldNestedUpdateAggNestedKeys(fieldName);
checkArgument(
nestedKeys != null && !nestedKeys.isEmpty(),
"Must set nested keys when using " + FieldNestedUpdateAgg.NAME);
fieldAggregators.put(
i,
FieldAggregator.createFieldNestedUpdateAgg(
fieldType, nestedKeys, ignoreRetract));
} else {
fieldAggregators.put(
i,
FieldAggregator.createFieldAggregator(
fieldType, strAggFunc, ignoreRetract, isPrimaryKey));
}
}
}
return fieldAggregators;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;

import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -140,9 +141,19 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
String strAggFunc = options.fieldAggFunc(fieldName);
boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);

fieldAggregators[i] =
FieldAggregator.createFieldAggregator(
fieldType, strAggFunc, ignoreRetract, isPrimaryKey);
if (FieldAggregator.isNestedUpdateAgg(strAggFunc)) {
List<String> nestedKeys = options.fieldNestedUpdateAggNestedKeys(fieldName);
checkArgument(
nestedKeys != null && !nestedKeys.isEmpty(),
"Must set nested keys when using " + FieldNestedUpdateAgg.NAME);
fieldAggregators[i] =
FieldAggregator.createFieldNestedUpdateAgg(
fieldType, nestedKeys, ignoreRetract);
} else {
fieldAggregators[i] =
FieldAggregator.createFieldAggregator(
fieldType, strAggFunc, ignoreRetract, isPrimaryKey);
}
}

return new AggregateMergeFunction(createFieldGetters(fieldTypes), fieldAggregators);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** abstract class of aggregating a field of a row. */
public abstract class FieldAggregator implements Serializable {
Expand Down Expand Up @@ -99,6 +104,21 @@ public static FieldAggregator createFieldAggregator(
return fieldAggregator;
}

/**
 * Tells whether the configured aggregate function name selects the nested-update aggregator.
 *
 * @param strAggFunc configured aggregate function name, may be {@code null}
 * @return {@code true} only if {@code strAggFunc} equals {@link FieldNestedUpdateAgg#NAME}
 */
public static boolean isNestedUpdateAgg(@Nullable String strAggFunc) {
    if (strAggFunc == null) {
        return false;
    }
    return strAggFunc.equals(FieldNestedUpdateAgg.NAME);
}

/**
 * Creates the aggregator for a nested table column.
 *
 * @param fieldType type of the column; must be an {@link ArrayType} whose element type is a
 *     {@link RowType}
 * @param nestedKeys names of the key fields of the nested rows
 * @param ignoreRetract whether to wrap the aggregator in a {@link FieldIgnoreRetractAgg}
 * @return the nested-update aggregator, wrapped when {@code ignoreRetract} is set
 * @throws IllegalArgumentException presumably, via {@code checkArgument}, when the type is not
 *     an array of rows
 */
public static FieldAggregator createFieldNestedUpdateAgg(
        DataType fieldType, List<String> nestedKeys, boolean ignoreRetract) {
    String typeErrorMsg = "Data type of nested table column must be 'Array<Row>' but was '%s'.";
    // Both the outer array and the inner row requirement share one error message.
    boolean isArrayOfRows =
            fieldType instanceof ArrayType
                    && ((ArrayType) fieldType).getElementType() instanceof RowType;
    checkArgument(isArrayOfRows, typeErrorMsg, fieldType);

    FieldNestedUpdateAgg nestedUpdateAgg =
            new FieldNestedUpdateAgg((ArrayType) fieldType, nestedKeys);
    if (ignoreRetract) {
        return new FieldIgnoreRetractAgg(nestedUpdateAgg);
    }
    return nestedUpdateAgg;
}

abstract String name();

public abstract Object agg(Object accumulator, Object inputField);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * Aggregator for a field that represents a nested table. The data type of a nested table field is
 * {@code ARRAY<ROW>}.
 *
 * <p>Each row of the array is identified by the values of its nested key fields. When two arrays
 * are merged, a row from the input replaces the accumulated row that has the same key values;
 * rows with new key values are appended. The merged array keeps rows in the order in which their
 * keys were first seen (accumulator first, then input), so the result does not depend on hash
 * iteration order.
 */
public class FieldNestedUpdateAgg extends FieldAggregator {

    public static final String NAME = "nested-update";

    /** Reads the row stored at a given position of an array. */
    private final BiFunction<InternalArray, Integer, InternalRow> rowGetter;

    /** Getters of the nested key fields, in the field order of the row type. */
    private final List<InternalRow.FieldGetter> keyGetters;

    /**
     * @param dataType type of the nested table field; its element type is cast to {@link RowType}
     * @param nestedKeys names of the row fields that identify a row inside the nested table
     * @throws IllegalArgumentException presumably, via {@code checkArgument}, when the number of
     *     matched fields differs from {@code nestedKeys.size()} (unknown or duplicated key name)
     */
    public FieldNestedUpdateAgg(ArrayType dataType, List<String> nestedKeys) {
        super(dataType);
        RowType rowType = (RowType) dataType.getElementType();

        InternalArray.ElementGetter objectGetter = InternalArray.createElementGetter(rowType);
        this.rowGetter = (array, pos) -> (InternalRow) objectGetter.getElementOrNull(array, pos);

        this.keyGetters = new ArrayList<>(nestedKeys.size());
        List<DataField> dataFields = rowType.getFields();
        for (int i = 0; i < dataFields.size(); i++) {
            DataField dataField = dataFields.get(i);
            if (nestedKeys.contains(dataField.name())) {
                keyGetters.add(InternalRow.createFieldGetter(dataField.type(), i));
            }
        }
        checkArgument(keyGetters.size() == nestedKeys.size(), "You have set wrong nested keys.");
    }

    @Override
    String name() {
        return NAME;
    }

    /**
     * Merges the input array into the accumulated array, updating rows by nested keys.
     *
     * @param accumulator accumulated array, or {@code null} if nothing was accumulated yet
     * @param inputField incoming array, or {@code null}
     * @return the other argument unchanged if one of them is {@code null}; otherwise a new
     *     {@link GenericArray} holding one row per distinct key
     */
    @Override
    public Object agg(@Nullable Object accumulator, @Nullable Object inputField) {
        if (accumulator == null || inputField == null) {
            return accumulator == null ? inputField : accumulator;
        }

        // mergedRows keeps first-seen key order; slotByKey maps a key to its index in mergedRows.
        List<InternalRow> mergedRows = new ArrayList<>();
        Map<List<Object>, Integer> slotByKey = new HashMap<>();
        upsertAll((InternalArray) accumulator, mergedRows, slotByKey);
        upsertAll((InternalArray) inputField, mergedRows, slotByKey);

        return new GenericArray(mergedRows.toArray());
    }

    /** Inserts every non-null row of {@code rows}, overwriting the row that shares its key. */
    private void upsertAll(
            InternalArray rows, List<InternalRow> mergedRows, Map<List<Object>, Integer> slotByKey) {
        for (int i = 0; i < rows.size(); i++) {
            InternalRow row = rowGetter.apply(rows, i);
            if (row == null) {
                // A null element has no key fields to read; skip it instead of failing on it.
                continue;
            }
            List<Object> keys = getKeys(row);
            Integer slot = slotByKey.get(keys);
            if (slot == null) {
                slotByKey.put(keys, mergedRows.size());
                mergedRows.add(row);
            } else {
                // update by nested keys
                mergedRows.set(slot, row);
            }
        }
    }

    /** Extracts the nested key values of a row; the list is used as the map key. */
    private List<Object> getKeys(InternalRow row) {
        return keyGetters.stream().map(g -> g.getFieldOrNull(row)).collect(Collectors.toList());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
Expand All @@ -33,6 +38,11 @@
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -274,4 +284,47 @@ private static Decimal toDecimal(int i) {
return Decimal.fromBigDecimal(
new BigDecimal(i), DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE);
}

/** Checks insert, append and overwrite-by-key behavior of {@link FieldNestedUpdateAgg}. */
@Test
public void testFieldNestedUpdateAgg() {
    // Nested rows are (k0 INT, k1 INT, v STRING), identified by (k0, k1).
    List<String> nestedKeys = Arrays.asList("k0", "k1");
    FieldNestedUpdateAgg agg =
            new FieldNestedUpdateAgg(
                    DataTypes.ARRAY(
                            DataTypes.ROW(
                                    DataTypes.FIELD(0, "k0", DataTypes.INT()),
                                    DataTypes.FIELD(1, "k1", DataTypes.INT()),
                                    DataTypes.FIELD(2, "v", DataTypes.STRING()))),
                    nestedKeys);

    // First input with no accumulator: the input is the result.
    InternalRow incoming = row(0, 0, "A");
    InternalArray merged = (InternalArray) agg.agg(null, singletonArray(incoming));
    List<InternalRow> expected = Collections.singletonList(incoming);
    assertThat(unnest(merged)).containsExactlyInAnyOrderElementsOf(expected);

    // A row with a new key is added next to the existing one.
    incoming = row(0, 1, "B");
    merged = (InternalArray) agg.agg(merged, singletonArray(incoming));
    expected = Arrays.asList(row(0, 0, "A"), row(0, 1, "B"));
    assertThat(unnest(merged)).containsExactlyInAnyOrderElementsOf(expected);

    // A row with an existing key replaces the accumulated row.
    incoming = row(0, 1, "b");
    merged = (InternalArray) agg.agg(merged, singletonArray(incoming));
    expected = Arrays.asList(row(0, 0, "A"), row(0, 1, "b"));
    assertThat(unnest(merged)).containsExactlyInAnyOrderElementsOf(expected);
}

/** Reads every element of the array as a row with three fields. */
private List<InternalRow> unnest(InternalArray array) {
    // The rows built by this test have three fields.
    final int fieldCount = 3;
    final int length = array.size();
    return IntStream.range(0, length)
            .mapToObj(pos -> array.getRow(pos, fieldCount))
            .collect(Collectors.toList());
}

/** Wraps a single row into a one-element array. */
private GenericArray singletonArray(InternalRow row) {
    InternalRow[] rows = {row};
    return new GenericArray(rows);
}

/** Builds a row {@code (k0, k1, v)} with the string stored as a {@link BinaryString}. */
private InternalRow row(int k0, int k1, String v) {
    BinaryString value = BinaryString.fromString(v);
    return GenericRow.of(k0, k1, value);
}
}
Loading

0 comments on commit 53b8534

Please sign in to comment.