diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java new file mode 100644 index 000000000000..c57ade157274 --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +/** Utility methods about Flink operator parallelism to resolve compatibility issues. */ +public class ParallelismUtils { + /** + * Configures the parallelism of the target stream to be the same as the source stream. In Flink + * 1.17+, this method will also configure {@link Transformation#isParallelismConfigured()}. + */ + public static void forwardParallelism( + SingleOutputStreamOperator targetStream, DataStream sourceStream) { + targetStream.setParallelism(sourceStream.getParallelism()); + } + + public static void setParallelism( + SingleOutputStreamOperator targetStream, + int parallelism, + boolean parallelismConfigured) { + targetStream.setParallelism(parallelism); + } +} diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java new file mode 100644 index 000000000000..c57ade157274 --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +/** Utility methods about Flink operator parallelism to resolve compatibility issues. */ +public class ParallelismUtils { + /** + * Configures the parallelism of the target stream to be the same as the source stream. In Flink + * 1.17+, this method will also configure {@link Transformation#isParallelismConfigured()}. + */ + public static void forwardParallelism( + SingleOutputStreamOperator targetStream, DataStream sourceStream) { + targetStream.setParallelism(sourceStream.getParallelism()); + } + + public static void setParallelism( + SingleOutputStreamOperator targetStream, + int parallelism, + boolean parallelismConfigured) { + targetStream.setParallelism(parallelism); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java index 5c27db6ddf1b..5dc69b4349f0 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; /** * Builder for sink when syncing records into one Paimon table. @@ -99,8 +100,8 @@ public DataStreamSink build() { SingleOutputStreamOperator parsed = input.forward() .process(new CdcParsingProcessFunction<>(parserFactory)) - .name("Side Output") - .setParallelism(input.getParallelism()); + .name("Side Output"); + forwardParallelism(parsed, input); DataStream schemaChangeProcessFunction = SingleOutputStreamOperatorUtils.getSideOutput( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 4cd9235cb58a..a6c1eb2373f2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -49,6 +49,7 @@ import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration; import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; /** * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary. 
@@ -109,10 +110,8 @@ public DataStreamSink sinkFrom( MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo(); SingleOutputStreamOperator written = input.transform( - WRITER_NAME, - typeInfo, - createWriteOperator(sinkProvider, commitUser)) - .setParallelism(input.getParallelism()); + WRITER_NAME, typeInfo, createWriteOperator(sinkProvider, commitUser)); + forwardParallelism(written, input); // shuffle committables by table DataStream partitioned = @@ -122,17 +121,16 @@ public DataStreamSink sinkFrom( input.getParallelism()); SingleOutputStreamOperator committed = - partitioned - .transform( - GLOBAL_COMMITTER_NAME, - typeInfo, - new CommitterOperatorFactory<>( - true, - false, - commitUser, - createCommitterFactory(), - createCommittableStateManager())) - .setParallelism(input.getParallelism()); + partitioned.transform( + GLOBAL_COMMITTER_NAME, + typeInfo, + new CommitterOperatorFactory<>( + true, + false, + commitUser, + createCommitterFactory(), + createCommittableStateManager())); + forwardParallelism(committed, input); configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory); return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index bd18c7e7ad82..036b040993ae 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -43,6 +43,7 @@ import static org.apache.paimon.CoreOptions.createCommitUser; import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; /** * Builder for CDC {@link FlinkWriteSink} when syncing the whole database into one Paimon database. 
@@ -140,8 +141,8 @@ private void buildCombinedCdcSink() { .process( new CdcDynamicTableParsingProcessFunction<>( database, catalogLoader, parserFactory)) - .name("Side Output") - .setParallelism(input.getParallelism()); + .name("Side Output"); + forwardParallelism(parsed, input); // for newly-added tables, create a multiplexing operator that handles all their records // and writes to multiple tables @@ -188,8 +189,8 @@ private void buildDividedCdcSink() { SingleOutputStreamOperator parsed = input.forward() .process(new CdcMultiTableParsingProcessFunction<>(parserFactory)) - .name("Side Output") - .setParallelism(input.getParallelism()); + .name("Side Output"); + forwardParallelism(parsed, input); for (FileStoreTable table : tables) { DataStream schemaChangeProcessFunction = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java index 8760f1dc5f80..9e069324654e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java @@ -111,7 +111,8 @@ public static DataStream> rangeShuffleByKey( "ABSTRACT KEY AND SIZE", new StreamMap<>(new KeyAndSizeExtractor<>(valueRowType, isSortBySize)), new TupleTypeInfo<>(keyTypeInformation, BasicTypeInfo.INT_TYPE_INFO), - input.getParallelism()); + input.getParallelism(), + input.isParallelismConfigured()); // 1. Fixed size sample in each partitions. OneInputTransformation, Tuple3> localSample = @@ -123,7 +124,8 @@ public static DataStream> rangeShuffleByKey( BasicTypeInfo.DOUBLE_TYPE_INFO, keyTypeInformation, BasicTypeInfo.INT_TYPE_INFO), - keyInput.getParallelism()); + keyInput.getParallelism(), + keyInput.isParallelismConfigured()); // 2. Collect all the samples and gather them into a sorted key range. OneInputTransformation, List> sampleAndHistogram = @@ -132,7 +134,8 @@ public static DataStream> rangeShuffleByKey( "GLOBAL SAMPLE", new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum), new ListTypeInfo<>(keyTypeInformation), - 1); + 1, + true); // 3. Take range boundaries as broadcast input and take the tuple of partition id and // record as output. @@ -153,7 +156,8 @@ public static DataStream> rangeShuffleByKey( new AssignRangeIndexOperator<>(keyComparator), new TupleTypeInfo<>( BasicTypeInfo.INT_TYPE_INFO, input.getOutputType()), - input.getParallelism()); + input.getParallelism(), + input.isParallelismConfigured()); // 4. Remove the partition id. (shuffle according range partition) return new DataStream<>( @@ -168,7 +172,8 @@ public static DataStream> rangeShuffleByKey( "REMOVE RANGE INDEX", new RemoveRangeIndexOperator<>(), input.getOutputType(), - outParallelism)); + outParallelism, + true)); } /** KeyAndSizeExtractor is responsible for extracting the sort key and row size. 
*/ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index 1106add7d3fa..5d94c6fa2ce2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -47,6 +47,7 @@ import static org.apache.paimon.flink.sink.FlinkSink.assertBatchAdaptiveParallelism; import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration; import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; /** A sink for processing multi-tables in dedicated compaction job. */ public class CombinedTableCompactorSink implements Serializable { @@ -103,25 +104,23 @@ public DataStream doWrite( == RuntimeExecutionMode.STREAMING; SingleOutputStreamOperator multiBucketTableRewriter = - awareBucketTableSource - .transform( - String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME), - new MultiTableCommittableTypeInfo(), - combinedMultiCompactionWriteOperator( - env.getCheckpointConfig(), - isStreaming, - fullCompaction, - commitUser)) - .setParallelism(awareBucketTableSource.getParallelism()); + awareBucketTableSource.transform( + String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME), + new MultiTableCommittableTypeInfo(), + combinedMultiCompactionWriteOperator( + env.getCheckpointConfig(), + isStreaming, + fullCompaction, + commitUser)); + forwardParallelism(multiBucketTableRewriter, awareBucketTableSource); SingleOutputStreamOperator unawareBucketTableRewriter = - unawareBucketTableSource - .transform( - String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME), - new MultiTableCommittableTypeInfo(), - new AppendOnlyMultiTableCompactionWorkerOperator.Factory( - catalogLoader, commitUser, options)) - .setParallelism(unawareBucketTableSource.getParallelism()); + unawareBucketTableSource.transform( + String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME), + new MultiTableCommittableTypeInfo(), + new AppendOnlyMultiTableCompactionWorkerOperator.Factory( + catalogLoader, commitUser, options)); + forwardParallelism(unawareBucketTableRewriter, unawareBucketTableSource); if (!isStreaming) { assertBatchAdaptiveParallelism(env, multiBucketTableRewriter.getParallelism()); @@ -156,18 +155,17 @@ protected DataStreamSink doCommit( new MultiTableCommittableChannelComputer(), written.getParallelism()); SingleOutputStreamOperator committed = - partitioned - .transform( - GLOBAL_COMMITTER_NAME, - new MultiTableCommittableTypeInfo(), - new CommitterOperatorFactory<>( - streamingCheckpointEnabled, - false, - commitUser, - createCommitterFactory(isStreaming), - createCommittableStateManager(), - options.get(END_INPUT_WATERMARK))) - .setParallelism(written.getParallelism()); + partitioned.transform( + GLOBAL_COMMITTER_NAME, + new MultiTableCommittableTypeInfo(), + new CommitterOperatorFactory<>( + streamingCheckpointEnabled, + false, + commitUser, + createCommitterFactory(isStreaming), + createCommittableStateManager(), + options.get(END_INPUT_WATERMARK))); + forwardParallelism(committed, written); if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) { committed = committed.startNewChain(); } diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java index f9dbd19741ce..28ee182020bf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java @@ -26,12 +26,14 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import javax.annotation.Nullable; import java.util.Map; import static org.apache.paimon.CoreOptions.createCommitUser; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; /** This class is only used for generate compact sink topology for dynamic bucket table. */ public class DynamicBucketCompactSink extends RowDynamicBucketSink { @@ -54,9 +56,9 @@ public DataStreamSink build(DataStream input, @Nullable Integer initialCommitUser, table, null, extractorFunction(), true); TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO); - DataStream> bucketAssigned = - input.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator) - .setParallelism(input.getParallelism()); + SingleOutputStreamOperator> bucketAssigned = + input.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator); + forwardParallelism(bucketAssigned, input); return sinkFrom(bucketAssigned, initialCommitUser); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java index c2299a7e8699..a4ed77207911 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java @@ -41,6 +41,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX; import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; /** Sink for dynamic bucket table. 
*/ public abstract class DynamicBucketSink extends FlinkWriteSink> { @@ -95,10 +96,9 @@ public DataStreamSink build(DataStream input, @Nullable Integer parallelis TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO); SingleOutputStreamOperator> bucketAssigned = - partitionByKeyHash - .transform( - DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator) - .setParallelism(partitionByKeyHash.getParallelism()); + partitionByKeyHash.transform( + DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator); + forwardParallelism(bucketAssigned, partitionByKeyHash); String uidSuffix = table.options().get(SINK_OPERATOR_UID_SUFFIX.key()); if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 4cd085883d33..209872985846 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -69,6 +69,8 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY; import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid; import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; +import static org.apache.paimon.flink.utils.ParallelismUtils.setParallelism; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Abstract sink of paimon. */ @@ -182,7 +184,7 @@ public DataStreamSink sinkFrom(DataStream input) { public DataStreamSink sinkFrom(DataStream input, String initialCommitUser) { // do the actually writing action, no snapshot generated in this stage - DataStream written = doWrite(input, initialCommitUser, input.getParallelism()); + DataStream written = doWrite(input, initialCommitUser, null); // commit the committable to generate a new snapshot return doCommit(written, initialCommitUser); } @@ -216,17 +218,19 @@ public DataStream doWrite( boolean writeOnly = table.coreOptions().writeOnly(); SingleOutputStreamOperator written = input.transform( - (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) - + " : " - + table.name(), - new CommittableTypeInfo(), - createWriteOperatorFactory( - createWriteProvider( - env.getCheckpointConfig(), - isStreaming, - hasSinkMaterializer(input)), - commitUser)) - .setParallelism(parallelism == null ? input.getParallelism() : parallelism); + (writeOnly ? 
WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " : " + table.name(), + new CommittableTypeInfo(), + createWriteOperatorFactory( + createWriteProvider( + env.getCheckpointConfig(), + isStreaming, + hasSinkMaterializer(input)), + commitUser)); + if (parallelism == null) { + setParallelism(written, input.getParallelism(), false); + } else { + written.setParallelism(parallelism); + } Options options = Options.fromMap(table.options()); @@ -240,7 +244,7 @@ public DataStream doWrite( } if (options.get(PRECOMMIT_COMPACT)) { - written = + SingleOutputStreamOperator newWritten = written.transform( "Changelog Compact Coordinator", new EitherTypeInfo<>( @@ -250,8 +254,9 @@ public DataStream doWrite( .transform( "Changelog Compact Worker", new CommittableTypeInfo(), - new ChangelogCompactWorkerOperator(table)) - .setParallelism(written.getParallelism()); + new ChangelogCompactWorkerOperator(table)); + forwardParallelism(newWritten, written); + written = newWritten; } return written; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 4fd0edf6f058..d99a7d818f58 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -59,6 +60,8 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR; import static org.apache.paimon.flink.sink.FlinkSink.isStreaming; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; +import static org.apache.paimon.flink.utils.ParallelismUtils.setParallelism; import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -102,13 +105,14 @@ public FlinkSinkBuilder forRow(DataStream input, DataType rowDataType) { DataFormatConverters.RowConverter converter = new DataFormatConverters.RowConverter(fieldDataTypes); - this.input = + SingleOutputStreamOperator newInput = input.transform( - "Map", - InternalTypeInfo.of(rowType), - new StreamMapWithForwardingRecordAttributes<>( - (MapFunction) converter::toInternal)) - .setParallelism(input.getParallelism()); + "Map", + InternalTypeInfo.of(rowType), + new StreamMapWithForwardingRecordAttributes<>( + (MapFunction) converter::toInternal)); + setParallelism(newInput, input.getParallelism(), false); + this.input = newInput; return this; } @@ -217,13 +221,14 @@ public DataStreamSink build() { input = trySortInput(input); DataStream input = mapToInternalRow(this.input, table.rowType()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { - input = + SingleOutputStreamOperator newInput = input.forward() .transform( "local merge", input.getType(), - new 
LocalMergeOperator.Factory(table.schema())) - .setParallelism(input.getParallelism()); + new LocalMergeOperator.Factory(table.schema())); + forwardParallelism(newInput, input); + input = newInput; } BucketMode bucketMode = table.bucketMode(); @@ -243,12 +248,14 @@ public DataStreamSink build() { protected DataStream mapToInternalRow( DataStream input, org.apache.paimon.types.RowType rowType) { - return input.transform( + SingleOutputStreamOperator result = + input.transform( "Map", org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType), new StreamMapWithForwardingRecordAttributes<>( - (MapFunction) FlinkRowWrapper::new)) - .setParallelism(input.getParallelism()); + (MapFunction) FlinkRowWrapper::new)); + forwardParallelism(result, input); + return result; } protected DataStreamSink buildDynamicBucketSink( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java index f9b81760c30e..efa2a9d7b7d7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkStreamPartitioner.java @@ -73,9 +73,7 @@ public static DataStream partition( FlinkStreamPartitioner partitioner = new FlinkStreamPartitioner<>(channelComputer); PartitionTransformation partitioned = new PartitionTransformation<>(input.getTransformation(), partitioner); - if (parallelism != null) { - partitioned.setParallelism(parallelism); - } + partitioned.setParallelism(parallelism == null ? input.getParallelism() : parallelism); return new DataStream<>(input.getExecutionEnvironment(), partitioned); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java index 487d0f268986..b53e2031e98d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java @@ -31,11 +31,15 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import javax.annotation.Nullable; import java.util.Map; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; +import static org.apache.paimon.flink.utils.ParallelismUtils.setParallelism; + /** * Sink for unaware-bucket table. 
* @@ -67,7 +71,7 @@ public DataStream doWrite( Options options = new Options(table.options()); if (options.get(FlinkConnectorOptions.PRECOMMIT_COMPACT)) { - written = + SingleOutputStreamOperator newWritten = written.transform( "New Files Compact Coordinator: " + table.name(), new EitherTypeInfo<>( @@ -83,8 +87,9 @@ public DataStream doWrite( "New Files Compact Worker: " + table.name(), new CommittableTypeInfo(), new UnawareBucketNewFilesCompactionWorkerOperator(table)) - .startNewChain() - .setParallelism(written.getParallelism()); + .startNewChain(); + forwardParallelism(newWritten, written); + written = newWritten; } boolean enableCompaction = !table.coreOptions().writeOnly(); @@ -95,7 +100,7 @@ public DataStream doWrite( == RuntimeExecutionMode.STREAMING; // if enable compaction, we need to add compaction topology to this job if (enableCompaction && isStreamingMode) { - written = + SingleOutputStreamOperator newWritten = written.transform( "Compact Coordinator: " + table.name(), new EitherTypeInfo<>( @@ -109,8 +114,9 @@ public DataStream doWrite( new CommittableTypeInfo(), new AppendBypassCompactWorkerOperator.Factory( table, initialCommitUser)) - .startNewChain() - .setParallelism(written.getParallelism()); + .startNewChain(); + setParallelism(newWritten, written.getParallelism(), false); + written = newWritten; } return written; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java index 7022002a43ba..7b18e2328699 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java @@ -39,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; @@ -83,7 +84,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket --> // writer --> committer - DataStream> bootstraped = + SingleOutputStreamOperator> bootstraped = input.transform( "INDEX_BOOTSTRAP", new InternalTypeInfo<>( @@ -110,7 +111,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer // 2. 
bucket-assigner TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO); - DataStream> bucketAssigned = + SingleOutputStreamOperator> bucketAssigned = partitionByKeyHash .transform( "cross-partition-bucket-assigner", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index d99efae0539d..542480cd197e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; @@ -67,6 +68,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX; import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -272,9 +274,11 @@ public DataStream buildForRow() { DataFormatConverters.RowConverter converter = new DataFormatConverters.RowConverter(fieldDataTypes); DataStream source = build(); - return source.map((MapFunction) converter::toExternal) - .setParallelism(source.getParallelism()) - .returns(ExternalTypeInfo.of(rowType)); + SingleOutputStreamOperator result = + source.map((MapFunction) converter::toExternal) + .returns(ExternalTypeInfo.of(rowType)); + forwardParallelism(result, source); + return result; } /** Build source {@link DataStream} with {@link RowData}. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java new file mode 100644 index 000000000000..f22408f4c355 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ParallelismUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +/** Utility methods about Flink operator parallelism to resolve compatibility issues. */ +public class ParallelismUtils { + /** + * Configures the parallelism of the target stream to be the same as the source stream. In Flink + * 1.17+, this method will also configure {@link Transformation#isParallelismConfigured()}. + */ + public static void forwardParallelism( + SingleOutputStreamOperator targetStream, DataStream sourceStream) { + setParallelism( + targetStream, + sourceStream.getParallelism(), + sourceStream.getTransformation().isParallelismConfigured()); + } + + public static void setParallelism( + SingleOutputStreamOperator targetStream, + int parallelism, + boolean parallelismConfigured) { + // In Flink, SingleOutputStreamOperator#setParallelism wraps Transformation#setParallelism + // and provides additional checks and validations. In order to enable the checks in + // SingleOutputStreamOperator as well as the parallelismConfigured ability in + // Transformation, this method invokes both setParallelism methods. + + targetStream.setParallelism(parallelism); + + targetStream.getTransformation().setParallelism(parallelism, parallelismConfigured); + } +}
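Usage note (illustrative, not part of the patch): the following self-contained sketch shows how forwardParallelism is meant to replace the previous setParallelism(input.getParallelism()) pattern used throughout the sinks above. The job topology and class name are hypothetical examples invented for this note; only the ParallelismUtils call reflects the actual change. On Flink 1.17+ the helper also forwards whether the upstream parallelism was explicitly configured, which the old pattern could not express.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;

public class ForwardParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical upstream stream; its parallelism is never explicitly configured,
        // so on Flink 1.17+ isParallelismConfigured() remains false for its transformation.
        DataStream<String> input = env.fromElements("a", "b", "c");

        SingleOutputStreamOperator<String> mapped =
                input.map(
                        new MapFunction<String, String>() {
                            @Override
                            public String map(String value) {
                                return value.toUpperCase();
                            }
                        });

        // Before this change: mapped.setParallelism(input.getParallelism()), which always
        // marks the parallelism of "mapped" as explicitly configured.
        // With the helper, both the parallelism value and the configured flag are forwarded,
        // so the adaptive batch scheduler may still adjust operators whose parallelism was
        // never set by the user.
        forwardParallelism(mapped, input);

        mapped.print();
        env.execute("forward-parallelism-example");
    }
}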