From 5b9e809e9b671610d74b1d7821c3fa5445376726 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sat, 5 Oct 2024 20:23:39 +0800 Subject: [PATCH 1/2] [Improve] read flinkConf bug fixed. --- .../flink/core/FlinkStreamingInitializer.scala | 16 +++++++++++++--- .../flink/core/FlinkTableInitializer.scala | 18 ++++++++++++++---- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 47a5bce6b3..55774e468c 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -86,16 +86,26 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api throw new IllegalArgumentException( "[StreamPark] Usage:can't fond config, please set \"--conf $path \" in main arguments") } - val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) + val appFlinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX) // config priority: explicitly specified priority > project profiles > system profiles val parameter = ParameterTool .fromSystemProperties() - .mergeWith(ParameterTool.fromMap(flinkConf)) + .mergeWith(ParameterTool.fromMap(appFlinkConf)) .mergeWith(ParameterTool.fromMap(appConf)) .mergeWith(argsMap) - val envConfig = Configuration.fromMap(flinkConf) + val flinkConf: Map[String, String] = { + parameter.get(KEY_FLINK_CONF(), null) match { + case flinkConf if flinkConf != null => + PropertiesUtils + .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf)) + .filter(_._2.nonEmpty) + case _ => Map.empty + } + } + + val envConfig = Configuration.fromMap(flinkConf + appFlinkConf) FlinkConfiguration(parameter, envConfig, null) } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index fc2d073fcc..e2709f8795 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -19,7 +19,7 @@ package org.apache.streampark.flink.core import org.apache.streampark.common.conf.ConfigConst._ import org.apache.streampark.common.enums.{ApiType, PlannerType} import org.apache.streampark.common.enums.ApiType.ApiType -import org.apache.streampark.common.util.DeflaterUtils +import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils} import org.apache.streampark.flink.core.EnhancerImplicit._ import org.apache.flink.api.java.utils.ParameterTool @@ -182,17 +182,27 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType FlinkConfiguration(cliParameterTool, new Configuration(), new Configuration()) } else { // config priority: explicitly specified priority > project profiles > system profiles - val flinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) + val appFlinkConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX) val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX) val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX) val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX) - val envConfig = Configuration.fromMap(flinkConf) + val flinkConf: Map[String, String] = { + parameter.get(KEY_FLINK_CONF(), null) match { + case flinkConf if flinkConf != null => + PropertiesUtils + .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf)) + .filter(_._2.nonEmpty) + case _ => Map.empty + } + } + + val envConfig = Configuration.fromMap(flinkConf + appFlinkConf) val tableConfig = Configuration.fromMap(tableConf) val parameterTool = ParameterTool .fromSystemProperties() - .mergeWith(ParameterTool.fromMap(flinkConf)) + .mergeWith(ParameterTool.fromMap(appFlinkConf)) .mergeWith(ParameterTool.fromMap(appConf)) .mergeWith(ParameterTool.fromMap(tableConf)) .mergeWith(ParameterTool.fromMap(sqlConf)) From 5a036574c3300ca6cdb4d269de1cbe10011ea72d Mon Sep 17 00:00:00 2001 From: benjobs Date: Sat, 5 Oct 2024 21:14:51 +0800 Subject: [PATCH 2/2] [Improve] scala map bug fixed. --- .../streampark/flink/core/FlinkStreamingInitializer.scala | 2 +- .../apache/streampark/flink/core/FlinkTableInitializer.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 55774e468c..ab1808f1c6 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -105,7 +105,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api } } - val envConfig = Configuration.fromMap(flinkConf + appFlinkConf) + val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf) FlinkConfiguration(parameter, envConfig, null) } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index e2709f8795..89f3044d31 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -197,7 +197,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType } } - val envConfig = Configuration.fromMap(flinkConf + appFlinkConf) + val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf) val tableConfig = Configuration.fromMap(tableConf) val parameterTool = ParameterTool