[FLINK-37213][table-runtime] Improve performance of unbounded OVER aggregations

Instead of sorting all of the records based on the row time,
explicitly use timers to achieve the same thing.

Compared to the previous one, this version registers a timer for
each record, as opposed to just one timer per key. However, since
we are using RocksDB for timers, this is a minor problem.
In exchange, we:
 - don't have to iterate over all of the state for each timer
 - fire timers only when needed, instead of for each watermark
   for each key. For example, if watermarks fire every 200ms
   and a given key has only one record that should fire 20s
   into the future, the previous version would fire a timer for
   that key on every watermark, unnecessarily, without doing
   any work.
pnowojski committed Feb 4, 2025
1 parent b1eb140 commit 77edff6
Showing 17 changed files with 427 additions and 84 deletions.
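
To make the change easier to follow, the snippet below is a minimal, self-contained sketch of the timer-per-record idea in a plain DataStream KeyedProcessFunction: each incoming record is buffered under its row time and an event-time timer is registered for exactly that timestamp, so work happens only when the watermark actually passes a buffered record. This is not the RowTimeUnboundedPrecedingOverFunctionV2 operator introduced by this commit (that one works on RowData with generated aggregate handlers and state retention); the tuple layout (key, rowtime, value), the state names, and the running SUM are illustrative assumptions, and it assumes Flink 1.19+ where RichFunction#open(OpenContext) is available.

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only: one event-time timer per buffered row time, running SUM per key. */
public class TimerPerRecordUnboundedOverSketch
        extends KeyedProcessFunction<String, Tuple3<String, Long, Long>, Tuple3<String, Long, Long>> {

    // values buffered per row time, waiting for the watermark to pass them
    private transient MapState<Long, List<Long>> valuesByTime;
    // running SUM(value) OVER (PARTITION BY key ORDER BY rowtime ROWS UNBOUNDED PRECEDING)
    private transient ValueState<Long> runningSum;

    @Override
    public void open(OpenContext openContext) {
        valuesByTime =
                getRuntimeContext()
                        .getMapState(
                                new MapStateDescriptor<>(
                                        "valuesByTime", Types.LONG, Types.LIST(Types.LONG)));
        runningSum =
                getRuntimeContext().getState(new ValueStateDescriptor<>("runningSum", Types.LONG));
    }

    @Override
    public void processElement(
            Tuple3<String, Long, Long> record,
            Context ctx,
            Collector<Tuple3<String, Long, Long>> out)
            throws Exception {
        long rowTime = record.f1;
        if (rowTime <= ctx.timerService().currentWatermark()) {
            return; // late record; the real operator drops and counts these
        }
        List<Long> buffered = valuesByTime.get(rowTime);
        if (buffered == null) {
            buffered = new ArrayList<>();
        }
        buffered.add(record.f2);
        valuesByTime.put(rowTime, buffered);
        // one timer per distinct row time; re-registering the same timestamp is deduplicated
        ctx.timerService().registerEventTimeTimer(rowTime);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple3<String, Long, Long>> out)
            throws Exception {
        // only the records buffered at this timestamp are touched; no scan over all of the state
        List<Long> buffered = valuesByTime.get(timestamp);
        if (buffered == null) {
            return;
        }
        long sum = runningSum.value() == null ? 0L : runningSum.value();
        for (long value : buffered) {
            sum += value;
            out.collect(Tuple3.of(ctx.getCurrentKey(), timestamp, sum));
        }
        runningSum.update(sum);
        valuesByTime.remove(timestamp);
    }
}

Because a timer exists only for timestamps that actually have buffered records, onTimer touches just those records instead of scanning the whole per-key state, and idle keys never fire; that is the saving described in the commit message.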
@@ -244,6 +244,12 @@
<td><p>Enum</p></td>
<td>In order to remap state to operators during a restore, it is required that the pipeline's streaming transformations get a UID assigned.<br />The planner can generate and assign explicit UIDs. If no UIDs have been set by the planner, the UIDs will be auto-generated by lower layers that can take the complete topology into account for uniqueness of the IDs. See the DataStream API for more information.<br />This configuration option is for experts only and the default should be sufficient for most use cases. By default, only pipelines created from a persisted compiled plan will get UIDs assigned explicitly. Thus, these pipelines can be arbitrarily moved around within the same topology without affecting the stable UIDs.<br /><br />Possible values:<ul><li>"PLAN_ONLY": Sets UIDs on streaming transformations if and only if the pipeline definition comes from a compiled plan. Pipelines that have been constructed in the API without a compilation step will not set an explicit UID as it might not be stable across multiple translations.</li><li>"ALWAYS": Always sets UIDs on streaming transformations. This strategy is for experts only! Pipelines that have been constructed in the API without a compilation step might not be able to be restored properly. The UID generation depends on previously declared pipelines (potentially across jobs if the same JVM is used). Thus, a stable environment must be ensured. Pipeline definitions that come from a compiled plan are safe to use.</li><li>"DISABLED": No explicit UIDs will be set.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.unbounded-over.version</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Which version of the unbounded over aggregation to use: 1 - legacy version, 2 - version with improved performance.</td>
</tr>
<tr>
<td><h5>table.exec.window-agg.buffer-size-limit</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">100000</td>
@@ -8,24 +8,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>table.optimizer.skewed-join-optimization.strategy</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">auto</td>
<td><p>Enum</p></td>
<td>Flink will handle skew in shuffled joins (sort-merge and hash) at runtime by splitting data according to the skewed join key. The value of this configuration determines how Flink performs this optimization. AUTO means Flink will automatically apply this optimization, FORCED means Flink will enforce this optimization even if it introduces extra hash shuffle, and NONE means this optimization will not be executed.<br /><br />Possible values:<ul><li>"auto": Flink will automatically perform this optimization.</li><li>"forced": Flink will perform this optimization even if it introduces extra hash shuffling.</li><li>"none": Skewed join optimization will not be performed.</li></ul></td>
</tr>
<tr>
<td><h5>table.optimizer.skewed-join-optimization.skewed-factor</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">4.0</td>
<td><p>Double</p></td>
<td>When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents this skewed-factor). In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below the skewed threshold(defined using table.optimizer.skewed-join-optimization.skewed-threshold).</td>
</tr>
<tr>
<td><h5>table.optimizer.skewed-join-optimization.skewed-threshold</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">256 mb</td>
<td><p>MemorySize</p></td>
<td>When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents the table.optimizer.skewed-join-optimization.skewed-factor). In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below this skewed threshold.</td>
</tr>
<tr>
<td><h5>table.optimizer.adaptive-broadcast-join.strategy</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">auto</td>
@@ -137,6 +119,24 @@
<td>MemorySize</td>
<td>Min data volume threshold of the runtime filter probe side. Estimated data volume needs to be over this value to try to inject runtime filter.This value should be larger than <code class="highlighter-rouge">table.optimizer.runtime-filter.max-build-data-size</code>.</td>
</tr>
<tr>
<td><h5>table.optimizer.skewed-join-optimization.skewed-factor</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">4.0</td>
<td>Double</td>
<td>When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents this skewed-factor). In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below the skewed threshold(defined using table.optimizer.skewed-join-optimization.skewed-threshold).</td>
</tr>
<tr>
<td><h5>table.optimizer.skewed-join-optimization.skewed-threshold</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents the table.optimizer.skewed-join-optimization.skewed-factor). In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below this skewed threshold.</td>
</tr>
<tr>
<td><h5>table.optimizer.skewed-join-optimization.strategy</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">auto</td>
<td><p>Enum</p></td>
<td>Flink will handle skew in shuffled joins (sort-merge and hash) at runtime by splitting data according to the skewed join key. The value of this configuration determines how Flink performs this optimization. AUTO means Flink will automatically apply this optimization, FORCED means Flink will enforce this optimization even if it introduces extra hash shuffle, and NONE means this optimization will not be executed.<br /><br />Possible values:<ul><li>"auto": Flink will automatically perform this optimization.</li><li>"forced": Flink will perform this optimization even if it introduces extra hash shuffling.</li><li>"none": Skewed join optimization will not be performed.</li></ul></td>
</tr>
<tr>
<td><h5>table.optimizer.source.report-statistics-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
@@ -24,7 +24,7 @@
<td><h5>python.executable</h5></td>
<td style="word-wrap: break-word;">"python"</td>
<td>String</td>
<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0, <= 2.61.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version &gt;= 2.54.0, &lt;= 2.61.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
</tr>
<tr>
<td><h5>python.execution-mode</h5></td>
@@ -554,6 +554,16 @@ public class ExecutionConfigOptions {
+ "all changes to downstream just like when the mini-batch is "
+ "not enabled.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Integer> UNBOUNDED_OVER_VERSION =
ConfigOptions.key("table.exec.unbounded-over.version")
.intType()
.defaultValue(2)
.withDescription(
"Which version of the unbounded over aggregation to use: "
+ " 1 - legacy version"
+ " 2 - version with improved performance");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
key("table.exec.uid.generation")
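
As a usage note, the improved implementation (version 2) becomes the default with this change, so jobs that need the legacy operator, for example because they must keep running from state written by version 1 and there is no migration path between the two versions, have to pin the option explicitly. A minimal sketch, assuming a Table API program and using the ExecutionConfigOptions.UNBOUNDED_OVER_VERSION constant added above; the table 't' is hypothetical:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class UnboundedOverVersionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Version 2 is the default; pin the legacy operator explicitly, e.g. to keep
        // running from state that was produced by version 1.
        tEnv.getConfig().set(ExecutionConfigOptions.UNBOUNDED_OVER_VERSION, 1);

        // 't' is a hypothetical source table registered elsewhere.
        tEnv.executeSql(
                "SELECT a, SUM(b) OVER (PARTITION BY a ORDER BY rowtime "
                        + "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM t");
    }
}

Compiled plans persist the chosen version (see the unboundedOverVersion entries in the JSON plans below), and plans serialized before this change carry no version at all, which is why the ExecNode defaults a missing value to 1 on restore.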
@@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
@@ -48,13 +49,15 @@
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver;
import org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.ProcTimeRowsBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRowsUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeUnboundedPrecedingOverFunctionV2;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
@@ -70,6 +73,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
@@ -98,6 +103,11 @@ public class StreamExecOverAggregate extends ExecNodeBase<RowData>

public static final String FIELD_NAME_OVER_SPEC = "overSpec";

public static final String FIELD_NAME_UNBOUNDED_OVER_VERSION = "unboundedOverVersion";

@JsonProperty(FIELD_NAME_UNBOUNDED_OVER_VERSION)
private final int unboundedOverVersion;

@JsonProperty(FIELD_NAME_OVER_SPEC)
private final OverSpec overSpec;

Expand All @@ -114,7 +124,8 @@ public StreamExecOverAggregate(
overSpec,
Collections.singletonList(inputProperty),
outputType,
description);
description,
tableConfig.get(ExecutionConfigOptions.UNBOUNDED_OVER_VERSION));
}

@JsonCreator
Expand All @@ -125,10 +136,17 @@ public StreamExecOverAggregate(
@JsonProperty(FIELD_NAME_OVER_SPEC) OverSpec overSpec,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
@JsonProperty(FIELD_NAME_DESCRIPTION) String description,
@Nullable @JsonProperty(FIELD_NAME_UNBOUNDED_OVER_VERSION)
Integer unboundedOverVersion) {
super(id, context, persistedConfig, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 1);
this.overSpec = checkNotNull(overSpec);

if (unboundedOverVersion == null) {
unboundedOverVersion = 1;
}
this.unboundedOverVersion = unboundedOverVersion;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -316,24 +334,42 @@ private KeyedProcessFunction<RowData, RowData, RowData> createUnboundedOverProce
.toArray(LogicalType[]::new);

if (rowTimeIdx >= 0) {
if (isRowsClause) {
// ROWS unbounded over process function
return new RowTimeRowsUnboundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
genAggsHandler,
flattenAccTypes,
fieldTypes,
rowTimeIdx);
} else {
// RANGE unbounded over process function
return new RowTimeRangeUnboundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
genAggsHandler,
flattenAccTypes,
fieldTypes,
rowTimeIdx);
switch (unboundedOverVersion) {
// Currently there is no migration path between first and second versions.
case AbstractRowTimeUnboundedPrecedingOver.FIRST_OVER_VERSION:
if (isRowsClause) {
// ROWS unbounded over process function
return new RowTimeRowsUnboundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
genAggsHandler,
flattenAccTypes,
fieldTypes,
rowTimeIdx);
} else {
// RANGE unbounded over process function
return new RowTimeRangeUnboundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
genAggsHandler,
flattenAccTypes,
fieldTypes,
rowTimeIdx);
}
case RowTimeUnboundedPrecedingOverFunctionV2.SECOND_OVER_VERSION:
return new RowTimeUnboundedPrecedingOverFunctionV2<>(
isRowsClause,
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
genAggsHandler,
flattenAccTypes,
fieldTypes,
rowTimeIdx);
default:
throw new UnsupportedOperationException(
"Unsupported unbounded over version: "
+ unboundedOverVersion
+ ". Valid versions are 1 and 2.");
}
} else {
return new ProcTimeUnboundedPrecedingFunction<>(
@@ -302,7 +302,8 @@
"fieldType" : "BIGINT NOT NULL"
} ]
},
"description" : "OverAggregate(partitionBy=[a], orderBy=[t ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, t, w0$o0])"
"description" : "OverAggregate(partitionBy=[a], orderBy=[t ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, t, w0$o0])",
"unboundedOverVersion" : 1
}, {
"id" : 6,
"type" : "batch-exec-calc_1",
@@ -438,4 +439,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
}
}
@@ -357,7 +357,8 @@
"fieldType" : "BIGINT NOT NULL"
} ]
},
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
"unboundedOverVersion" : 1
}, {
"id" : 31,
"type" : "stream-exec-calc_1",
@@ -588,4 +589,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
}
}
@@ -357,7 +357,8 @@
"fieldType" : "BIGINT NOT NULL"
} ]
},
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
"unboundedOverVersion" : 1
}, {
"id" : 7,
"type" : "stream-exec-calc_1",
@@ -588,4 +589,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
}
}
@@ -357,7 +357,8 @@
"fieldType" : "BIGINT NOT NULL"
} ]
},
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
"unboundedOverVersion" : 1
}, {
"id" : 7,
"type" : "stream-exec-calc_1",
@@ -588,4 +589,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
}
}
@@ -357,7 +357,8 @@
"fieldType" : "BIGINT NOT NULL"
} ]
},
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
"description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
"unboundedOverVersion" : 1
}, {
"id" : 7,
"type" : "stream-exec-calc_1",
@@ -588,4 +589,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
}
}