[flink] Infer parallelism only in situation of parallelism is not set. #4975
base: master
@@ -81,6 +81,8 @@ void testInferScanParallelism() throws Exception {
null);
PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
sEnv1.setParallelism(-1);
System.out.println(sEnv1.getParallelism());
This print seems unnecessary.
@@ -149,9 +149,18 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
}
Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
if (parallelism == null
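The hunk is cut off mid-condition, so the exact new predicate is not visible here. The following plain-Java sketch illustrates the intent stated in the PR title (infer only when no parallelism is set). It assumes that an unset environment parallelism is represented by Flink's sentinel -1 (`ExecutionConfig.PARALLELISM_DEFAULT`, the same value the test passes to `setParallelism(-1)`). The class and method names are hypothetical, not Paimon code.

```java
/** Hypothetical sketch of the guard in inferSourceParallelism, before and after this PR. */
class InferGuardSketch {
    // Mirrors Flink's ExecutionConfig.PARALLELISM_DEFAULT: -1 means "no parallelism set".
    static final int PARALLELISM_DEFAULT = -1;

    /** Before: infer whenever scan.parallelism is unset and inference is enabled. */
    static boolean shouldInferBefore(Integer scanParallelism, boolean inferEnabled) {
        return scanParallelism == null && inferEnabled;
    }

    /** After (assumed): additionally require that the environment parallelism is unset. */
    static boolean shouldInferAfter(Integer scanParallelism, int envParallelism, boolean inferEnabled) {
        return scanParallelism == null
                && envParallelism == PARALLELISM_DEFAULT
                && inferEnabled;
    }

    public static void main(String[] args) {
        // User sets parallelism.default = 4, leaves scan.parallelism unset, inference is on by default.
        System.out.println(shouldInferBefore(null, true));   // true: inference overrides the user's 4
        System.out.println(shouldInferAfter(null, 4, true)); // false: the environment value is kept
        System.out.println(shouldInferAfter(null, PARALLELISM_DEFAULT, true)); // true
    }
}
```

The first call reproduces the reported problem: with the old guard, a configured environment parallelism is never consulted.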
I'm trying to understand the parallelism inference logic this PR aims to implement. Please check whether the following is correct:
- If the Paimon option SCAN_PARALLELISM is configured, use this value.
- Else, if the Flink configuration parallelism.default is configured, use this value.
- Else, if the Paimon option INFER_SCAN_PARALLELISM is set to true and Paimon can infer the source parallelism (for example, with fixed buckets and an unbounded stream), use Paimon's inference result.
- Else, Paimon does not set the parallelism of the source operator; Flink infra decides the parallelism of this operator instead.
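The precedence in the list above can be written as a single decision function. This is a plain-Java sketch, not Paimon's implementation: the class name, the method `decide`, and its parameters are hypothetical, "unset" is modeled as `null`, and an empty result means "leave the decision to Flink".

```java
import java.util.OptionalInt;

/** Hypothetical model of the source-parallelism precedence discussed in this review. */
class ParallelismPrecedence {

    static OptionalInt decide(
            Integer scanParallelism,    // Paimon SCAN_PARALLELISM, null if unset
            Integer defaultParallelism, // Flink parallelism.default, null if unset
            boolean inferEnabled,       // Paimon INFER_SCAN_PARALLELISM
            Integer inferred) {         // Paimon's inference result, null if it cannot infer
        if (scanParallelism != null) {
            return OptionalInt.of(scanParallelism);    // 1. explicit Paimon option wins
        }
        if (defaultParallelism != null) {
            return OptionalInt.of(defaultParallelism); // 2. then the user's Flink setting
        }
        if (inferEnabled && inferred != null) {
            return OptionalInt.of(inferred);           // 3. then Paimon's inference
        }
        return OptionalInt.empty();                    // 4. otherwise Flink decides
    }

    public static void main(String[] args) {
        System.out.println(decide(8, 4, true, 16));        // OptionalInt[8]
        System.out.println(decide(null, 4, true, 16));     // OptionalInt[4]
        System.out.println(decide(null, null, true, 16));  // OptionalInt[16]
        System.out.println(decide(null, null, false, 16)); // OptionalInt.empty
    }
}
```

Returning an empty result rather than a default number keeps the last rule explicit: Paimon only sets the source parallelism when one of the first three rules applies.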
Yes. As far as I'm concerned, if we set parallelism.default in the Flink SQL environment, we should not infer the parallelism; we should respect what the user wrote. If INFER_SCAN_PARALLELISM defaults to true, we must be careful here; otherwise users may be confused, because they set the parallelism but it seems to have no effect.
We should respect the environment and the user as far as we can. This feature is only auxiliary; we can't depend on it.
+1
@leaves12138 Thanks for the update. From our offline discussion, I learned the following background for this PR. In a Flink job where a Paimon source is directly followed by a Paimon sink, Flink infra cannot dynamically infer the parallelism of the Paimon writer operator. This is because the writer operator sets its parallelism to match the upstream (source) operator, and the source operator's parallelism has already been explicitly set. So, to allow Flink infra to change the writer operator's parallelism, this PR changes the source operator's implementation instead. This approach seems indirect and incomplete, since Flink infra still cannot set the parallelism in other situations. A better solution would be to change writerDataStream.setParallelism(inputDataStream.getParallelism()); to writerDataStream.getTransformation().setParallelism(inputDataStream.getParallelism(), false);
Setting the background motivation aside, this PR itself looks good to me, so I'm +1 on it. We may open a follow-up PR in the future that uses the API mentioned above so that Flink infra can better adjust the parallelism of writer operators.
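To illustrate why the second argument of `setParallelism(int, boolean)` matters, here is a toy model, not Flink code. `Node` is a hypothetical stand-in for a Flink `Transformation`, and `adaptiveRescale` is a hypothetical stand-in for a scheduler that may only change parallelism that was not explicitly configured. Check that the two-argument `Transformation#setParallelism` is available in the Flink version you target.

```java
/** Toy model only: Node stands in for a Flink Transformation; none of this is Flink's API. */
class WriterParallelismSketch {

    static final class Node {
        final String name;
        int parallelism;
        boolean parallelismConfigured; // true = explicitly chosen, the scheduler must keep it

        Node(String name) {
            this.name = name;
        }

        void setParallelism(int p, boolean configured) {
            this.parallelism = p;
            this.parallelismConfigured = configured;
        }
    }

    /** Hypothetical scheduler step: overrides parallelism only if it was not explicitly configured. */
    static void adaptiveRescale(Node node, int suggested) {
        if (!node.parallelismConfigured) {
            node.parallelism = suggested;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        source.setParallelism(4, true);

        // Current approach: the writer copies the source parallelism and it counts as configured.
        Node pinnedWriter = new Node("writer");
        pinnedWriter.setParallelism(source.parallelism, true);

        // Suggested approach: same initial value, but left open for the scheduler to change.
        Node flexibleWriter = new Node("writer");
        flexibleWriter.setParallelism(source.parallelism, false);

        adaptiveRescale(pinnedWriter, 16);
        adaptiveRescale(flexibleWriter, 16);
        System.out.println(pinnedWriter.parallelism);   // 4
        System.out.println(flexibleWriter.parallelism); // 16
    }
}
```

Both writers start with the source's parallelism; only the one marked as not configured can be rescaled, which is the behavior the review suggests for the Paimon writer.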
Purpose
parallelism.default does not take effect because scan.infer-parallelism defaults to true.
Tests
API and Format
Documentation