From 0ff5d03edc90a2fe408c87d470cc33eb595c31bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Sat, 18 Jan 2025 10:13:16 +0100 Subject: [PATCH] [Fix] Passing Flink Job Global Params (#7324) (#7470) --- docs/Changelog.md | 1 + .../engine/flink/api/NkGlobalParameters.scala | 30 +++++++++++++++---- .../process/ExecutionConfigPreparer.scala | 19 +++--------- .../NkGlobalParametersEncoderTest.scala | 3 ++ 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 1d669514b45..c008dee443c 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -59,6 +59,7 @@ * in order to use it, DM must implement interface `schedulingSupported`, that handles deployments on a specific engine * implementation provided for Flink DM * [#7443](https://github.com/TouK/nussknacker/pull/7443) Indexing on record is more similar to indexing on map. The change lets us access record values dynamically. For example now spel expression "{a: 5, b: 10}[#input.field]" compiles and has type "Integer" inferred from types of values of the record. This lets us access record value based on user input, for instance if user passes "{"field": "b"}" to scenario we will get value "10", whereas input {"field": "c"} would result in "null". Expression "{a: 5}["b"]" still does not compile because it is known at compile time that record does not have property "b". +* [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params ## 1.18 diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala index ddd7fbc38e2..ac9e0bc9960 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala @@ -17,6 +17,7 @@ import scala.jdk.CollectionConverters._ //Also, those configuration properties will be exposed via Flink REST API/webconsole case class NkGlobalParameters( buildInfo: String, + deploymentId: String, // TODO: Pass here DeploymentId? processVersion: ProcessVersion, configParameters: Option[ConfigGlobalParameters], namespaceParameters: Option[NamespaceMetricsTags], @@ -63,13 +64,21 @@ object NkGlobalParameters { def create( buildInfo: String, + deploymentId: String, // TODO: Pass here DeploymentId? processVersion: ProcessVersion, modelConfig: Config, namespaceTags: Option[NamespaceMetricsTags], additionalInformation: Map[String, String] ): NkGlobalParameters = { val configGlobalParameters = modelConfig.getAs[ConfigGlobalParameters]("globalParameters") - NkGlobalParameters(buildInfo, processVersion, configGlobalParameters, namespaceTags, additionalInformation) + NkGlobalParameters( + buildInfo, + deploymentId, + processVersion, + configGlobalParameters, + namespaceTags, + additionalInformation + ) } def fromMap(jobParameters: java.util.Map[String, String]): Option[NkGlobalParameters] = @@ -79,11 +88,12 @@ object NkGlobalParameters { def encode(parameters: NkGlobalParameters): Map[String, String] = { def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = { - map.map { case (key, value) => s"$prefix$key" -> value } + map.map { case (key, value) => s"$prefix.$key" -> value } } val baseProperties = Map[String, String]( "buildInfo" -> parameters.buildInfo, + "deploymentId" -> parameters.deploymentId, "versionId" -> parameters.processVersion.versionId.value.toString, "processId" -> parameters.processVersion.processId.value.toString, "modelVersion" -> parameters.processVersion.modelVersion.map(_.toString).orNull, @@ -95,9 +105,11 @@ object NkGlobalParameters { val configMap = parameters.configParameters .map(ConfigGlobalParametersToMapEncoder.encode) .getOrElse(Map.empty) + val namespaceTagsMap = parameters.namespaceParameters .map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix)) .getOrElse(Map.empty) + val additionalInformationMap = encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix) @@ -107,8 +119,8 @@ object NkGlobalParameters { def decode(map: Map[String, String]): Option[NkGlobalParameters] = { def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = { map.view - .filter { case (key, _) => key.startsWith(prefix) } - .map { case (key, value) => key.stripPrefix(prefix) -> value } + .filter { case (key, _) => key.startsWith(s"$prefix.") } + .map { case (key, value) => key.stripPrefix(s"$prefix.") -> value } .toMap } @@ -134,7 +146,15 @@ object NkGlobalParameters { for { processVersion <- processVersionOpt buildInfo <- buildInfoOpt - } yield NkGlobalParameters(buildInfo, processVersion, configParameters, namespaceTags, additionalInformation) + deploymentId <- map.get("deploymentId") + } yield NkGlobalParameters( + buildInfo, + deploymentId, + processVersion, + configParameters, + namespaceTags, + additionalInformation + ) } private object ConfigGlobalParametersToMapEncoder { diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index 34c9140ac1a..debf951005f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -56,30 +56,19 @@ object ExecutionConfigPreparer extends LazyLogging { config.setGlobalJobParameters( NkGlobalParameters.create( buildInfo, + deploymentData.deploymentId.value, jobData.processVersion, modelConfig, namespaceTags = NamespaceMetricsTags(jobData.metaData.name.value, namingStrategy), - prepareMap(jobData.processVersion, deploymentData) + prepareMap(deploymentData) ) ) } - private def prepareMap(processVersion: ProcessVersion, deploymentData: DeploymentData) = { - - val baseProperties = Map[String, String]( - "buildInfo" -> buildInfo, - "versionId" -> processVersion.versionId.value.toString, - "processId" -> processVersion.processId.value.toString, - "labels" -> Encoder[List[String]].apply(processVersion.labels).noSpaces, - "modelVersion" -> processVersion.modelVersion.map(_.toString).orNull, - "user" -> processVersion.user, - "deploymentId" -> deploymentData.deploymentId.value - ) - val scenarioProperties = deploymentData.additionalDeploymentData.map { case (k, v) => + private def prepareMap(deploymentData: DeploymentData) = + deploymentData.additionalDeploymentData.map { case (k, v) => s"deployment.properties.$k" -> v } - baseProperties ++ scenarioProperties - } } diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala index 7ad5e7528a1..8ebb8aa889c 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala @@ -12,6 +12,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers { test("global parameters set and read from context are equal") { val globalParamsWithAllOptionalValues = NkGlobalParameters( buildInfo = "aBuildInfo", + deploymentId = "1", processVersion = ProcessVersion( VersionId.initialVersionId, ProcessName("aProcessName"), @@ -27,6 +28,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers { val globalParamsWithNoOptionalValues = NkGlobalParameters( buildInfo = "aBuildInfo", + deploymentId = "1", processVersion = ProcessVersion( VersionId.initialVersionId, ProcessName("aProcessName"), @@ -44,6 +46,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers { val decodedParams = NkGlobalParameters.fromMap(params.toMap).get decodedParams.buildInfo shouldBe params.buildInfo + decodedParams.deploymentId shouldBe params.deploymentId decodedParams.processVersion shouldBe params.processVersion decodedParams.configParameters shouldBe params.configParameters decodedParams.namespaceParameters shouldBe params.namespaceParameters