diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
new file mode 100644
index 0000000000000..189769d11cbcd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A {@link StreamOperator} for executing a {@link ReduceFunction} on a {@link
+ * org.apache.flink.streaming.api.datastream.KeyedStream}.
+ */
+@Internal
+public class StreamGroupedReduceAsyncStateOperator<IN>
+        extends AbstractAsyncStateUdfStreamOperator<IN, ReduceFunction<IN>>
+        implements OneInputStreamOperator<IN, IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String STATE_NAME = "_op_state";
+
+    private transient ValueState<IN> values;
+
+    private final TypeSerializer<IN> serializer;
+
+    public StreamGroupedReduceAsyncStateOperator(
+            ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
+        super(reducer);
+        this.serializer = serializer;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
+        values = getRuntimeContext().getValueState(stateId);
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        IN value = element.getValue();
+        values.asyncValue()
+                .thenAccept(
+                        currentValue -> {
+                            if (currentValue != null) {
+                                IN reduced = userFunction.reduce(currentValue, value);
+                                values.asyncUpdate(reduced)
+                                        .thenAccept(e -> output.collect(element.replace(reduced)));
+                            } else {
+                                values.asyncUpdate(value)
+                                        .thenAccept(e -> output.collect(element.replace(value)));
+                            }
+                        });
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java
index 3b0f7564a7576..10e8fb4c34512 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java
@@ -44,6 +44,7 @@ public final class ReduceTransformation<IN, K> extends PhysicalTransformation<IN> {
     private final KeySelector<IN, K> keySelector;
     private final TypeInformation<K> keyTypeInfo;
     private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
+    private boolean isEnableAsyncState;
 
     public ReduceTransformation(
             String name,
@@ -100,4 +101,13 @@ protected List<Transformation<?>> getTransitivePredecessorsInternal() {
     public List<Transformation<?>> getInputs() {
         return Collections.singletonList(input);
     }
+
+    @Override
+    public void enableAsyncState() {
+        isEnableAsyncState = true;
+    }
+
+    public boolean isEnableAsyncState() {
+        return isEnableAsyncState;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java
index 030e4adcad6ba..00e5f7227b265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java
@@ -22,7 +22,9 @@
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
 import org.apache.flink.streaming.api.operators.BatchGroupedReduceOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduceAsyncStateOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.transformations.ReduceTransformation;
 
 import java.util.Collection;
@@ -66,15 +68,24 @@ public Collection<Integer> translateForBatchInternal(
 
     @Override
     public Collection<Integer> translateForStreamingInternal(
             final ReduceTransformation<IN, KEY> transformation, final Context context)
{ - StreamGroupedReduceOperator groupedReduce = - new StreamGroupedReduceOperator<>( - transformation.getReducer(), - transformation - .getInputType() - .createSerializer( - context.getStreamGraph() - .getExecutionConfig() - .getSerializerConfig())); + StreamOperator groupedReduce = + transformation.isEnableAsyncState() + ? new StreamGroupedReduceAsyncStateOperator<>( + transformation.getReducer(), + transformation + .getInputType() + .createSerializer( + context.getStreamGraph() + .getExecutionConfig() + .getSerializerConfig())) + : new StreamGroupedReduceOperator<>( + transformation.getReducer(), + transformation + .getInputType() + .createSerializer( + context.getStreamGraph() + .getExecutionConfig() + .getSerializerConfig())); SimpleOperatorFactory operatorFactory = SimpleOperatorFactory.of(groupedReduce); operatorFactory.setChainingStrategy(transformation.getChainingStrategy()); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index 4391a9a3ef112..22502be8f1802 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; @@ -30,13 +31,18 @@ import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamGroupedReduceAsyncStateOperator; import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator; import org.apache.flink.streaming.util.MockContext; import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.Serializable; import java.util.ArrayList; @@ -45,10 +51,12 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link AggregationFunction}. 
*/ +@RunWith(Parameterized.class) class AggregationFunctionTest { - @Test - void groupSumIntegerTest() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void groupSumIntegerTest(boolean asyncState) throws Exception { // preparing expected outputs List> expectedGroupSumList = new ArrayList<>(); @@ -99,27 +107,30 @@ void groupSumIntegerTest() throws Exception { List> groupedSumList = MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( sumFunction, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputList(), keySelector, keyType); List> groupedMinList = MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( minFunction, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputList(), keySelector, keyType); List> groupedMaxList = MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( maxFunction, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputList(), keySelector, keyType); @@ -129,8 +140,9 @@ void groupSumIntegerTest() throws Exception { assertThat(groupedMaxList).isEqualTo(expectedGroupMaxList); } - @Test - void pojoGroupSumIntegerTest() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void pojoGroupSumIntegerTest(boolean asyncState) throws Exception { // preparing expected outputs List expectedGroupSumList = new ArrayList<>(); @@ -179,27 +191,30 @@ void pojoGroupSumIntegerTest() throws Exception { List groupedSumList = MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( sumFunction, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputPojoList(), keySelector, keyType); List groupedMinList = MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( minFunction, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputPojoList(), keySelector, keyType); List groupedMaxList = MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( maxFunction, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputPojoList(), keySelector, keyType); @@ -209,8 +224,9 @@ void pojoGroupSumIntegerTest() throws Exception { assertThat(groupedMaxList).isEqualTo(expectedGroupMaxList); } - @Test - void minMaxByTest() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void minMaxByTest(boolean asyncState) throws Exception { // Tuples are grouped on field 0, aggregated on field 1 // preparing expected outputs @@ -285,9 +301,10 @@ void minMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( maxByFunctionFirst, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), 
getInputByList(), keySelector, keyType)) @@ -295,9 +312,10 @@ void minMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( maxByFunctionLast, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputByList(), keySelector, keyType)) @@ -305,9 +323,10 @@ void minMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( minByFunctionLast, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputByList(), keySelector, keyType)) @@ -315,17 +334,19 @@ void minMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( minByFunctionFirst, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputByList(), keySelector, keyType)) .isEqualTo(minByFirstExpected); } - @Test - void pojoMinMaxByTest() throws Exception { + @ParameterizedTest(name = "Enable async state = {0}") + @ValueSource(booleans = {false, true}) + void pojoMinMaxByTest(boolean asyncState) throws Exception { // Pojos are grouped on field 0, aggregated on field 1 // preparing expected outputs @@ -399,9 +420,10 @@ void pojoMinMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( maxByFunctionFirst, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputByPojoList(), keySelector, keyType)) @@ -409,9 +431,10 @@ void pojoMinMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( maxByFunctionLast, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputByPojoList(), keySelector, keyType)) @@ -419,9 +442,10 @@ void pojoMinMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( minByFunctionLast, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputByPojoList(), keySelector, keyType)) @@ -429,9 +453,10 @@ void pojoMinMaxByTest() throws Exception { assertThat( MockContext.createAndExecuteForKeyedStream( - new StreamGroupedReduceOperator<>( + createOperator( minByFunctionFirst, - typeInfo.createSerializer(config.getSerializerConfig())), + typeInfo.createSerializer(config.getSerializerConfig()), + asyncState), getInputByPojoList(), keySelector, keyType)) @@ -442,6 +467,13 @@ void pojoMinMaxByTest() throws Exception { // UTILS // ************************************************************************* + private OneInputStreamOperator createOperator( + ReduceFunction reducer, TypeSerializer serializer, boolean asyncState) { + return asyncState + ? 
new StreamGroupedReduceAsyncStateOperator<>(reducer, serializer) + : new StreamGroupedReduceOperator<>(reducer, serializer); + } + private List> getInputList() { ArrayList> inputList = new ArrayList<>(); for (int i = 0; i < 9; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperatorTest.java new file mode 100644 index 0000000000000..0124e7bdf9212 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceAsyncStateOperatorTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link StreamGroupedReduceAsyncStateOperator}. These test that: + * + *
    + *
+ * <ul>
+ *   <li>RichFunction methods are called correctly
+ *   <li>Timestamps of processed elements match the input timestamp
+ *   <li>Watermarks are correctly forwarded
+ * </ul>
+ */ +class StreamGroupedReduceAsyncStateOperatorTest { + + @Test + void testGroupedReduce() throws Exception { + + KeySelector keySelector = new IntegerKeySelector(); + + StreamGroupedReduceAsyncStateOperator operator = + new StreamGroupedReduceAsyncStateOperator<>( + new MyReducer(), IntSerializer.INSTANCE); + + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( + operator, keySelector, BasicTypeInfo.INT_TYPE_INFO)) { + + long initialTime = 0L; + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, initialTime + 1)); + testHarness.processElement(new StreamRecord<>(1, initialTime + 2)); + testHarness.processWatermark(new Watermark(initialTime + 2)); + testHarness.processElement(new StreamRecord<>(2, initialTime + 3)); + testHarness.processElement(new StreamRecord<>(2, initialTime + 4)); + testHarness.processElement(new StreamRecord<>(3, initialTime + 5)); + + expectedOutput.add(new StreamRecord<>(1, initialTime + 1)); + expectedOutput.add(new StreamRecord<>(2, initialTime + 2)); + expectedOutput.add(new Watermark(initialTime + 2)); + expectedOutput.add(new StreamRecord<>(2, initialTime + 3)); + expectedOutput.add(new StreamRecord<>(4, initialTime + 4)); + expectedOutput.add(new StreamRecord<>(3, initialTime + 5)); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + } + } + + @Test + void testOpenClose() throws Exception { + + KeySelector keySelector = new IntegerKeySelector(); + + StreamGroupedReduceAsyncStateOperator operator = + new StreamGroupedReduceAsyncStateOperator<>( + new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE); + AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( + operator, keySelector, BasicTypeInfo.INT_TYPE_INFO); + + long initialTime = 0L; + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, initialTime)); + testHarness.processElement(new StreamRecord<>(2, initialTime)); + + testHarness.close(); + + assertThat(TestOpenCloseReduceFunction.openCalled) + .as("RichFunction methods where not called.") + .isTrue(); + assertThat(testHarness.getOutput()).as("Output contains no elements.").isNotEmpty(); + } + + // This must only be used in one test, otherwise the static fields will be changed + // by several tests concurrently + private static class TestOpenCloseReduceFunction extends RichReduceFunction { + private static final long serialVersionUID = 1L; + + public static boolean openCalled = false; + public static boolean closeCalled = false; + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + assertThat(closeCalled).as("Close called before open.").isFalse(); + openCalled = true; + } + + @Override + public void close() throws Exception { + super.close(); + assertThat(openCalled).as("Open was not called before close.").isTrue(); + closeCalled = true; + } + + @Override + public Integer reduce(Integer in1, Integer in2) throws Exception { + assertThat(openCalled).as("Open was not called before run.").isTrue(); + return in1 + in2; + } + } + + // Utilities + + private static class MyReducer implements ReduceFunction { + + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + } + + private static class 
IntegerKeySelector implements KeySelector { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + } + + private static TypeInformation typeInfo = BasicTypeInfo.INT_TYPE_INFO; +} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockContext.java index 986bc05c19fb8..2980719b77df2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -19,9 +19,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.function.ThrowingConsumer; import java.util.ArrayList; import java.util.Collection; @@ -57,6 +60,7 @@ public static List createAndExecute( return createAndExecuteForKeyedStream(operator, inputs, null, null); } + @SuppressWarnings({"unchecked", "rawtypes"}) public static List createAndExecuteForKeyedStream( OneInputStreamOperator operator, List inputs, @@ -64,8 +68,25 @@ public static List createAndExecuteForKeyedStream( TypeInformation keyType) throws Exception { - OneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType); + ThrowingConsumer, Exception> consumer; + AbstractStreamOperatorTestHarness testHarness; + + if (operator instanceof AbstractAsyncStateStreamOperator) { + testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( + operator, keySelector, keyType); + consumer = + (in) -> + ((AsyncKeyedOneInputStreamOperatorTestHarness) testHarness) + .processElement(in); + } else { + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType); + consumer = + (in) -> + ((KeyedOneInputStreamOperatorTestHarness) testHarness) + .processElement(in); + } testHarness.setup(); testHarness.open(); @@ -73,7 +94,7 @@ public static List createAndExecuteForKeyedStream( operator.open(); for (IN in : inputs) { - testHarness.processElement(new StreamRecord<>(in)); + consumer.accept(new StreamRecord<>(in)); } testHarness.close(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java index 448eb75bc45f2..fbca7135f3841 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java @@ -62,6 +62,14 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness /** The executor service for async state processing. 
*/
     private ExecutorService executor;
 
+    public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(
+            OneInputStreamOperator<IN, OUT> operator,
+            final KeySelector<IN, K> keySelector,
+            TypeInformation<K> keyType)
+            throws Exception {
+        return create(operator, keySelector, keyType, 1, 1, 0);
+    }
+
     public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(
             OneInputStreamOperator<IN, OUT> operator,
             final KeySelector<IN, K> keySelector,
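
Editor's note (not part of the patch): the translator change above only selects StreamGroupedReduceAsyncStateOperator when the ReduceTransformation has async state enabled before the stream graph is built. The sketch below is a minimal illustration of how a job might opt in. Only ReduceTransformation#enableAsyncState(), which this diff adds, is confirmed by the patch; the fluent enableAsyncState() call on the reduce result and the choice of a state backend that supports the async state v2 API are assumptions about the surrounding DataStream API.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncStateKeyedReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromData(1, 1, 2, 2, 3)
                // A grouped reduce requires a keyed stream; here the key is the value itself.
                .keyBy(value -> value)
                // Builds a ReduceTransformation under the hood.
                .reduce((a, b) -> a + b)
                // Assumption: this fluent call reaches ReduceTransformation#enableAsyncState(),
                // so ReduceTransformationTranslator instantiates
                // StreamGroupedReduceAsyncStateOperator instead of StreamGroupedReduceOperator.
                .enableAsyncState()
                .print();

        env.execute("keyed reduce with async state");
    }
}

Either way the observable behavior matches the synchronous operator: the first element seen for a key is emitted unchanged, and every later element is combined with the stored value through the user ReduceFunction, with state reads and writes chained through StateFuture callbacks instead of blocking the task thread.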