Skip to content

Commit

Permalink
[NU-1962] Flink test mechanism refactoring: passing the same jobgraph…
Browse files Browse the repository at this point in the history
… as mutated
  • Loading branch information
arkadius committed Jan 16, 2025
1 parent 6504481 commit e2cb08a
Showing 1 changed file with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,25 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration
savepointRestoreSettings: SavepointRestoreSettings
): Unit = {
val streamGraph = env.getStreamGraph
setupStreamGraph(streamGraph, scenarioName, savepointRestoreSettings)
streamGraph.setJobName(scenarioName.value)
val jobGraph = streamGraph.getJobGraph
setupJobGraph(jobGraph, savepointRestoreSettings)

val miniClusterConfiguration = prepareMiniClusterConfiguration(parallelism, streamGraph)
val miniClusterConfiguration = prepareMiniClusterConfiguration(parallelism, jobGraph)

// it is required for proper working of HadoopFileSystem
FileSystem.initialize(miniClusterConfiguration, null)

Using.resource(createMiniCluster(env, miniClusterConfiguration)) { exec =>
val id = exec.submitJob(streamGraph.getJobGraph).get().getJobID
val id = exec.submitJob(jobGraph).get().getJobID
exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader)
}
}

private def setupStreamGraph(
streamGraph: StreamGraph,
scenarioName: ProcessName,
private def setupJobGraph(
jobGraph: JobGraph,
savepointRestoreSettings: SavepointRestoreSettings
): Unit = {
streamGraph.setJobName(scenarioName.value)

val jobGraph = streamGraph.getJobGraph()
jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration)
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)
}
Expand All @@ -85,9 +83,9 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration
}
}

private def prepareMiniClusterConfiguration[T](parallelism: Int, streamGraph: StreamGraph) = {
private def prepareMiniClusterConfiguration[T](parallelism: Int, jobGraph: JobGraph) = {
val configuration: Configuration = new Configuration
configuration.addAll(streamGraph.getJobGraph.getJobConfiguration)
configuration.addAll(jobGraph.getJobConfiguration)
configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
configuration.set[Integer](RestOptions.PORT, 0)

Expand Down

0 comments on commit e2cb08a

Please sign in to comment.