[NU-1962] Flink test mechanism refactoring: less parameters passing
arkadius committed Jan 16, 2025
1 parent 98ace54 commit 107d3a2
Showing 3 changed files with 41 additions and 38 deletions.
FlinkStubbedRunner.scala
@@ -1,55 +1,56 @@
 package pl.touk.nussknacker.engine.process.runner
 
-import org.apache.flink.configuration.{
-  ConfigUtils,
-  Configuration,
-  CoreOptions,
-  PipelineOptions,
-  RestOptions,
-  TaskManagerOptions
-}
+import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import pl.touk.nussknacker.engine.ModelData
-import pl.touk.nussknacker.engine.api.StreamMetaData
-import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
-import pl.touk.nussknacker.engine.util.MetaDataExtractor
+import pl.touk.nussknacker.engine.api.process.ProcessName
+import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 
 import java.net.{MalformedURLException, URL}
 import scala.jdk.CollectionConverters._
 import scala.util.Using
 
-final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) {
+final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration: Configuration) {
 
-  def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
-    MetaDataExtractor
-      .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
-      .parallelism
-      .getOrElse(1),
+  def createEnv(parallelism: Int): StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
+    parallelism,
     configuration
   )
 
+  private def createMiniCluster[T](env: StreamExecutionEnvironment, configuration: Configuration) = {
+    val miniCluster = new MiniCluster(
+      new MiniClusterConfiguration.Builder()
+        .setNumSlotsPerTaskManager(env.getParallelism)
+        .setConfiguration(configuration)
+        .build()
+    )
+    miniCluster.start()
+    miniCluster
+  }
+
   // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath...
   def execute[T](
       env: StreamExecutionEnvironment,
+      parallelism: Int,
+      scenarioName: ProcessName,
       savepointRestoreSettings: SavepointRestoreSettings
   ): Unit = {
     // Checkpoints are disabled to prevent waiting for checkpoint to happen
     // before finishing execution.
     env.getCheckpointConfig.disableCheckpointing()
 
     val streamGraph = env.getStreamGraph
-    streamGraph.setJobName(process.name.value)
+    streamGraph.setJobName(scenarioName.value)
 
     val jobGraph = streamGraph.getJobGraph()
     jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration)
     jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)
 
     val configuration: Configuration = new Configuration
     configuration.addAll(jobGraph.getJobConfiguration)
-    configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, env.getParallelism)
+    configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
     configuration.set[Integer](RestOptions.PORT, 0)
 
     // FIXME: reversing flink default order
@@ -58,15 +59,7 @@ final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess,
     // it is required for proper working of HadoopFileSystem
     FileSystem.initialize(configuration, null)
 
-    Using.resource(
-      new MiniCluster(
-        new MiniClusterConfiguration.Builder()
-          .setNumSlotsPerTaskManager(env.getParallelism)
-          .setConfiguration(configuration)
-          .build()
-      )
-    ) { exec =>
-      exec.start()
+    Using.resource(createMiniCluster(env, configuration)) { exec =>
       val id = exec.submitJob(jobGraph).get().getJobID
       exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader)
     }
@@ -76,7 +69,7 @@ final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess,
     // The class is also used in some scala tests
     // and this fallback is to work with a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265
     // see details in pl.touk.nussknacker.engine.flink.test.MiniClusterExecutionEnvironment#execute
-    modelData.modelClassLoaderUrls match {
+    modelClassLoader.urls match {
       case Nil =>
         ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException](
           configuration,
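Net effect of this file's changes: FlinkStubbedRunner no longer receives the whole ModelData and CanonicalProcess; it depends only on the ModelClassLoader and the Flink Configuration, and scenario-specific inputs travel as explicit method arguments. A minimal sketch of the resulting call pattern, assuming hypothetical `modelData`, `scenario`, and `config` values in scope (the real callers appear in the next two files):

// Sketch only: `modelData`, `scenario` and `config` stand in for values the
// real callers (FlinkTestMain / FlinkVerificationMain) already have in scope.
val runner = new FlinkStubbedRunner(modelData.modelClassLoader, config)
val parallelism = 2 // now resolved by the caller, not by the runner
val env = runner.createEnv(parallelism)
// ... register the compiled scenario on `env` ...
runner.execute(env, parallelism, scenario.name, SavepointRestoreSettings.none())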
FlinkTestMain.scala
@@ -4,7 +4,7 @@ import io.circe.Json
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
 import pl.touk.nussknacker.engine.ModelData
-import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
+import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion, StreamMetaData}
 import pl.touk.nussknacker.engine.api.process.ProcessName
 import pl.touk.nussknacker.engine.api.test.ScenarioTestData
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
@@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.testmode.{
   ResultsCollectingListenerHolder,
   TestServiceInvocationCollector
 }
+import pl.touk.nussknacker.engine.util.MetaDataExtractor
 
 import scala.util.Using

Expand Down Expand Up @@ -55,17 +56,21 @@ class FlinkTestMain(
     val configuration: Configuration
 ) {
 
-  private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)
+  private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration)
 
   def runTest: TestResults[Json] = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
     try {
       val resultCollector = new TestServiceInvocationCollector(collectingListener)
       val registrar = prepareRegistrar(collectingListener, scenarioTestData)
-      val env = stubbedRunner.createEnv
+      val parallelism = MetaDataExtractor
+        .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
+        .parallelism
+        .getOrElse(1)
+      val env = stubbedRunner.createEnv(parallelism)
 
       registrar.register(env, process, processVersion, deploymentData, resultCollector)
-      stubbedRunner.execute(env, SavepointRestoreSettings.none())
+      stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.none())
       collectingListener.results
     } finally {
       collectingListener.clean()
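Because the runner no longer inspects the scenario, the parallelism lookup moves into the caller. It is the same expression as in the hunk above, annotated here for clarity (`process` is the CanonicalProcess):

val parallelism: Int = MetaDataExtractor
  .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) // typed metadata, or defaults
  .parallelism  // Option[Int]: present only when the scenario declares a parallelism
  .getOrElse(1) // fall back to 1, exactly as the old createEnv did internally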
FlinkVerificationMain.scala
@@ -3,13 +3,14 @@ package pl.touk.nussknacker.engine.process.runner
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
 import pl.touk.nussknacker.engine.ModelData
-import pl.touk.nussknacker.engine.api.ProcessVersion
+import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData}
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment.DeploymentData
 import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory
 import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
 import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
 import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestRunId, TestServiceInvocationCollector}
+import pl.touk.nussknacker.engine.util.MetaDataExtractor
 
 object FlinkVerificationMain extends FlinkRunner {

@@ -35,16 +36,20 @@ class FlinkVerificationMain(
     val configuration: Configuration
 ) {
 
-  private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)
+  private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration)
 
   def runTest(): Unit = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
     val resultCollector = new TestServiceInvocationCollector(collectingListener)
     val registrar = prepareRegistrar()
-    val env = stubbedRunner.createEnv
+    val parallelism = MetaDataExtractor
+      .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
+      .parallelism
+      .getOrElse(1)
+    val env = stubbedRunner.createEnv(parallelism)
 
     registrar.register(env, process, processVersion, deploymentData, resultCollector)
-    stubbedRunner.execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
+    stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.forPath(savepointPath, true))
   }
 
   protected def prepareRegistrar(): FlinkProcessRegistrar = {
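After the refactoring, both entry points drive the runner identically; they differ only in the savepoint settings passed to execute. A side-by-side sketch of the two calls from the hunks above:

// FlinkTestMain: start from a clean state, no savepoint restore.
stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.none())
// FlinkVerificationMain: restore from `savepointPath`; the `true` flag is Flink's
// allowNonRestoredState, so savepoint state with no matching operator is skipped.
stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.forPath(savepointPath, true))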
