From 107d3a240cc5b94c7ed8fe317a62f699bab80035 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Thu, 16 Jan 2025 12:23:07 +0100 Subject: [PATCH] [NU-1962] Flink test mechanism refactoring: less parameters passing --- .../process/runner/FlinkStubbedRunner.scala | 53 ++++++++----------- .../engine/process/runner/FlinkTestMain.scala | 13 +++-- .../runner/FlinkVerificationMain.scala | 13 +++-- 3 files changed, 41 insertions(+), 38 deletions(-) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala index c30464fa795..43007d5a543 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala @@ -1,39 +1,40 @@ package pl.touk.nussknacker.engine.process.runner -import org.apache.flink.configuration.{ - ConfigUtils, - Configuration, - CoreOptions, - PipelineOptions, - RestOptions, - TaskManagerOptions -} +import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.StreamMetaData -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.util.MetaDataExtractor +import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import java.net.{MalformedURLException, URL} import scala.jdk.CollectionConverters._ import scala.util.Using -final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) { +final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration: Configuration) { - def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( - MetaDataExtractor - .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) - .parallelism - .getOrElse(1), + def createEnv(parallelism: Int): StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( + parallelism, configuration ) + private def createMiniCluster[T](env: StreamExecutionEnvironment, configuration: Configuration) = { + val miniCluster = new MiniCluster( + new MiniClusterConfiguration.Builder() + .setNumSlotsPerTaskManager(env.getParallelism) + .setConfiguration(configuration) + .build() + ) + miniCluster.start() + miniCluster + } + // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath... def execute[T]( env: StreamExecutionEnvironment, + parallelism: Int, + scenarioName: ProcessName, savepointRestoreSettings: SavepointRestoreSettings ): Unit = { // Checkpoints are disabled to prevent waiting for checkpoint to happen @@ -41,7 +42,7 @@ final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, env.getCheckpointConfig.disableCheckpointing() val streamGraph = env.getStreamGraph - streamGraph.setJobName(process.name.value) + streamGraph.setJobName(scenarioName.value) val jobGraph = streamGraph.getJobGraph() jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration) @@ -49,7 +50,7 @@ final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, val configuration: Configuration = new Configuration configuration.addAll(jobGraph.getJobConfiguration) - configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, env.getParallelism) + configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, parallelism) configuration.set[Integer](RestOptions.PORT, 0) // FIXME: reversing flink default order @@ -58,15 +59,7 @@ final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, // it is required for proper working of HadoopFileSystem FileSystem.initialize(configuration, null) - Using.resource( - new MiniCluster( - new MiniClusterConfiguration.Builder() - .setNumSlotsPerTaskManager(env.getParallelism) - .setConfiguration(configuration) - .build() - ) - ) { exec => - exec.start() + Using.resource(createMiniCluster(env, configuration)) { exec => val id = exec.submitJob(jobGraph).get().getJobID exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader) } @@ -76,7 +69,7 @@ final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, // The class is also used in some scala tests // and this fallback is to work with a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 // see details in pl.touk.nussknacker.engine.flink.test.MiniClusterExecutionEnvironment#execute - modelData.modelClassLoaderUrls match { + modelClassLoader.urls match { case Nil => ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException]( configuration, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index 75ba6546e70..183931a697f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -4,7 +4,7 @@ import io.circe.Json import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} +import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.testmode.{ ResultsCollectingListenerHolder, TestServiceInvocationCollector } +import pl.touk.nussknacker.engine.util.MetaDataExtractor import scala.util.Using @@ -55,17 +56,21 @@ class FlinkTestMain( val configuration: Configuration ) { - private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) + private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration) def runTest: TestResults[Json] = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener try { val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar(collectingListener, scenarioTestData) - val env = stubbedRunner.createEnv + val parallelism = MetaDataExtractor + .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) + .parallelism + .getOrElse(1) + val env = stubbedRunner.createEnv(parallelism) registrar.register(env, process, processVersion, deploymentData, resultCollector) - stubbedRunner.execute(env, SavepointRestoreSettings.none()) + stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.none()) collectingListener.results } finally { collectingListener.clean() diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala index b5795736780..49a73900af6 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala @@ -3,13 +3,14 @@ package pl.touk.nussknacker.engine.process.runner import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestRunId, TestServiceInvocationCollector} +import pl.touk.nussknacker.engine.util.MetaDataExtractor object FlinkVerificationMain extends FlinkRunner { @@ -35,16 +36,20 @@ class FlinkVerificationMain( val configuration: Configuration ) { - private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) + private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration) def runTest(): Unit = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar() - val env = stubbedRunner.createEnv + val parallelism = MetaDataExtractor + .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) + .parallelism + .getOrElse(1) + val env = stubbedRunner.createEnv(parallelism) registrar.register(env, process, processVersion, deploymentData, resultCollector) - stubbedRunner.execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) + stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.forPath(savepointPath, true)) } protected def prepareRegistrar(): FlinkProcessRegistrar = {