Skip to content

Commit

Permalink
[Fix] Passing Flink Job Global Params (#7324) (#7470)
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki authored Jan 18, 2025
1 parent 4026065 commit 0ff5d03
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 20 deletions.
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* in order to use it, DM must implement interface `schedulingSupported`, that handles deployments on a specific engine
* implementation provided for Flink DM
* [#7443](https://github.com/TouK/nussknacker/pull/7443) Indexing on record is more similar to indexing on map. The change lets us access record values dynamically. For example now spel expression "{a: 5, b: 10}[#input.field]" compiles and has type "Integer" inferred from types of values of the record. This lets us access record value based on user input, for instance if user passes "{"field": "b"}" to scenario we will get value "10", whereas input {"field": "c"} would result in "null". Expression "{a: 5}["b"]" still does not compile because it is known at compile time that record does not have property "b".
* [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params

## 1.18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import scala.jdk.CollectionConverters._
//Also, those configuration properties will be exposed via Flink REST API/webconsole
case class NkGlobalParameters(
buildInfo: String,
deploymentId: String, // TODO: Pass here DeploymentId?
processVersion: ProcessVersion,
configParameters: Option[ConfigGlobalParameters],
namespaceParameters: Option[NamespaceMetricsTags],
Expand Down Expand Up @@ -63,13 +64,21 @@ object NkGlobalParameters {

def create(
buildInfo: String,
deploymentId: String, // TODO: Pass here DeploymentId?
processVersion: ProcessVersion,
modelConfig: Config,
namespaceTags: Option[NamespaceMetricsTags],
additionalInformation: Map[String, String]
): NkGlobalParameters = {
val configGlobalParameters = modelConfig.getAs[ConfigGlobalParameters]("globalParameters")
NkGlobalParameters(buildInfo, processVersion, configGlobalParameters, namespaceTags, additionalInformation)
NkGlobalParameters(
buildInfo,
deploymentId,
processVersion,
configGlobalParameters,
namespaceTags,
additionalInformation
)
}

def fromMap(jobParameters: java.util.Map[String, String]): Option[NkGlobalParameters] =
Expand All @@ -79,11 +88,12 @@ object NkGlobalParameters {

def encode(parameters: NkGlobalParameters): Map[String, String] = {
def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
map.map { case (key, value) => s"$prefix$key" -> value }
map.map { case (key, value) => s"$prefix.$key" -> value }
}

val baseProperties = Map[String, String](
"buildInfo" -> parameters.buildInfo,
"deploymentId" -> parameters.deploymentId,
"versionId" -> parameters.processVersion.versionId.value.toString,
"processId" -> parameters.processVersion.processId.value.toString,
"modelVersion" -> parameters.processVersion.modelVersion.map(_.toString).orNull,
Expand All @@ -95,9 +105,11 @@ object NkGlobalParameters {
val configMap = parameters.configParameters
.map(ConfigGlobalParametersToMapEncoder.encode)
.getOrElse(Map.empty)

val namespaceTagsMap = parameters.namespaceParameters
.map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix))
.getOrElse(Map.empty)

val additionalInformationMap =
encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix)

Expand All @@ -107,8 +119,8 @@ object NkGlobalParameters {
def decode(map: Map[String, String]): Option[NkGlobalParameters] = {
def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
map.view
.filter { case (key, _) => key.startsWith(prefix) }
.map { case (key, value) => key.stripPrefix(prefix) -> value }
.filter { case (key, _) => key.startsWith(s"$prefix.") }
.map { case (key, value) => key.stripPrefix(s"$prefix.") -> value }
.toMap
}

Expand All @@ -134,7 +146,15 @@ object NkGlobalParameters {
for {
processVersion <- processVersionOpt
buildInfo <- buildInfoOpt
} yield NkGlobalParameters(buildInfo, processVersion, configParameters, namespaceTags, additionalInformation)
deploymentId <- map.get("deploymentId")
} yield NkGlobalParameters(
buildInfo,
deploymentId,
processVersion,
configParameters,
namespaceTags,
additionalInformation
)
}

private object ConfigGlobalParametersToMapEncoder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,30 +56,19 @@ object ExecutionConfigPreparer extends LazyLogging {
config.setGlobalJobParameters(
NkGlobalParameters.create(
buildInfo,
deploymentData.deploymentId.value,
jobData.processVersion,
modelConfig,
namespaceTags = NamespaceMetricsTags(jobData.metaData.name.value, namingStrategy),
prepareMap(jobData.processVersion, deploymentData)
prepareMap(deploymentData)
)
)
}

private def prepareMap(processVersion: ProcessVersion, deploymentData: DeploymentData) = {

val baseProperties = Map[String, String](
"buildInfo" -> buildInfo,
"versionId" -> processVersion.versionId.value.toString,
"processId" -> processVersion.processId.value.toString,
"labels" -> Encoder[List[String]].apply(processVersion.labels).noSpaces,
"modelVersion" -> processVersion.modelVersion.map(_.toString).orNull,
"user" -> processVersion.user,
"deploymentId" -> deploymentData.deploymentId.value
)
val scenarioProperties = deploymentData.additionalDeploymentData.map { case (k, v) =>
private def prepareMap(deploymentData: DeploymentData) =
deploymentData.additionalDeploymentData.map { case (k, v) =>
s"deployment.properties.$k" -> v
}
baseProperties ++ scenarioProperties
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers {
test("global parameters set and read from context are equal") {
val globalParamsWithAllOptionalValues = NkGlobalParameters(
buildInfo = "aBuildInfo",
deploymentId = "1",
processVersion = ProcessVersion(
VersionId.initialVersionId,
ProcessName("aProcessName"),
Expand All @@ -27,6 +28,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers {

val globalParamsWithNoOptionalValues = NkGlobalParameters(
buildInfo = "aBuildInfo",
deploymentId = "1",
processVersion = ProcessVersion(
VersionId.initialVersionId,
ProcessName("aProcessName"),
Expand All @@ -44,6 +46,7 @@ class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers {
val decodedParams = NkGlobalParameters.fromMap(params.toMap).get

decodedParams.buildInfo shouldBe params.buildInfo
decodedParams.deploymentId shouldBe params.deploymentId
decodedParams.processVersion shouldBe params.processVersion
decodedParams.configParameters shouldBe params.configParameters
decodedParams.namespaceParameters shouldBe params.namespaceParameters
Expand Down

0 comments on commit 0ff5d03

Please sign in to comment.