Skip to content

Commit

Permalink
[NU-1962] Flink test mechanism refactoring: execute method splitted i…
Browse files Browse the repository at this point in the history
…nto logical parts
  • Loading branch information
arkadius committed Jan 16, 2025
1 parent 107d3a2 commit 6504481
Showing 1 changed file with 40 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package pl.touk.nussknacker.engine.process.runner

import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointRestoreSettings}
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.StreamGraph
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

Expand All @@ -14,10 +15,16 @@ import scala.util.Using

final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration: Configuration) {

def createEnv(parallelism: Int): StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
parallelism,
configuration
)
def createEnv(parallelism: Int): StreamExecutionEnvironment = {
val env = StreamExecutionEnvironment.createLocalEnvironment(
parallelism,
configuration
)
// Checkpoints are disabled to prevent waiting for checkpoint to happen
// before finishing execution.
env.getCheckpointConfig.disableCheckpointing()
env
}

private def createMiniCluster[T](env: StreamExecutionEnvironment, configuration: Configuration) = {
val miniCluster = new MiniCluster(
Expand All @@ -37,34 +44,32 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration
scenarioName: ProcessName,
savepointRestoreSettings: SavepointRestoreSettings
): Unit = {
// Checkpoints are disabled to prevent waiting for checkpoint to happen
// before finishing execution.
env.getCheckpointConfig.disableCheckpointing()

val streamGraph = env.getStreamGraph
streamGraph.setJobName(scenarioName.value)

val jobGraph = streamGraph.getJobGraph()
jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration)
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)
setupStreamGraph(streamGraph, scenarioName, savepointRestoreSettings)

val configuration: Configuration = new Configuration
configuration.addAll(jobGraph.getJobConfiguration)
configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
configuration.set[Integer](RestOptions.PORT, 0)

// FIXME: reversing flink default order
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
val miniClusterConfiguration = prepareMiniClusterConfiguration(parallelism, streamGraph)

// it is required for proper working of HadoopFileSystem
FileSystem.initialize(configuration, null)
FileSystem.initialize(miniClusterConfiguration, null)

Using.resource(createMiniCluster(env, configuration)) { exec =>
val id = exec.submitJob(jobGraph).get().getJobID
Using.resource(createMiniCluster(env, miniClusterConfiguration)) { exec =>
val id = exec.submitJob(streamGraph.getJobGraph).get().getJobID
exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader)
}
}

private def setupStreamGraph(
streamGraph: StreamGraph,
scenarioName: ProcessName,
savepointRestoreSettings: SavepointRestoreSettings
): Unit = {
streamGraph.setJobName(scenarioName.value)

val jobGraph = streamGraph.getJobGraph()
jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration)
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)
}

private def classpathsFromModelWithFallbackToConfiguration = {
// The class is also used in some scala tests
// and this fallback is to work with a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265
Expand All @@ -80,4 +85,15 @@ final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration
}
}

private def prepareMiniClusterConfiguration[T](parallelism: Int, streamGraph: StreamGraph) = {
val configuration: Configuration = new Configuration
configuration.addAll(streamGraph.getJobGraph.getJobConfiguration)
configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
configuration.set[Integer](RestOptions.PORT, 0)

// FIXME: reversing flink default order
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
configuration
}

}

0 comments on commit 6504481

Please sign in to comment.