From 8118f735bf8c1ca083bff3c86ee2a8456132bbec Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Fri, 17 Jan 2025 18:12:52 +0100 Subject: [PATCH] StreamExecutionEnvironment and MiniCluster created once --- build.sbt | 31 +++--- ...DevelopmentDeploymentManagerProvider.scala | 12 ++- .../MockableDeploymentManagerProvider.scala | 11 +- .../process/runner/FlinkStubbedRunner.scala | 100 ------------------ .../engine/process/runner/FlinkTestMain.scala | 31 +++--- .../runner/FlinkVerificationMain.scala | 43 +++++--- .../testmechanism/FlinkStubbedRunner.scala | 38 +++++++ .../FlinkStreamingProcessMainSpec.scala | 56 ---------- .../runner/SimpleProcessConfigCreator.scala | 59 +++++++++++ .../management/FlinkDeploymentManager.scala | 7 +- .../management/FlinkProcessTestRunner.scala | 29 ----- .../FlinkProcessTestRunner.scala | 45 ++++++++ .../FlinkProcessVerifier.scala | 38 +++++-- .../TestsMechanismMiniClusterFactory.scala | 39 +++++++ ...ismStreamExecutionEnvironmentFactory.scala | 19 ++++ ...er.engine.api.process.ProcessConfigCreator | 1 + .../FlinkProcessTestRunnerSpec.scala} | 53 ++++------ .../SimpleProcessConfigCreator.scala | 59 +++++++++++ .../kafka}/TestFromFileSpec.scala | 15 ++- .../schemedkafka}/TestWithTestDataSpec.scala | 28 +++-- 20 files changed, 409 insertions(+), 305 deletions(-) delete mode 100644 engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala create mode 100644 engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkStubbedRunner.scala create mode 100644 engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala delete mode 100644 engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala create mode 100644 engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala rename engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/{ => testsmechanism}/FlinkProcessVerifier.scala (52%) create mode 100644 engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala create mode 100644 engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismStreamExecutionEnvironmentFactory.scala create mode 100644 engine/flink/management/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.process.ProcessConfigCreator rename engine/flink/{executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala => management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala} (96%) create mode 100644 engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/SimpleProcessConfigCreator.scala rename engine/flink/{kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink => tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka}/TestFromFileSpec.scala (92%) rename engine/flink/{schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink => tests/src/test/scala/pl/touk/nussknacker/testsmechanism/schemedkafka}/TestWithTestDataSpec.scala (91%) diff --git a/build.sbt b/build.sbt index 388e6e75faa..69f705a93b4 100644 --- a/build.sbt +++ b/build.sbt @@ -632,7 +632,8 @@ lazy val flinkDeploymentManager = (project in flink("management")) componentsApi % Provided, httpUtils % Provided, flinkScalaUtils % Provided, - flinkTestUtils % IntegrationTest, + flinkExecutor % Test, + flinkTestUtils % "it,test", kafkaTestUtils % "it,test" ) @@ -742,18 +743,22 @@ lazy val flinkTests = (project in flink("tests")) } ) .dependsOn( - defaultModel % Test, - flinkExecutor % Test, - flinkKafkaComponents % Test, - flinkBaseComponents % Test, - flinkBaseUnboundedComponents % Test, - flinkTableApiComponents % Test, - flinkTestUtils % Test, - kafkaTestUtils % Test, - flinkComponentsTestkit % Test, + defaultModel % Test, + flinkExecutor % Test, + flinkKafkaComponents % Test, + flinkBaseComponents % Test, + flinkBaseUnboundedComponents % Test, + flinkTableApiComponents % Test, + flinkTestUtils % Test, + kafkaTestUtils % Test, + flinkComponentsTestkit % Test, + flinkDeploymentManager % Test, + // TODO: cleanup kafka testsmechanism tests in order to remove test->test dependency + flinkKafkaComponentsUtils % "test->test", + flinkSchemedKafkaComponentsUtils % "test->test", // for local development - designer % Test, - deploymentManagerApi % Test + designer % Test, + deploymentManagerApi % Test ) lazy val defaultModel = (project in (file("defaultModel"))) @@ -997,7 +1002,7 @@ lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-com componentsUtils % Provided, kafkaTestUtils % Test, flinkTestUtils % Test, - flinkExecutor % Test + flinkExecutor % Test, ) lazy val flinkKafkaComponentsUtils = (project in flink("kafka-components-utils")) diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index 7a247f11d78..e7fe3ac2173 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -4,6 +4,7 @@ import akka.actor.ActorSystem import cats.data.{Validated, ValidatedNel} import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging +import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.development.manager.DevelopmentStateStatus._ import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.ProcessVersion @@ -19,7 +20,8 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ -import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig} +import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig +import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner import java.net.URI import java.util.UUID @@ -48,7 +50,8 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode private val memory: TrieMap[ProcessName, StatusDetails] = TrieMap[ProcessName, StatusDetails]() private val random = new scala.util.Random() - private lazy val flinkTestRunner = new FlinkProcessTestRunner(modelData.asInvokableModelData) + private lazy val flinkTestRunner = + new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration()) implicit private class ProcessStateExpandable(processState: StatusDetails) { @@ -83,7 +86,10 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode case command: DMRunOffScheduleCommand => runOffSchedule(command) case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult("")) case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) => - flinkTestRunner.test(canonicalProcess, scenarioTestData) // it's just for streaming e2e tests from file purposes + flinkTestRunner.runTestsAsync( + canonicalProcess, + scenarioTestData + ) // it's just for streaming e2e tests from file purposes } private def description(canonicalProcess: CanonicalProcess) = { diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index 7627c005fb7..7110bd056df 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -4,16 +4,17 @@ import cats.data.Validated.valid import cats.data.ValidatedNel import com.typesafe.config.Config import io.circe.Json +import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig -import pl.touk.nussknacker.engine.api.definition.{NotBlankParameterValidator, StringParameterEditor} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId -import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig} +import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig +import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner import pl.touk.nussknacker.engine.newdeployment.DeploymentId import pl.touk.nussknacker.engine.testing.StubbingCommands import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults @@ -59,7 +60,9 @@ object MockableDeploymentManagerProvider { with StubbingCommands { private lazy val testRunnerOpt = - modelDataOpt.map(modelData => new FlinkProcessTestRunner(modelData.asInvokableModelData)) + modelDataOpt.map(modelData => + new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration()) + ) override def resolve( idWithName: ProcessIdWithName, @@ -102,7 +105,7 @@ object MockableDeploymentManagerProvider { .get() .get(processVersion.processName.value) .map(Future.successful) - .orElse(testRunnerOpt.map(_.test(scenario, testData))) + .orElse(testRunnerOpt.map(_.runTestsAsync(scenario, testData))) .getOrElse( throw new IllegalArgumentException( s"Tests results not mocked for scenario [${processVersion.processName.value}] and no model data provided" diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala deleted file mode 100644 index 77b0e19e2af..00000000000 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ /dev/null @@ -1,100 +0,0 @@ -package pl.touk.nussknacker.engine.process.runner - -import org.apache.flink.configuration._ -import org.apache.flink.core.fs.FileSystem -import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointRestoreSettings} -import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.util.loader.ModelClassLoader - -import java.net.{MalformedURLException, URL} -import scala.jdk.CollectionConverters._ -import scala.util.Using - -final class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration: Configuration) { - - def createEnv(parallelism: Int): StreamExecutionEnvironment = { - val env = StreamExecutionEnvironment.createLocalEnvironment( - parallelism, - configuration - ) - // Checkpoints are disabled to prevent waiting for checkpoint to happen - // before finishing execution. - env.getCheckpointConfig.disableCheckpointing() - env - } - - private def createMiniCluster(configuration: Configuration, numSlotsPerTaskManager: Int) = { - val miniCluster = new MiniCluster( - new MiniClusterConfiguration.Builder() - .setNumSlotsPerTaskManager(numSlotsPerTaskManager) - .setConfiguration(configuration) - .build() - ) - miniCluster.start() - miniCluster - } - - // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath... - def execute( - env: StreamExecutionEnvironment, - parallelism: Int, - scenarioName: ProcessName, - savepointRestoreSettings: SavepointRestoreSettings - ): Unit = { - val streamGraph = env.getStreamGraph - streamGraph.setJobName(scenarioName.value) - val jobGraph = streamGraph.getJobGraph - setupJobGraph(jobGraph, savepointRestoreSettings) - - val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = parallelism, jobGraph) - - // it is required for proper working of HadoopFileSystem - FileSystem.initialize(miniClusterConfiguration, null) - - val miniCluster = createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = parallelism) - - Using.resource(miniCluster) { miniCluster => - val id = miniCluster.submitJob(jobGraph).get().getJobID - miniCluster.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader) - } - } - - private def setupJobGraph( - jobGraph: JobGraph, - savepointRestoreSettings: SavepointRestoreSettings - ): Unit = { - jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration) - jobGraph.setSavepointRestoreSettings(savepointRestoreSettings) - } - - private def classpathsFromModelWithFallbackToConfiguration = { - // The class is also used in some scala tests - // and this fallback is to work with a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 - // see details in pl.touk.nussknacker.engine.flink.test.MiniClusterExecutionEnvironment#execute - modelClassLoader.urls match { - // FIXME abr: is it necessary? -// case Nil => -// ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException]( -// configuration, -// PipelineOptions.CLASSPATHS, -// new URL(_) -// ) - case list => list.asJava - } - } - - private def prepareMiniClusterConfiguration(numTaskSlots: Int, jobGraph: JobGraph) = { - val configuration: Configuration = new Configuration - // FIXME abr: is it necessary? -// configuration.addAll(jobGraph.getJobConfiguration) - configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, numTaskSlots) - configuration.set[Integer](RestOptions.PORT, 0) - - // FIXME: reversing flink default order - configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") - configuration - } - -} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index 93acbb61921..eee9b7afb2e 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -1,16 +1,18 @@ package pl.touk.nussknacker.engine.process.runner import io.circe.Json -import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings +import org.apache.flink.runtime.minicluster.MiniCluster +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.ScenarioTestData -import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion, StreamMetaData} +import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData} import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar +import pl.touk.nussknacker.engine.process.testmechanism.FlinkStubbedRunner import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults import pl.touk.nussknacker.engine.testmode.{ @@ -18,57 +20,54 @@ import pl.touk.nussknacker.engine.testmode.{ ResultsCollectingListenerHolder, TestServiceInvocationCollector } -import pl.touk.nussknacker.engine.util.MetaDataExtractor object FlinkTestMain extends FlinkRunner { def run( + miniCluster: MiniCluster, + env: StreamExecutionEnvironment, modelData: ModelData, process: CanonicalProcess, - scenarioTestData: ScenarioTestData, - configuration: Configuration, + scenarioTestData: ScenarioTestData ): TestResults[Json] = { val processVersion = ProcessVersion.empty.copy(processName = ProcessName("snapshot version") ) // testing process may be unreleased, so it has no version new FlinkTestMain( + miniCluster, + env, modelData, process, scenarioTestData, processVersion, DeploymentData.empty.copy(additionalModelConfigs = AdditionalModelConfigs(modelData.additionalConfigsFromProvider) - ), - configuration + ) ).runTest } } class FlinkTestMain( + miniCluster: MiniCluster, + env: StreamExecutionEnvironment, modelData: ModelData, process: CanonicalProcess, scenarioTestData: ScenarioTestData, processVersion: ProcessVersion, - deploymentData: DeploymentData, - configuration: Configuration + deploymentData: DeploymentData ) { - private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration) + private val stubbedRunner = new FlinkStubbedRunner(miniCluster, env) def runTest: TestResults[Json] = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener try { val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar(collectingListener, scenarioTestData) - val parallelism = MetaDataExtractor - .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) - .parallelism - .getOrElse(1) - val env = stubbedRunner.createEnv(parallelism) registrar.register(env, process, processVersion, deploymentData, resultCollector) - stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.none()) + stubbedRunner.execute(process.name, SavepointRestoreSettings.none(), modelData.modelClassLoader) collectingListener.results } finally { collectingListener.clean() diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala index f1dfc19fcff..86147380cda 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala @@ -1,55 +1,64 @@ package pl.touk.nussknacker.engine.process.runner -import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings +import org.apache.flink.runtime.minicluster.MiniCluster +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} +import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar +import pl.touk.nussknacker.engine.process.testmechanism.FlinkStubbedRunner import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} -import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestRunId, TestServiceInvocationCollector} -import pl.touk.nussknacker.engine.util.MetaDataExtractor +import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector} object FlinkVerificationMain extends FlinkRunner { def run( + miniCluster: MiniCluster, + env: StreamExecutionEnvironment, modelData: ModelData, process: CanonicalProcess, processVersion: ProcessVersion, deploymentData: DeploymentData, - savepointPath: String, - configuration: Configuration + savepointPath: String ): Unit = - new FlinkVerificationMain(modelData, process, processVersion, deploymentData, savepointPath, configuration) - .runTest() + new FlinkVerificationMain( + miniCluster, + env, + modelData, + process, + processVersion, + deploymentData, + savepointPath + ).runTest() } class FlinkVerificationMain( + miniCluster: MiniCluster, + env: StreamExecutionEnvironment, modelData: ModelData, process: CanonicalProcess, processVersion: ProcessVersion, deploymentData: DeploymentData, - savepointPath: String, - configuration: Configuration + savepointPath: String ) { - private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration) + private val stubbedRunner = new FlinkStubbedRunner(miniCluster, env) def runTest(): Unit = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar() - val parallelism = MetaDataExtractor - .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) - .parallelism - .getOrElse(1) - val env = stubbedRunner.createEnv(parallelism) registrar.register(env, process, processVersion, deploymentData, resultCollector) - stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.forPath(savepointPath, true)) + stubbedRunner.execute( + process.name, + SavepointRestoreSettings.forPath(savepointPath, true), + modelData.modelClassLoader + ) } protected def prepareRegistrar(): FlinkProcessRegistrar = { diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkStubbedRunner.scala new file mode 100644 index 00000000000..446a182879d --- /dev/null +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/testmechanism/FlinkStubbedRunner.scala @@ -0,0 +1,38 @@ +package pl.touk.nussknacker.engine.process.testmechanism + +import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointRestoreSettings} +import org.apache.flink.runtime.minicluster.MiniCluster +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.engine.util.loader.ModelClassLoader + +import scala.jdk.CollectionConverters._ + +// FIXME abr: rename +// we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath... +final class FlinkStubbedRunner(miniCluster: MiniCluster, env: StreamExecutionEnvironment) { + + def execute( + scenarioName: ProcessName, + savepointRestoreSettings: SavepointRestoreSettings, + modelClassLoader: ModelClassLoader + ): Unit = { + val streamGraph = env.getStreamGraph + streamGraph.setJobName(scenarioName.value) + val jobGraph = streamGraph.getJobGraph + setupJobGraph(jobGraph, savepointRestoreSettings, modelClassLoader) + + val id = miniCluster.submitJob(jobGraph).get().getJobID + miniCluster.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader) + } + + private def setupJobGraph( + jobGraph: JobGraph, + savepointRestoreSettings: SavepointRestoreSettings, + modelClassLoader: ModelClassLoader + ): Unit = { + jobGraph.setClasspaths(modelClassLoader.urls.asJava) + jobGraph.setSavepointRestoreSettings(savepointRestoreSettings) + } + +} diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala index cdbdb6b652e..8689d06c41e 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala @@ -7,18 +7,9 @@ import org.scalatest.Inside import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api._ -import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration -import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ -import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder -import pl.touk.nussknacker.engine.process.runner.SimpleProcessConfigCreator.{ - sinkForIntsResultsHolder, - valueMonitorResultsHolder -} - -import java.net.ConnectException class FlinkStreamingProcessMainSpec extends AnyFlatSpec with Matchers with Inside { @@ -51,50 +42,3 @@ class FlinkStreamingProcessMainSpec extends AnyFlatSpec with Matchers with Insid } } - -class SimpleProcessConfigCreator extends EmptyProcessConfigCreator { - - override def services(modelDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = - Map( - "logService" -> WithCategories(LogService, "c1"), - "throwingService" -> WithCategories(new ThrowingService(new RuntimeException("Thrown as expected")), "c1"), - "throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"), - "returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"), - "collectingEager" -> WithCategories(CollectingEagerService, "c1"), - "returningComponentUseCaseService" -> WithCategories(ReturningComponentUseCaseService, "c1") - ) - - override def sinkFactories( - modelDependencies: ProcessObjectDependencies - ): Map[String, WithCategories[SinkFactory]] = Map( - "monitor" -> WithCategories(SinkFactory.noParam(MonitorEmptySink), "c2"), - "valueMonitor" -> WithCategories(SinkForAny(valueMonitorResultsHolder), "c2"), - "sinkForInts" -> WithCategories.anyCategory(SinkForInts(sinkForIntsResultsHolder)) - ) - - override def customStreamTransformers( - modelDependencies: ProcessObjectDependencies - ): Map[String, WithCategories[CustomStreamTransformer]] = Map( - "stateCustom" -> WithCategories.anyCategory(StateCustomNode), - "transformWithTime" -> WithCategories.anyCategory(TransformerWithTime), - "joinBranchExpression" -> WithCategories.anyCategory(CustomJoinUsingBranchExpressions), - "transformerAddingComponentUseCase" -> WithCategories.anyCategory(TransformerAddingComponentUseCase) - ) - - override def sourceFactories( - modelDependencies: ProcessObjectDependencies - ): Map[String, WithCategories[SourceFactory]] = Map( - "input" -> WithCategories(simpleRecordSource(Nil), "cat2"), - "jsonInput" -> WithCategories(jsonSource, "cat2"), - "typedJsonInput" -> WithCategories(TypedJsonSource, "cat2"), - "genericSourceWithCustomVariables" -> WithCategories.anyCategory(GenericSourceWithCustomVariables) - ) - -} - -object SimpleProcessConfigCreator extends Serializable { - - val valueMonitorResultsHolder = new TestResultsHolder[AnyRef] - val sinkForIntsResultsHolder = new TestResultsHolder[java.lang.Integer] - -} diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala new file mode 100644 index 00000000000..dc4206f898c --- /dev/null +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala @@ -0,0 +1,59 @@ +package pl.touk.nussknacker.engine.process.runner + +import pl.touk.nussknacker.engine.api.process._ +import pl.touk.nussknacker.engine.api.{CustomStreamTransformer, Service} +import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ +import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder +import pl.touk.nussknacker.engine.process.runner.SimpleProcessConfigCreator.{ + sinkForIntsResultsHolder, + valueMonitorResultsHolder +} + +import java.net.ConnectException + +class SimpleProcessConfigCreator extends EmptyProcessConfigCreator { + + override def services(modelDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = + Map( + "logService" -> WithCategories(LogService, "c1"), + "throwingService" -> WithCategories(new ThrowingService(new RuntimeException("Thrown as expected")), "c1"), + "throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"), + "returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"), + "collectingEager" -> WithCategories(CollectingEagerService, "c1"), + "returningComponentUseCaseService" -> WithCategories(ReturningComponentUseCaseService, "c1") + ) + + override def sinkFactories( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[SinkFactory]] = Map( + "monitor" -> WithCategories(SinkFactory.noParam(MonitorEmptySink), "c2"), + "valueMonitor" -> WithCategories(SinkForAny(valueMonitorResultsHolder), "c2"), + "sinkForInts" -> WithCategories.anyCategory(SinkForInts(sinkForIntsResultsHolder)) + ) + + override def customStreamTransformers( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[CustomStreamTransformer]] = Map( + "stateCustom" -> WithCategories.anyCategory(StateCustomNode), + "transformWithTime" -> WithCategories.anyCategory(TransformerWithTime), + "joinBranchExpression" -> WithCategories.anyCategory(CustomJoinUsingBranchExpressions), + "transformerAddingComponentUseCase" -> WithCategories.anyCategory(TransformerAddingComponentUseCase) + ) + + override def sourceFactories( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[SourceFactory]] = Map( + "input" -> WithCategories(simpleRecordSource(Nil), "cat2"), + "jsonInput" -> WithCategories(jsonSource, "cat2"), + "typedJsonInput" -> WithCategories(TypedJsonSource, "cat2"), + "genericSourceWithCustomVariables" -> WithCategories.anyCategory(GenericSourceWithCustomVariables) + ) + +} + +object SimpleProcessConfigCreator extends Serializable { + + val valueMonitorResultsHolder = new TestResultsHolder[AnyRef] + val sinkForIntsResultsHolder = new TestResultsHolder[java.lang.Integer] + +} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index b6f768ee7c1..9b995c45791 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.management import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import io.circe.syntax.EncoderOps +import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.engine.ModelData._ import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy @@ -13,6 +14,7 @@ import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, V import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.prepareProgramArgs +import pl.touk.nussknacker.engine.management.testsmechanism.{FlinkProcessTestRunner, FlinkProcessVerifier} import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} import scala.concurrent.Future @@ -27,7 +29,8 @@ abstract class FlinkDeploymentManager( import dependencies._ - private lazy val testRunner = new FlinkProcessTestRunner(modelData.asInvokableModelData) + private lazy val testRunner = + new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration) private lazy val verification = new FlinkProcessVerifier(modelData.asInvokableModelData) @@ -115,7 +118,7 @@ abstract class FlinkDeploymentManager( makeSavepoint(_, savepointDir) } case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) => - testRunner.test(canonicalProcess, scenarioTestData) + testRunner.runTestsAsync(canonicalProcess, scenarioTestData) case _: DMRunOffScheduleCommand => notImplemented } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala deleted file mode 100644 index f1522ecdfe6..00000000000 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala +++ /dev/null @@ -1,29 +0,0 @@ -package pl.touk.nussknacker.engine.management - -import io.circe.Json -import org.apache.flink.configuration.Configuration -import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.test.ScenarioTestData -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults -import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker - -import scala.concurrent.{ExecutionContext, Future} - -class FlinkProcessTestRunner(modelData: ModelData) { - - private val methodInvoker = new ReflectiveMethodInvoker[TestResults[Json]]( - modelData.modelClassLoader.classLoader, - "pl.touk.nussknacker.engine.process.runner.FlinkTestMain", - "run" - ) - - // NU-1455: We encode variable on the engine, because of classLoader's problems - def test(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)( - implicit ec: ExecutionContext - ): Future[TestResults[Json]] = - Future { - methodInvoker.invokeStaticMethod(modelData, canonicalProcess, scenarioTestData, new Configuration()) - } - -} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala new file mode 100644 index 00000000000..1bc413a1909 --- /dev/null +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala @@ -0,0 +1,45 @@ +package pl.touk.nussknacker.engine.management.testsmechanism + +import io.circe.Json +import org.apache.flink.configuration.Configuration +import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.api.test.ScenarioTestData +import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker + +import scala.concurrent.{ExecutionContext, Future} + +class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecutionConfig: Configuration) { + + private val streamExecutionEnvironment = + TestsMechanismStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(parallelism, streamExecutionConfig) + + private val miniCluster = TestsMechanismMiniClusterFactory.createConfiguredMiniCluster(parallelism) + + // We use reflection to avoid bundling of flinkExecutor.jar inside flinkDeploymentManager assembly jar + // TODO: use provided dependency instead + private val methodInvoker = new ReflectiveMethodInvoker[TestResults[Json]]( + modelData.modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.runner.FlinkTestMain", + "run" + ) + + def runTestsAsync(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)( + implicit ec: ExecutionContext + ): Future[TestResults[Json]] = + Future { + runTests(canonicalProcess, scenarioTestData) + } + + // NU-1455: We encode variable on the engine, because of classLoader's problems + def runTests(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = + methodInvoker.invokeStaticMethod( + miniCluster, + streamExecutionEnvironment, + modelData, + canonicalProcess, + scenarioTestData + ) + +} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessVerifier.scala similarity index 52% rename from engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala rename to engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessVerifier.scala index 82691385dc1..2df63253ac1 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessVerifier.scala @@ -1,18 +1,21 @@ -package pl.touk.nussknacker.engine.management +package pl.touk.nussknacker.engine.management.testsmechanism import com.typesafe.scalalogging.LazyLogging import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker +import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ReflectiveMethodInvoker} import scala.concurrent.Future +import scala.util.Using import scala.util.control.NonFatal class FlinkProcessVerifier(modelData: ModelData) extends LazyLogging { + // We use reflection to avoid bundling of flinkExecutor.jar inside flinkDeploymentManager assembly jar + // TODO: use provided dependency instead private val methodInvoker = new ReflectiveMethodInvoker[Unit]( modelData.modelClassLoader.classLoader, "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain", @@ -24,17 +27,30 @@ class FlinkProcessVerifier(modelData: ModelData) extends LazyLogging { canonicalProcess: CanonicalProcess, savepointPath: String ): Future[Unit] = { + val parallelism = MetaDataExtractor + .extractTypeSpecificDataOrDefault[StreamMetaData](canonicalProcess.metaData, StreamMetaData()) + .parallelism + .getOrElse(1) val processId = processVersion.processName try { logger.info(s"Starting to verify $processId") - methodInvoker.invokeStaticMethod( - modelData, - canonicalProcess, - processVersion, - DeploymentData.empty, - savepointPath, - new Configuration() - ) + // TODO: reuse a single mini cluster between each verifications + Using.resource(TestsMechanismMiniClusterFactory.createConfiguredMiniCluster(parallelism)) { miniCluster => + val env = TestsMechanismStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment( + parallelism, + new Configuration() + ) + + methodInvoker.invokeStaticMethod( + miniCluster, + env, + modelData, + canonicalProcess, + processVersion, + DeploymentData.empty, + savepointPath + ) + } logger.info(s"Verification of $processId successful") Future.successful(()) } catch { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala new file mode 100644 index 00000000000..f0a4aed9437 --- /dev/null +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismMiniClusterFactory.scala @@ -0,0 +1,39 @@ +package pl.touk.nussknacker.engine.management.testsmechanism + +import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions} +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} + +object TestsMechanismMiniClusterFactory { + + def createConfiguredMiniCluster(parallelism: Int): MiniCluster = { + val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = parallelism) + + // it is required for proper working of HadoopFileSystem + FileSystem.initialize(miniClusterConfiguration, null) + + createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = parallelism) + } + + private def prepareMiniClusterConfiguration(numTaskSlots: Int) = { + val configuration: Configuration = new Configuration + configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, numTaskSlots) + configuration.set[Integer](RestOptions.PORT, 0) + + // FIXME: reversing flink default order + configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") + configuration + } + + private def createMiniCluster(configuration: Configuration, numSlotsPerTaskManager: Int) = { + val miniCluster = new MiniCluster( + new MiniClusterConfiguration.Builder() + .setNumSlotsPerTaskManager(numSlotsPerTaskManager) + .setConfiguration(configuration) + .build() + ) + miniCluster.start() + miniCluster + } + +} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismStreamExecutionEnvironmentFactory.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismStreamExecutionEnvironmentFactory.scala new file mode 100644 index 00000000000..172ff53f26d --- /dev/null +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/TestsMechanismStreamExecutionEnvironmentFactory.scala @@ -0,0 +1,19 @@ +package pl.touk.nussknacker.engine.management.testsmechanism + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment + +object TestsMechanismStreamExecutionEnvironmentFactory { + + def createStreamExecutionEnvironment(parallelism: Int, configuration: Configuration): StreamExecutionEnvironment = { + val env = StreamExecutionEnvironment.createLocalEnvironment( + parallelism, + configuration + ) + // Checkpoints are disabled to prevent waiting for checkpoint to happen + // before finishing execution. + env.getCheckpointConfig.disableCheckpointing() + env + } + +} diff --git a/engine/flink/management/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.process.ProcessConfigCreator b/engine/flink/management/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.process.ProcessConfigCreator new file mode 100644 index 00000000000..dc12680791f --- /dev/null +++ b/engine/flink/management/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.process.ProcessConfigCreator @@ -0,0 +1 @@ +pl.touk.nussknacker.engine.management.testsmechanism.SimpleProcessConfigCreator diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala similarity index 96% rename from engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala rename to engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala index fc4f607876f..5e53e6e7223 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.process.runner +package pl.touk.nussknacker.engine.management.testsmechanism import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import io.circe.Json @@ -7,50 +7,38 @@ import org.apache.flink.runtime.client.JobExecutionException import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfterEach, Inside, OptionValues} -import pl.touk.nussknacker.engine.api.component.{ - ComponentAdditionalConfig, - DesignerWideComponentId, - ParameterAdditionalUIConfig -} -import pl.touk.nussknacker.engine.api.parameter.{ - ParameterName, - ParameterValueCompileTimeValidation, - ValueInputWithDictEditor -} +import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId, ParameterAdditionalUIConfig} +import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ParameterValueCompileTimeValidation, ValueInputWithDictEditor} import pl.touk.nussknacker.engine.api.process.ComponentUseCase import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} -import pl.touk.nussknacker.engine.api.{DisplayJsonWithEncoder, FragmentSpecificData, MetaData} +import pl.touk.nussknacker.engine.api.{DisplayJsonWithEncoder, FragmentSpecificData, MetaData, StreamMetaData} import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode import pl.touk.nussknacker.engine.compile.FragmentResolver -import pl.touk.nussknacker.engine.flink.test.{ - FlinkTestConfiguration, - RecordingExceptionConsumer, - RecordingExceptionConsumerProvider -} +import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs +import pl.touk.nussknacker.engine.flink.test.{FlinkTestConfiguration, RecordingExceptionConsumer, RecordingExceptionConsumerProvider} import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition} +import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunnerSpec.{fragmentWithValidationName, processWithFragmentParameterValidation} import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ -import pl.touk.nussknacker.engine.process.runner.FlinkTestMainSpec.{ - fragmentWithValidationName, - processWithFragmentParameterValidation -} import pl.touk.nussknacker.engine.testmode.TestProcess._ -import pl.touk.nussknacker.engine.util.ThreadUtils -import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} -import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs -import pl.touk.nussknacker.engine.testing.LocalModelData +import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils} import pl.touk.nussknacker.engine.util.loader.ModelClassLoader +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import java.util.{Date, UUID} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} -import scala.jdk.CollectionConverters._ -class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with BeforeAndAfterEach with OptionValues { +class FlinkProcessTestRunnerSpec + extends AnyWordSpec + with Matchers + with Inside + with BeforeAndAfterEach + with OptionValues { import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap @@ -244,7 +232,6 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor val nodeResults = results.nodeResults nodeResults(sourceNodeId) should have length 5 - } "detect errors" in { @@ -776,7 +763,13 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor resolveConfigs = false ) ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { - FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.setupMemory(new Configuration)) + // TODO: reuse this instance between all test cases + val parallelism = MetaDataExtractor + .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) + .parallelism + .getOrElse(1) + new FlinkProcessTestRunner(modelData, parallelism, FlinkTestConfiguration.setupMemory(new Configuration)) + .runTests(process, scenarioTestData) } } @@ -812,7 +805,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor } -object FlinkTestMainSpec { +object FlinkProcessTestRunnerSpec { private val fragmentWithValidationName = "fragmentWithValidation" private val processWithFragmentParameterValidation: CanonicalProcess = { diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/SimpleProcessConfigCreator.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/SimpleProcessConfigCreator.scala new file mode 100644 index 00000000000..4e142dcdf25 --- /dev/null +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/SimpleProcessConfigCreator.scala @@ -0,0 +1,59 @@ +package pl.touk.nussknacker.engine.management.testsmechanism + +import pl.touk.nussknacker.engine.api.process._ +import pl.touk.nussknacker.engine.api.{CustomStreamTransformer, Service} +import pl.touk.nussknacker.engine.management.testsmechanism.SimpleProcessConfigCreator.{ + sinkForIntsResultsHolder, + valueMonitorResultsHolder +} +import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ +import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder + +import java.net.ConnectException + +class SimpleProcessConfigCreator extends EmptyProcessConfigCreator { + + override def services(modelDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = + Map( + "logService" -> WithCategories(LogService, "c1"), + "throwingService" -> WithCategories(new ThrowingService(new RuntimeException("Thrown as expected")), "c1"), + "throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"), + "returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"), + "collectingEager" -> WithCategories(CollectingEagerService, "c1"), + "returningComponentUseCaseService" -> WithCategories(ReturningComponentUseCaseService, "c1") + ) + + override def sinkFactories( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[SinkFactory]] = Map( + "monitor" -> WithCategories(SinkFactory.noParam(MonitorEmptySink), "c2"), + "valueMonitor" -> WithCategories(SinkForAny(valueMonitorResultsHolder), "c2"), + "sinkForInts" -> WithCategories.anyCategory(SinkForInts(sinkForIntsResultsHolder)) + ) + + override def customStreamTransformers( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[CustomStreamTransformer]] = Map( + "stateCustom" -> WithCategories.anyCategory(StateCustomNode), + "transformWithTime" -> WithCategories.anyCategory(TransformerWithTime), + "joinBranchExpression" -> WithCategories.anyCategory(CustomJoinUsingBranchExpressions), + "transformerAddingComponentUseCase" -> WithCategories.anyCategory(TransformerAddingComponentUseCase) + ) + + override def sourceFactories( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[SourceFactory]] = Map( + "input" -> WithCategories(simpleRecordSource(Nil), "cat2"), + "jsonInput" -> WithCategories(jsonSource, "cat2"), + "typedJsonInput" -> WithCategories(TypedJsonSource, "cat2"), + "genericSourceWithCustomVariables" -> WithCategories.anyCategory(GenericSourceWithCustomVariables) + ) + +} + +object SimpleProcessConfigCreator extends Serializable { + + val valueMonitorResultsHolder = new TestResultsHolder[AnyRef] + val sinkForIntsResultsHolder = new TestResultsHolder[java.lang.Integer] + +} diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka/TestFromFileSpec.scala similarity index 92% rename from engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala rename to engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka/TestFromFileSpec.scala index 19ffbbfacc7..1eeed0a4a3c 100644 --- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka/TestFromFileSpec.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.kafka.source.flink +package pl.touk.nussknacker.testsmechanism.kafka import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValueFactory.fromAnyRef @@ -18,8 +18,9 @@ import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName import pl.touk.nussknacker.engine.kafka.source.InputMeta +import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders -import pl.touk.nussknacker.engine.process.runner.FlinkTestMain +import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults @@ -28,7 +29,6 @@ import pl.touk.nussknacker.engine.util.json.ToJsonEncoder import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties} -import java.net.URL import java.util.Collections class TestFromFileSpec @@ -133,12 +133,9 @@ class TestFromFileSpec private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = { ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { - FlinkTestMain.run( - modelData, - process, - scenarioTestData, - FlinkTestConfiguration.setupMemory(new Configuration), - ) + // TODO: reuse this instance between all test cases + new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration)) + .runTests(process, scenarioTestData) } } diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/schemedkafka/TestWithTestDataSpec.scala similarity index 91% rename from engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala rename to engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/schemedkafka/TestWithTestDataSpec.scala index 0e9f4f160cb..be6af1b5bb5 100644 --- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/schemedkafka/TestWithTestDataSpec.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.schemedkafka.source.flink +package pl.touk.nussknacker.testsmechanism.schemedkafka import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValueFactory.fromAnyRef @@ -8,20 +8,21 @@ import io.circe.Json._ import org.apache.avro.Schema import org.apache.flink.configuration.Configuration import org.apache.kafka.common.record.TimestampType -import org.scalatest.{LoneElement, OptionValues} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import org.scalatest.{LoneElement, OptionValues} import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName +import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName import pl.touk.nussknacker.engine.kafka.source.InputMeta +import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder -import pl.touk.nussknacker.engine.process.runner.FlinkTestMain import pl.touk.nussknacker.engine.schemedkafka.KafkaAvroIntegrationMockSchemaRegistry.schemaRegistryMockClient import pl.touk.nussknacker.engine.schemedkafka.KafkaAvroTestProcessConfigCreator import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.{ @@ -32,7 +33,6 @@ import pl.touk.nussknacker.engine.schemedkafka.schema.{Address, Company} import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClientFactory, SchemaVersionOption} -import pl.touk.nussknacker.engine.schemedkafka.source.flink.TestWithTestDataSpec.sinkForInputMetaResultsHolder import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.TestProcess._ @@ -40,8 +40,8 @@ import pl.touk.nussknacker.engine.util.ThreadUtils import pl.touk.nussknacker.engine.util.json.ToJsonEncoder import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties} +import pl.touk.nussknacker.testsmechanism.schemedkafka.TestWithTestDataSpec.sinkForInputMetaResultsHolder -import java.net.URL import java.util.Collections class TestWithTestDataSpec @@ -202,17 +202,15 @@ class TestWithTestDataSpec private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = { ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { - FlinkTestMain.run( - LocalModelData( - config, - List.empty, - configCreator = creator, - modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround) - ), - process, - scenarioTestData, - FlinkTestConfiguration.setupMemory(new Configuration), + val modelData = LocalModelData( + config, + List.empty, + configCreator = creator, + modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround) ) + // TODO: reuse this instance between all test cases + new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration)) + .runTests(process, scenarioTestData) } }