Add parallelismConfigured
yunfengzhou-hub committed Feb 13, 2025
1 parent a2ed191 commit a07de06
Showing 15 changed files with 238 additions and 102 deletions.
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/** Utility methods for setting Flink operator parallelism, used to resolve compatibility issues across Flink versions. */
public class ParallelismUtils {
    /**
     * Configures the parallelism of the target stream to match that of the source stream. In
     * Flink 1.17+, this method also forwards the
     * {@link Transformation#isParallelismConfigured()} flag from the source to the target.
     */
public static void forwardParallelism(
SingleOutputStreamOperator<?> targetStream, DataStream<?> sourceStream) {
targetStream.setParallelism(sourceStream.getParallelism());
}
}
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/** Utility methods for setting Flink operator parallelism, used to resolve compatibility issues across Flink versions. */
public class ParallelismUtils {
    /**
     * Configures the parallelism of the target stream to match that of the source stream. In
     * Flink 1.17+, this method also forwards the
     * {@link Transformation#isParallelismConfigured()} flag from the source to the target.
     */
public static void forwardParallelism(
SingleOutputStreamOperator<?> targetStream, DataStream<?> sourceStream) {
targetStream.setParallelism(sourceStream.getParallelism());
}
}
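
The classes above only forward the parallelism value itself; their javadoc says that on Flink 1.17+ the utility should also propagate the parallelismConfigured flag. The Flink 1.17+ override shipped by this commit is not shown in this excerpt, so the following is only a minimal sketch of what such a variant might look like. It assumes the Flink 1.17+ Transformation#setParallelism(int, boolean) overload and the public DataStream#getTransformation() accessor.

/*
 * Hedged sketch only: the actual Flink 1.17+ module override is not part of this
 * excerpt, so the body below is an assumption about its shape.
 */
package org.apache.paimon.flink.utils;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class ParallelismUtils {

    /** Forwards both the parallelism value and the parallelismConfigured flag. */
    public static void forwardParallelism(
            SingleOutputStreamOperator<?> targetStream, DataStream<?> sourceStream) {
        targetStream
                .getTransformation()
                .setParallelism(
                        sourceStream.getParallelism(),
                        sourceStream.getTransformation().isParallelismConfigured());
    }
}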
@@ -35,6 +35,7 @@
import javax.annotation.Nullable;

import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;

/**
* Builder for sink when syncing records into one Paimon table.
@@ -99,8 +100,8 @@ public DataStreamSink<?> build() {
SingleOutputStreamOperator<CdcRecord> parsed =
input.forward()
.process(new CdcParsingProcessFunction<>(parserFactory))
.name("Side Output")
.setParallelism(input.getParallelism());
.name("Side Output");
forwardParallelism(parsed, input);

DataStream<Void> schemaChangeProcessFunction =
SingleOutputStreamOperatorUtils.getSideOutput(
@@ -49,6 +49,7 @@

import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;

/**
* A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary.
@@ -109,10 +110,8 @@ public DataStreamSink<?> sinkFrom(
MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo();
SingleOutputStreamOperator<MultiTableCommittable> written =
input.transform(
WRITER_NAME,
typeInfo,
createWriteOperator(sinkProvider, commitUser))
.setParallelism(input.getParallelism());
WRITER_NAME, typeInfo, createWriteOperator(sinkProvider, commitUser));
forwardParallelism(written, input);

// shuffle committables by table
DataStream<MultiTableCommittable> partitioned =
@@ -122,17 +121,16 @@ public DataStreamSink<?> sinkFrom(
input.getParallelism());

SingleOutputStreamOperator<?> committed =
partitioned
.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperatorFactory<>(
true,
false,
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
.setParallelism(input.getParallelism());
partitioned.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperatorFactory<>(
true,
false,
commitUser,
createCommitterFactory(),
createCommittableStateManager()));
forwardParallelism(committed, input);
configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
}
@@ -43,6 +43,7 @@
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;

/**
* Builder for CDC {@link FlinkWriteSink} when syncing the whole database into one Paimon database.
@@ -140,8 +141,8 @@ private void buildCombinedCdcSink() {
.process(
new CdcDynamicTableParsingProcessFunction<>(
database, catalogLoader, parserFactory))
.name("Side Output")
.setParallelism(input.getParallelism());
.name("Side Output");
forwardParallelism(parsed, input);

// for newly-added tables, create a multiplexing operator that handles all their records
// and writes to multiple tables
@@ -188,8 +189,8 @@ private void buildDividedCdcSink() {
SingleOutputStreamOperator<Void> parsed =
input.forward()
.process(new CdcMultiTableParsingProcessFunction<>(parserFactory))
.name("Side Output")
.setParallelism(input.getParallelism());
.name("Side Output");
forwardParallelism(parsed, input);

for (FileStoreTable table : tables) {
DataStream<Void> schemaChangeProcessFunction =
@@ -111,7 +111,8 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
"ABSTRACT KEY AND SIZE",
new StreamMap<>(new KeyAndSizeExtractor<>(valueRowType, isSortBySize)),
new TupleTypeInfo<>(keyTypeInformation, BasicTypeInfo.INT_TYPE_INFO),
input.getParallelism());
input.getParallelism(),
input.isParallelismConfigured());

// 1. Fixed size sample in each partitions.
OneInputTransformation<Tuple2<T, Integer>, Tuple3<Double, T, Integer>> localSample =
@@ -123,7 +124,8 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
BasicTypeInfo.DOUBLE_TYPE_INFO,
keyTypeInformation,
BasicTypeInfo.INT_TYPE_INFO),
keyInput.getParallelism());
keyInput.getParallelism(),
keyInput.isParallelismConfigured());

// 2. Collect all the samples and gather them into a sorted key range.
OneInputTransformation<Tuple3<Double, T, Integer>, List<T>> sampleAndHistogram =
@@ -132,7 +134,8 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
"GLOBAL SAMPLE",
new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum),
new ListTypeInfo<>(keyTypeInformation),
1);
1,
true);

// 3. Take range boundaries as broadcast input and take the tuple of partition id and
// record as output.
@@ -153,7 +156,8 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
new AssignRangeIndexOperator<>(keyComparator),
new TupleTypeInfo<>(
BasicTypeInfo.INT_TYPE_INFO, input.getOutputType()),
input.getParallelism());
input.getParallelism(),
input.isParallelismConfigured());

// 4. Remove the partition id. (shuffle according range partition)
return new DataStream<>(
@@ -168,7 +172,8 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
"REMOVE RANGE INDEX",
new RemoveRangeIndexOperator<>(),
input.getOutputType(),
outParallelism));
outParallelism,
true));
}
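
The RangeShuffle changes above thread the upstream isParallelismConfigured() flag into every OneInputTransformation they build instead of leaving it unset. As a rough illustration of the same pattern, here is a small, self-contained sketch; it assumes the Flink 1.17+ OneInputTransformation constructor with a trailing parallelismConfigured argument, the same one used in this diff, and is not code from the commit itself.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;

public class ForwardParallelismFlagSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> input = env.fromElements(1, 2, 3);

        // Build a map transformation by hand, as RangeShuffle does, and forward the
        // parallelismConfigured flag from the upstream transformation instead of
        // hard-coding it. With the adaptive batch scheduler, operators whose
        // parallelism was never explicitly configured can then still be rescaled.
        OneInputTransformation<Integer, String> toString =
                new OneInputTransformation<>(
                        input.getTransformation(),
                        "TO STRING",
                        new StreamMap<>((MapFunction<Integer, String>) String::valueOf),
                        BasicTypeInfo.STRING_TYPE_INFO,
                        input.getParallelism(),
                        input.getTransformation().isParallelismConfigured());

        new DataStream<>(env, toString).print();
        env.execute("forward-parallelism-flag-sketch");
    }
}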

/** KeyAndSizeExtractor is responsible for extracting the sort key and row size. */
@@ -47,6 +47,7 @@
import static org.apache.paimon.flink.sink.FlinkSink.assertBatchAdaptiveParallelism;
import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;

/** A sink for processing multi-tables in dedicated compaction job. */
public class CombinedTableCompactorSink implements Serializable {
@@ -103,25 +104,23 @@ public DataStream<MultiTableCommittable> doWrite(
== RuntimeExecutionMode.STREAMING;

SingleOutputStreamOperator<MultiTableCommittable> multiBucketTableRewriter =
awareBucketTableSource
.transform(
String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
combinedMultiCompactionWriteOperator(
env.getCheckpointConfig(),
isStreaming,
fullCompaction,
commitUser))
.setParallelism(awareBucketTableSource.getParallelism());
awareBucketTableSource.transform(
String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
combinedMultiCompactionWriteOperator(
env.getCheckpointConfig(),
isStreaming,
fullCompaction,
commitUser));
forwardParallelism(multiBucketTableRewriter, awareBucketTableSource);

SingleOutputStreamOperator<MultiTableCommittable> unawareBucketTableRewriter =
unawareBucketTableSource
.transform(
String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
new AppendOnlyMultiTableCompactionWorkerOperator.Factory(
catalogLoader, commitUser, options))
.setParallelism(unawareBucketTableSource.getParallelism());
unawareBucketTableSource.transform(
String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
new AppendOnlyMultiTableCompactionWorkerOperator.Factory(
catalogLoader, commitUser, options));
forwardParallelism(unawareBucketTableRewriter, unawareBucketTableSource);

if (!isStreaming) {
assertBatchAdaptiveParallelism(env, multiBucketTableRewriter.getParallelism());
@@ -156,18 +155,17 @@ protected DataStreamSink<?> doCommit(
new MultiTableCommittableChannelComputer(),
written.getParallelism());
SingleOutputStreamOperator<?> committed =
partitioned
.transform(
GLOBAL_COMMITTER_NAME,
new MultiTableCommittableTypeInfo(),
new CommitterOperatorFactory<>(
streamingCheckpointEnabled,
false,
commitUser,
createCommitterFactory(isStreaming),
createCommittableStateManager(),
options.get(END_INPUT_WATERMARK)))
.setParallelism(written.getParallelism());
partitioned.transform(
GLOBAL_COMMITTER_NAME,
new MultiTableCommittableTypeInfo(),
new CommitterOperatorFactory<>(
streamingCheckpointEnabled,
false,
commitUser,
createCommitterFactory(isStreaming),
createCommittableStateManager(),
options.get(END_INPUT_WATERMARK)));
forwardParallelism(committed, written);
if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
committed = committed.startNewChain();
}
@@ -26,12 +26,14 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import javax.annotation.Nullable;

import java.util.Map;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;

/** This class is only used for generate compact sink topology for dynamic bucket table. */
public class DynamicBucketCompactSink extends RowDynamicBucketSink {
@@ -54,9 +56,9 @@ public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer
initialCommitUser, table, null, extractorFunction(), true);
TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
input.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator)
.setParallelism(input.getParallelism());
SingleOutputStreamOperator<Tuple2<InternalRow, Integer>> bucketAssigned =
input.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator);
forwardParallelism(bucketAssigned, input);
return sinkFrom(bucketAssigned, initialCommitUser);
}
}
@@ -41,6 +41,7 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;

/** Sink for dynamic bucket table. */
public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Integer>> {
@@ -95,10 +96,9 @@ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelis
TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
SingleOutputStreamOperator<Tuple2<T, Integer>> bucketAssigned =
partitionByKeyHash
.transform(
DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator)
.setParallelism(partitionByKeyHash.getParallelism());
partitionByKeyHash.transform(
DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator);
forwardParallelism(bucketAssigned, partitionByKeyHash);

String uidSuffix = table.options().get(SINK_OPERATOR_UID_SUFFIX.key());
if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {