From e2cb08a2ecdf89b1da1df36f381b9437f60b9cee Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Thu, 16 Jan 2025 14:54:02 +0100 Subject: [PATCH] [NU-1962] Flink test mechanism refactoring: passing the same jobgraph as mutated --- .../process/runner/FlinkStubbedRunner.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala index 733a5ae2626..d1f46ede633 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala @@ -45,27 +45,25 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration savepointRestoreSettings: SavepointRestoreSettings ): Unit = { val streamGraph = env.getStreamGraph - setupStreamGraph(streamGraph, scenarioName, savepointRestoreSettings) + streamGraph.setJobName(scenarioName.value) + val jobGraph = streamGraph.getJobGraph + setupJobGraph(jobGraph, savepointRestoreSettings) - val miniClusterConfiguration = prepareMiniClusterConfiguration(parallelism, streamGraph) + val miniClusterConfiguration = prepareMiniClusterConfiguration(parallelism, jobGraph) // it is required for proper working of HadoopFileSystem FileSystem.initialize(miniClusterConfiguration, null) Using.resource(createMiniCluster(env, miniClusterConfiguration)) { exec => - val id = exec.submitJob(streamGraph.getJobGraph).get().getJobID + val id = exec.submitJob(jobGraph).get().getJobID exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader) } } - private def setupStreamGraph( - streamGraph: StreamGraph, - scenarioName: ProcessName, + private def setupJobGraph( + jobGraph: JobGraph, savepointRestoreSettings: SavepointRestoreSettings ): Unit = { - streamGraph.setJobName(scenarioName.value) - - val jobGraph = streamGraph.getJobGraph() jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration) jobGraph.setSavepointRestoreSettings(savepointRestoreSettings) } @@ -85,9 +83,9 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration } } - private def prepareMiniClusterConfiguration[T](parallelism: Int, streamGraph: StreamGraph) = { + private def prepareMiniClusterConfiguration[T](parallelism: Int, jobGraph: JobGraph) = { val configuration: Configuration = new Configuration - configuration.addAll(streamGraph.getJobGraph.getJobConfiguration) + configuration.addAll(jobGraph.getJobConfiguration) configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, parallelism) configuration.set[Integer](RestOptions.PORT, 0)