diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala index 325ae48b154..a5e9aaf4015 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala @@ -1,33 +1,20 @@ package pl.touk.nussknacker.engine.process.runner -import org.apache.flink.configuration.{ - ConfigUtils, - Configuration, - CoreOptions, - PipelineOptions, - RestOptions, - TaskManagerOptions -} +import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.StreamMetaData -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.util.MetaDataExtractor +import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import java.net.{MalformedURLException, URL} import scala.jdk.CollectionConverters._ -import scala.util.Using -class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) { +class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration: Configuration) { - def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( - MetaDataExtractor - .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) - .parallelism - .getOrElse(1), + def createEnv(parallelism: Int): StreamExecutionEnvironment = 
StreamExecutionEnvironment.createLocalEnvironment( + parallelism, configuration ) @@ -46,6 +33,7 @@ class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, config def execute[T]( miniCluster: MiniCluster, env: StreamExecutionEnvironment, + scenarioName: ProcessName, savepointRestoreSettings: SavepointRestoreSettings ): Unit = { // Checkpoints are disabled to prevent waiting for checkpoint to happen @@ -53,10 +41,11 @@ class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, config env.getCheckpointConfig.disableCheckpointing() val streamGraph = env.getStreamGraph - streamGraph.setJobName(process.name.value) + streamGraph.setJobName(scenarioName.value) val jobGraph = streamGraph.getJobGraph() - jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration) + // FIXME abr: confirm that modelClassLoader.urls is an adequate replacement for the previous classpathsFromModelWithFallbackToConfiguration (no configuration fallback here) + jobGraph.setClasspaths(modelClassLoader.urls.asJava) jobGraph.setSavepointRestoreSettings(savepointRestoreSettings) val configuration: Configuration = new Configuration @@ -78,7 +67,7 @@ class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, config // The class is also used in some scala tests // and this fallback is to work with a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 // see details in pl.touk.nussknacker.engine.flink.test.MiniClusterExecutionEnvironment#execute - modelData.modelClassLoaderUrls match { + modelClassLoader.urls match { case Nil => ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException]( configuration, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index 27c16053c8f..8ae01a4a1f4 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ 
b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -6,7 +6,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.ScenarioTestData -import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} +import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData} import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.testmode.{ ResultsCollectingListenerHolder, TestServiceInvocationCollector } +import pl.touk.nussknacker.engine.util.MetaDataExtractor import scala.util.Using @@ -55,18 +56,22 @@ class FlinkTestMain( val configuration: Configuration ) { - private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) + private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration) def runTest: TestResults[Json] = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener try { val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar(collectingListener, scenarioTestData) - val env = stubbedRunner.createEnv + val parallelism = MetaDataExtractor + .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) + .parallelism + .getOrElse(1) + val env = stubbedRunner.createEnv(parallelism) registrar.register(env, process, processVersion, deploymentData, resultCollector) - Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster => - stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.none()) + 
Using.resource(stubbedRunner.createMiniCluster(parallelism)) { miniCluster => + stubbedRunner.execute(miniCluster, env, process.name, SavepointRestoreSettings.none()) collectingListener.results } } finally { diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala index 247ebcff80a..994ebefda2a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala @@ -3,13 +3,14 @@ package pl.touk.nussknacker.engine.process.runner import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector} +import pl.touk.nussknacker.engine.util.MetaDataExtractor import scala.util.Using @@ -37,17 +38,21 @@ class FlinkVerificationMain( val configuration: Configuration ) { - private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) + private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration) def runTest(): Unit = { val collectingListener = 
ResultsCollectingListenerHolder.registerTestEngineListener val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar() - val env = stubbedRunner.createEnv + val parallelism = MetaDataExtractor + .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) + .parallelism + .getOrElse(1) + val env = stubbedRunner.createEnv(parallelism) registrar.register(env, process, processVersion, deploymentData, resultCollector) - Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster => - stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.forPath(savepointPath, true)) + Using.resource(stubbedRunner.createMiniCluster(parallelism)) { miniCluster => + stubbedRunner.execute(miniCluster, env, process.name, SavepointRestoreSettings.forPath(savepointPath, true)) } }