diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala
index be94c8dca9d..076a0e60380 100644
--- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala
+++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala
@@ -89,7 +89,7 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
     case command: DMRunOffScheduleCommand => runOffSchedule(command)
     case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult(""))
     case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) =>
-      flinkTestRunner.runTestsAsync(
+      flinkTestRunner.runTests(
        canonicalProcess,
        scenarioTestData
      ) // it's just for streaming e2e tests from file purposes
diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala
index 9657b8bc5d5..c429eb4b5a1 100644
--- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala
+++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala
@@ -115,7 +115,7 @@ object MockableDeploymentManagerProvider {
         .get()
         .get(processVersion.processName.value)
         .map(Future.successful)
-        .orElse(testRunnerOpt.map(_.runTestsAsync(scenario, testData)))
+        .orElse(testRunnerOpt.map(_.runTests(scenario, testData)))
         .getOrElse(
           throw new IllegalArgumentException(
             s"Tests results not mocked for scenario [${processVersion.processName.value}] and no model data provided"
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala
index 93b472efda0..0473bd6c0cc 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala
@@ -13,18 +13,18 @@ object MiniClusterFactory extends LazyLogging {
       modelClassLoader: ModelClassLoader,
       miniClusterConfig: Configuration,
       streamExecutionConfig: Configuration
-  ): (MiniCluster, () => StreamExecutionEnvironment) = {
+  ): (MiniCluster, Boolean => StreamExecutionEnvironment) = {
     logger.debug(s"Creating MiniCluster with configuration: $miniClusterConfig")
     val miniCluster = createMiniCluster(miniClusterConfig)
-    logger.debug(
-      s"Creating local StreamExecutionEnvironment with configuration = $streamExecutionConfig"
-    )
-    def createStreamExecutionEnv(): StreamExecutionEnvironment =
+    def createStreamExecutionEnv(attached: Boolean): StreamExecutionEnvironment = {
       MiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
         miniCluster,
         modelClassLoader,
-        streamExecutionConfig
+        streamExecutionConfig,
+        attached
       )
+    }
+
     (miniCluster, createStreamExecutionEnv)
   }
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterStreamExecutionEnvironmentFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterStreamExecutionEnvironmentFactory.scala
index 1fdc4ea8040..f16ed40159d 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterStreamExecutionEnvironmentFactory.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterStreamExecutionEnvironmentFactory.scala
@@ -20,13 +20,13 @@ object MiniClusterStreamExecutionEnvironmentFactory {
   def createStreamExecutionEnvironment(
       miniCluster: MiniCluster,
       modelClassLoader: ModelClassLoader,
-      configuration: Configuration
+      configuration: Configuration,
+      attached: Boolean
   ): StreamExecutionEnvironment = {
     val pipelineExecutorServiceLoader = createPipelineExecutorServiceLoader(miniCluster, modelClassLoader)
     configuration.set(DeploymentOptions.TARGET, pipelineExecutorName)
     configuration.set(PipelineOptions.CLASSPATHS, modelClassLoader.urls.map(_.toString).asJava)
-    // FIXME abr: move to FlinkTestMain?
-    configuration.set[java.lang.Boolean](DeploymentOptions.ATTACHED, true)
+    configuration.set[java.lang.Boolean](DeploymentOptions.ATTACHED, attached)
     new StreamExecutionEnvironment(pipelineExecutorServiceLoader, configuration, modelClassLoader)
   }
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
index d5bcfc663e5..855aad0085c 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala
@@ -10,15 +10,41 @@ import pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory
 import pl.touk.nussknacker.engine.util.MetaDataExtractor
 import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 
+import scala.concurrent.{ExecutionContext, Future}
+
 // This class handles a legacy ad-hoc way to create minicluster.
 // TODO: After we fully switch to shared mini cluster approach, it should be removed
 class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging {
 
+  def handleAdHocMniClusterFallbackAsync[R](
+      sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
+      scenario: CanonicalProcess
+  )(f: StreamExecutionEnvironmentWithTotalSlots => Future[R])(implicit ec: ExecutionContext): Future[R] = {
+    val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) =
+      useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario)
+    val resultFuture = f(streamEnvWithMaxParallelism)
+    resultFuture.onComplete(_ => allocatedMiniClusterResourcesOpt.foreach(_.close()))
+    resultFuture
+  }
+
   def handleAdHocMniClusterFallback[R](
       sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
       scenario: CanonicalProcess
   )(f: StreamExecutionEnvironmentWithTotalSlots => R): R = {
-    val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) = sharedMiniClusterServicesOpt
+    val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) =
+      useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario)
+    try {
+      f(streamEnvWithMaxParallelism)
+    } finally {
+      allocatedMiniClusterResourcesOpt.foreach(_.close())
+    }
+  }
+
+  private def useSharedMiniClusterOrAdHoc[R](
+      sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
+      scenario: CanonicalProcess
+  ): (Option[AdHocMiniClusterResources], StreamExecutionEnvironmentWithTotalSlots) = {
+    sharedMiniClusterServicesOpt
       .map { sharedMiniClusterServices =>
         logger.debug(s"Shared MiniCluster used for $useCaseForDebug")
         (None, sharedMiniClusterServices)
@@ -28,11 +54,6 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
         val resources = createAdHocMiniClusterResources(scenario)
         (Some(resources), StreamExecutionEnvironmentWithTotalSlots.withoutMaxParallelism(resources.env))
       }
-    try {
-      f(streamEnvWithMaxParallelism)
-    } finally {
-      allocatedMiniClusterResourcesOpt.foreach(_.close())
-    }
   }
 
   private def createAdHocMiniClusterResources(process: CanonicalProcess) = {
@@ -50,7 +71,7 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
       legacyMiniClusterConfig,
       new Configuration()
     )
-    AdHocMiniClusterResources(miniCluster, createEnv())
+    AdHocMiniClusterResources(miniCluster, createEnv(true))
   }
 
   private case class AdHocMiniClusterResources(miniCluster: MiniCluster, env: StreamExecutionEnvironment)
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala
index 64002be8b4e..b4d73f917c5 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala
@@ -17,6 +17,9 @@ import pl.touk.nussknacker.engine.testmode.{
   TestServiceInvocationCollector
 }
 
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Future, blocking}
+
 object FlinkTestMain {
 
   // This method is invoked via reflection from module (Flink DM) without shared API classes, so simple types should be used
@@ -25,7 +28,7 @@ object FlinkTestMain {
       modelData: ModelData,
       scenario: CanonicalProcess,
       scenarioTestData: ScenarioTestData
-  ): TestResults[Json] = {
+  ): Future[TestResults[Json]] = {
     new FlinkTestMain(
       sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithTotalSlots.apply _ tupled),
       modelData
@@ -43,35 +46,40 @@ class FlinkTestMain(
   private val adHocMiniClusterFallbackHandler =
     new AdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario testing")
 
-  def testScenario(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
+  def testScenario(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData): Future[TestResults[Json]] = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
-    try {
-      adHocMiniClusterFallbackHandler.handleAdHocMniClusterFallback(
-        sharedMiniClusterServicesOpt,
-        scenario,
-      ) { streamExecutionEnvWithMaxParallelism =>
-        import streamExecutionEnvWithMaxParallelism._
-        val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario)
-        val resultCollector = new TestServiceInvocationCollector(collectingListener)
-        // ProcessVersion can't be passed from DM because testing mechanism can be used with not saved scenario
-        val processVersion = ProcessVersion.empty.copy(processName = alignedScenario.name)
-        val deploymentData = DeploymentData.empty.copy(additionalModelConfigs =
-          AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
-        )
-        val registrar = prepareRegistrar(collectingListener, alignedScenario, scenarioTestData, processVersion)
-        streamExecutionEnv.getCheckpointConfig.disableCheckpointing()
-        registrar.register(
-          streamExecutionEnv,
-          alignedScenario,
-          processVersion,
-          deploymentData,
-          resultCollector
-        )
-        streamExecutionEnv.execute(alignedScenario.name.value)
-        collectingListener.results
+    adHocMiniClusterFallbackHandler.handleAdHocMniClusterFallbackAsync(
+      sharedMiniClusterServicesOpt,
+      scenario,
+    ) { streamExecutionEnvWithMaxParallelism =>
+      import streamExecutionEnvWithMaxParallelism._
+      val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario)
+      val resultCollector = new TestServiceInvocationCollector(collectingListener)
+      // ProcessVersion can't be passed from DM because testing mechanism can be used with not saved scenario
+      val processVersion = ProcessVersion.empty.copy(processName = alignedScenario.name)
+      val deploymentData = DeploymentData.empty.copy(additionalModelConfigs =
+        AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
+      )
+      val registrar = prepareRegistrar(collectingListener, alignedScenario, scenarioTestData, processVersion)
+      streamExecutionEnv.getCheckpointConfig.disableCheckpointing()
+      registrar.register(
+        streamExecutionEnv,
+        alignedScenario,
+        processVersion,
+        deploymentData,
+        resultCollector
+      )
+      // TODO: Non-blocking future periodically checking if job is finished
+      val resultFuture = Future {
+        blocking {
+          streamExecutionEnv.execute(alignedScenario.name.value)
+          collectingListener.results
+        }
+      }
+      resultFuture.onComplete { _ =>
+        collectingListener.clean()
       }
-    } finally {
-      collectingListener.clean()
+      resultFuture
     }
   }
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala
index 419ad56217a..1355cc41997 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala
@@ -132,7 +132,7 @@ abstract class FlinkDeploymentManager(
         makeSavepoint(_, savepointDir)
       }
     case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) =>
-      testRunner.runTestsAsync(canonicalProcess, scenarioTestData)
+      testRunner.runTests(canonicalProcess, scenarioTestData)
     case _: DMRunOffScheduleCommand => notImplemented
   }
 
@@ -241,7 +241,7 @@ abstract class FlinkDeploymentManager(
       processVersion: ProcessVersion
   ): Future[Unit] =
     if (shouldVerifyBeforeDeploy)
-      verification.verify(processVersion, canonicalProcess, savepointPath)
+      Future.fromTry(verification.verify(processVersion, canonicalProcess, savepointPath))
     else Future.successful(())
 
   private def stopSavingSavepoint(
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala
index c69ea70470a..9e96cb35bf8 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala
@@ -28,7 +28,7 @@ object MiniClusterFactoryReflectiveInvoker {
       miniClusterConfig: Configuration,
       streamExecutionEnvConfig: Configuration
   ): MiniClusterWithServices = {
-    val methodInvoker = new ReflectiveMethodInvoker[(MiniCluster, () => StreamExecutionEnvironment)](
+    val methodInvoker = new ReflectiveMethodInvoker[(MiniCluster, Boolean => StreamExecutionEnvironment)](
       modelClassLoader,
       "pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory",
       "createMiniClusterWithStreamExecutionEnvironmentFactory"
@@ -38,15 +38,18 @@ object MiniClusterFactoryReflectiveInvoker {
     val totalSlots =
       miniClusterConfig.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS) *
         miniClusterConfig.get(TaskManagerOptions.NUM_TASK_SLOTS)
-    MiniClusterWithServices(miniCluster, totalSlots, streamExecutionEnvironmentFactory)
+    new MiniClusterWithServices(miniCluster, totalSlots, streamExecutionEnvironmentFactory)
   }
 
-  case class MiniClusterWithServices(
+  class MiniClusterWithServices(
       miniCluster: MiniCluster,
-      totalSlots: Int,
-      streamExecutionEnvironmentFactory: () => StreamExecutionEnvironment
+      val totalSlots: Int,
+      streamExecutionEnvironmentFactory: Boolean => StreamExecutionEnvironment
   ) extends AutoCloseable {
 
+    def createStreamExecutionEnvironment(attached: Boolean): StreamExecutionEnvironment =
+      streamExecutionEnvironmentFactory(attached)
+
     override def close(): Unit = miniCluster.close()
   }
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala
index d736dfe86a6..c9e83ede196 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala
@@ -19,36 +19,29 @@ class FlinkProcessTestRunner(
   // We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar
   // because it is already in separate assembly for purpose of sending it to Flink during deployment.
   // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded
-  private val mainRunner = new ReflectiveMethodInvoker[TestResults[Json]](
+  private val mainRunner = new ReflectiveMethodInvoker[Future[TestResults[Json]]](
     modelData.modelClassLoader,
     "pl.touk.nussknacker.engine.process.scenariotesting.FlinkTestMain",
     "run"
   )
 
-  def runTestsAsync(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)(
+  // NU-1455: We encode variable on the engine, because of classLoader's problems
+  def runTests(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)(
       implicit ec: ExecutionContext
   ): Future[TestResults[Json]] = {
-    // FIXME: make only execution asynchronous
-    Future {
-      runTests(canonicalProcess, scenarioTestData)
-    }
-  }
-
-  // NU-1455: We encode variable on the engine, because of classLoader's problems
-  def runTests(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
     val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices =>
-      (miniClusterWithServices.streamExecutionEnvironmentFactory(), miniClusterWithServices.totalSlots)
+      (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots)
+    )
+    val resultFuture = mainRunner.invokeStaticMethod(
+      streamExecutionEnvWithTotalSlots,
+      modelData,
+      canonicalProcess,
+      scenarioTestData
     )
-    try {
-      mainRunner.invokeStaticMethod(
-        streamExecutionEnvWithTotalSlots,
-        modelData,
-        canonicalProcess,
-        scenarioTestData
-      )
-    } finally {
+    resultFuture.onComplete { _ =>
      streamExecutionEnvWithTotalSlots.foreach(_._1.close())
    }
+    resultFuture
  }
 
 }
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala
index 94b1f627197..e9812dbc5d3 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala
@@ -7,8 +7,8 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker.MiniClusterWithServices
 import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker
 
-import scala.concurrent.Future
 import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
 
 class FlinkProcessVerifier(
     modelData: ModelData,
@@ -28,10 +28,10 @@ class FlinkProcessVerifier(
       processVersion: ProcessVersion,
       canonicalProcess: CanonicalProcess,
       savepointPath: String
-  ): Future[Unit] = {
+  ): Try[Unit] = {
     val processId = processVersion.processName
     val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices =>
-      (miniClusterWithServices.streamExecutionEnvironmentFactory(), miniClusterWithServices.totalSlots)
+      (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots)
     )
     try {
       logger.info(s"Starting to verify $processId")
@@ -43,11 +43,11 @@ class FlinkProcessVerifier(
         savepointPath
       )
       logger.info(s"Verification of $processId successful")
-      Future.successful(())
+      Success(())
     } catch {
       case NonFatal(e) =>
         logger.info(s"Failed to verify $processId", e)
-        Future.failed(
+        Failure(
           new IllegalArgumentException(
             "State is incompatible, please stop scenario and start again with clean state",
             e
diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
index 8f144e98b70..563f6e99c6a 100644
--- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
+++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala
@@ -1,10 +1,10 @@
 package pl.touk.nussknacker.engine.management.scenariotesting
 
-import cats.effect.unsafe.implicits.global
+import cats.effect.unsafe.IORuntime
 import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import io.circe.Json
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.client.JobExecutionException
+import org.apache.flink.runtime.JobException
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Inside, OptionValues}
@@ -44,8 +44,10 @@ import pl.touk.nussknacker.engine.process.helpers.SampleNodes._
 import pl.touk.nussknacker.engine.testmode.TestProcess._
 import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader}
 import pl.touk.nussknacker.engine.{ModelConfigs, ModelData}
+import pl.touk.nussknacker.test.PatientScalaFutures
 
 import java.util.{Date, UUID}
+import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 
 class FlinkProcessTestRunnerSpec
@@ -54,7 +56,8 @@ class FlinkProcessTestRunnerSpec
     with Inside
     with BeforeAndAfterEach
     with BeforeAndAfterAll
-    with OptionValues {
+    with OptionValues
+    with PatientScalaFutures {
 
   import pl.touk.nussknacker.engine.spel.SpelExtension._
   import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
@@ -67,7 +70,7 @@ class FlinkProcessTestRunnerSpec
     DeploymentManagersClassLoader
      .create(List.empty)
      .allocated
-      .unsafeRunSync()
+      .unsafeRunSync()(IORuntime.global)
 
   private val modelClassLoader = ModelClassLoader(
     FlinkTestConfiguration.classpathWorkaround,
@@ -90,7 +93,7 @@ class FlinkProcessTestRunnerSpec
   override protected def afterAll(): Unit = {
     super.afterAll()
     miniClusterWithServices.close()
-    releaseDeploymentManagersClassLoaderResources.unsafeRunSync()
+    releaseDeploymentManagersClassLoaderResources.unsafeRunSync()(IORuntime.global)
   }
 
   "A scenario run on Flink engine" when {
@@ -117,15 +120,17 @@ class FlinkProcessTestRunnerSpec
        val input = SimpleRecord("0", 1, "2", new Date(3), Some(4), 5, "6")
        val input2 = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6")
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(
-          process,
-          ScenarioTestData(
-            List(
-              ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")),
-              ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6"))
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(
+            process,
+            ScenarioTestData(
+              List(
+                ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")),
+                ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6"))
+              )
             )
-          )
-        )
+          )(ExecutionContext.global)
+          .futureValue
 
        val nodeResults = results.nodeResults
@@ -179,14 +184,16 @@ class FlinkProcessTestRunnerSpec
        val testRunner = prepareTestRunner(useIOMonadInInterpreter)
 
        def runTestAndVerify() = {
-          val results = testRunner.runTests(
-            process,
-            ScenarioTestData(
-              List(
-                ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6"))
+          val results = testRunner
+            .runTests(
+              process,
+              ScenarioTestData(
+                List(
+                  ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6"))
+                )
               )
-            )
-          )
+            )(ExecutionContext.global)
+            .futureValue
 
          val nodeResults = results.nodeResults
@@ -214,10 +221,12 @@ class FlinkProcessTestRunnerSpec
          .source(sourceNodeId, "input")
          .split("splitId1", GraphBuilder.emptySink("out1", "monitor"), GraphBuilder.emptySink("out2", "monitor"))
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(
-          process,
-          ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))),
-        )
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(
+            process,
+            ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))),
+          )(ExecutionContext.global)
+          .futureValue
 
        results.nodeResults("splitId1") shouldBe List(
          nodeResult(
@@ -247,10 +256,12 @@ class FlinkProcessTestRunnerSpec
        val aggregate = SimpleRecordWithPreviousValue(input, 0, "s")
        val aggregate2 = SimpleRecordWithPreviousValue(input2, 1, "s")
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(
-          process,
-          ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))),
-        )
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(
+            process,
+            ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))),
+          )(ExecutionContext.global)
+          .futureValue
 
        val nodeResults = results.nodeResults
@@ -300,10 +311,12 @@ class FlinkProcessTestRunnerSpec
          .emptySink("out", "monitor")
 
        val results =
-          prepareTestRunner(useIOMonadInInterpreter).runTests(
-            process,
-            ScenarioTestData(createTestRecord() :: List.fill(4)(createTestRecord(value1 = 11))),
-          )
+          prepareTestRunner(useIOMonadInInterpreter)
+            .runTests(
+              process,
+              ScenarioTestData(createTestRecord() :: List.fill(4)(createTestRecord(value1 = 11))),
+            )(ExecutionContext.global)
+            .futureValue
 
        val nodeResults = results.nodeResults
@@ -319,17 +332,19 @@ class FlinkProcessTestRunnerSpec
          .filter("filter", "1 / #input.value1 >= 0".spel)
          .emptySink("out", "monitor")
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(
-          process,
-          ScenarioTestData(
-            List(
-              createTestRecord(id = "0", value1 = 1),
-              createTestRecord(id = "1", value1 = 0),
-              createTestRecord(id = "2", value1 = 2),
-              createTestRecord(id = "3", value1 = 4)
-            )
-          ),
-        )
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(
+            process,
+            ScenarioTestData(
+              List(
+                createTestRecord(id = "0", value1 = 1),
+                createTestRecord(id = "1", value1 = 0),
+                createTestRecord(id = "2", value1 = 2),
+                createTestRecord(id = "3", value1 = 4)
+              )
+            ),
+          )(ExecutionContext.global)
+          .futureValue
 
        val nodeResults = results.nodeResults
@@ -388,7 +403,8 @@
              createTestRecord(id = "3", value1 = 4)
            )
          )
-        )
+        )(ExecutionContext.global)
+          .futureValue
 
        val nodeResults = results.nodeResults
@@ -407,12 +423,15 @@ class FlinkProcessTestRunnerSpec
          .processor("failing", "throwingTransientService", "throw" -> "#input.value1 == 2".spel)
          .emptySink("out", "monitor")
 
-        intercept[JobExecutionException] {
-          prepareTestRunner(useIOMonadInInterpreter).runTests(
+        val runner = prepareTestRunner(useIOMonadInInterpreter)
+        runner
+          .runTests(
            process,
            ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))
-          )
-        }
+          )(ExecutionContext.global)
+          .failed
+          .futureValue
+          .getCause shouldBe a[JobException]
      }
 
      "handle json input" in {
@@ -438,7 +457,8 @@ class FlinkProcessTestRunnerSpec
          )
        )
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData)
+        val results =
+          prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData)(ExecutionContext.global).futureValue
 
        results.nodeResults(sourceNodeId) should have size 3
        results.externalInvocationResults("out") shouldBe
@@ -468,7 +488,8 @@ class FlinkProcessTestRunnerSpec
          .emptySink("out", "valueMonitor", "Value" -> "#additionalOne + '|' + #additionalTwo".spel)
 
        val testData = ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("abc"))))
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData)
+        val results =
+          prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData)(ExecutionContext.global).futureValue
 
        results.nodeResults(sourceNodeId) should have size 1
        results.externalInvocationResults("out") shouldBe
@@ -489,10 +510,12 @@ class FlinkProcessTestRunnerSpec
          .emptySink("out", "sinkForInts", "Value" -> "15 / {0, 1}[0]".spel)
 
        val results =
-          prepareTestRunner(useIOMonadInInterpreter).runTests(
-            process,
-            ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))
-          )
+          prepareTestRunner(useIOMonadInInterpreter)
+            .runTests(
+              process,
+              ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))
+            )(ExecutionContext.global)
+            .futureValue
 
        results.exceptions should have length 1
        results.exceptions.head.nodeId shouldBe Some("out")
@@ -510,18 +533,20 @@ class FlinkProcessTestRunnerSpec
        def recordWithSeconds(duration: FiniteDuration) =
          ScenarioTestJsonRecord(sourceNodeId, Json.fromString(s"0|0|0|${duration.toMillis}|0|0|0"))
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(
-          process,
-          ScenarioTestData(
-            List(
-              recordWithSeconds(1 second),
-              recordWithSeconds(2 second),
-              recordWithSeconds(5 second),
-              recordWithSeconds(9 second),
-              recordWithSeconds(20 second)
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(
+            process,
+            ScenarioTestData(
+              List(
+                recordWithSeconds(1 second),
+                recordWithSeconds(2 second),
+                recordWithSeconds(5 second),
+                recordWithSeconds(9 second),
+                recordWithSeconds(20 second)
+              )
            )
-          )
-        )
+          )(ExecutionContext.global)
+          .futureValue
 
        val nodeResults = results.nodeResults
@@ -540,15 +565,17 @@ class FlinkProcessTestRunnerSpec
          )
          .emptySink("out", "valueMonitor", "Value" -> "#input.field1 + #input.field2".spel)
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(
-          process,
-          ScenarioTestData(
-            ScenarioTestJsonRecord(
-              sourceNodeId,
-              Json.obj("field1" -> Json.fromString("abc"), "field2" -> Json.fromString("def"))
-            ) :: Nil
-          )
-        )
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(
+            process,
+            ScenarioTestData(
+              ScenarioTestJsonRecord(
+                sourceNodeId,
+                Json.obj("field1" -> Json.fromString("abc"), "field2" -> Json.fromString("def"))
+              ) :: Nil
+            )
+          )(ExecutionContext.global)
+          .futureValue
 
        results.invocationResults("out").map(_.value) shouldBe List(variable("abcdef"))
      }
@@ -571,10 +598,12 @@ class FlinkProcessTestRunnerSpec
          .emptySink("out", "valueMonitor", "Value" -> "#parsed.size + ' ' + #parsed[0].field2".spel)
 
        val results =
-          prepareTestRunner(useIOMonadInInterpreter).runTests(
-            process,
-            ScenarioTestData(List(createTestRecord(value1 = valueToReturn)))
-          )
+          prepareTestRunner(useIOMonadInInterpreter)
+            .runTests(
+              process,
+              ScenarioTestData(List(createTestRecord(value1 = valueToReturn)))
+            )(ExecutionContext.global)
+            .futureValue
 
        results.invocationResults("out").map(_.value) shouldBe List(variable(s"$countToPass $valueToReturn"))
      }
@@ -598,7 +627,9 @@ class FlinkProcessTestRunnerSpec
        val recordFalse = createTestRecord(id = "bela")
 
        val results =
-          prepareTestRunner(useIOMonadInInterpreter).runTests(process, ScenarioTestData(List(recordTrue, recordFalse)))
+          prepareTestRunner(useIOMonadInInterpreter)
+            .runTests(process, ScenarioTestData(List(recordTrue, recordFalse)))(ExecutionContext.global)
+            .futureValue
 
        val invocationResults = results.invocationResults
@@ -637,7 +668,9 @@ class FlinkProcessTestRunnerSpec
        val recC = createTestRecord(id = "c")
 
        val results =
-          prepareTestRunner(useIOMonadInInterpreter).runTests(process, ScenarioTestData(List(recA, recB, recC)))
+          prepareTestRunner(useIOMonadInInterpreter)
+            .runTests(process, ScenarioTestData(List(recA, recB, recC)))(ExecutionContext.global)
+            .futureValue
 
        results.invocationResults("proc2").map(_.contextId) should contain only (
          s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-1-end1",
@@ -694,7 +727,9 @@ class FlinkProcessTestRunnerSpec
        val recordC = recordA.copy(id = "c")
        val recordD = recordA.copy(id = "d")
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, scenarioTestData)
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(process, scenarioTestData)(ExecutionContext.global)
+          .futureValue
 
        val nodeResults = results.nodeResults
        nodeResults("source1") shouldBe List(
@@ -741,10 +776,12 @@ class FlinkProcessTestRunnerSpec
          .emptySink("out", "valueMonitor", "Value" -> "{#componentUseCaseService, #componentUseCaseCustomNode}".spel)
 
        val results =
-          prepareTestRunner(useIOMonadInInterpreter).runTests(
-            process,
-            ScenarioTestData(List(createTestRecord(sourceId = "start")))
-          )
+          prepareTestRunner(useIOMonadInInterpreter)
+            .runTests(
+              process,
+              ScenarioTestData(List(createTestRecord(sourceId = "start")))
+            )(ExecutionContext.global)
+            .futureValue
 
        results.invocationResults("out").map(_.value) shouldBe List(
          variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime))
@@ -765,10 +802,12 @@ class FlinkProcessTestRunnerSpec
          .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel)
 
        val dictEditorException = intercept[IllegalArgumentException] {
-          prepareTestRunner(useIOMonadInInterpreter).runTests(
-            process,
-            ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))
-          )
+          prepareTestRunner(useIOMonadInInterpreter)
+            .runTests(
+              process,
+              ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))
+            )(ExecutionContext.global)
+            .futureValue
        }
        dictEditorException.getMessage.startsWith(
          "Compilation errors: IncompatibleParameterDefinitionModification(ParameterName(static),dictKeyWithLabel,Some(DualParameterEditor(StringParameterEditor,RAW))"
@@ -805,7 +844,8 @@ class FlinkProcessTestRunnerSpec
              )
            )
          )
-        ).runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))))
+        ).runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))))(ExecutionContext.global)
+          .futureValue
 
        results.exceptions should have length 0
      }
@@ -818,10 +858,12 @@ class FlinkProcessTestRunnerSpec
 
        val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario)
 
-        val results = prepareTestRunner(useIOMonadInInterpreter).runTests(
-          resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") },
-          ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))),
-        )
+        val results = prepareTestRunner(useIOMonadInInterpreter)
+          .runTests(
+            resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") },
+            ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))),
+          )(ExecutionContext.global)
+          .futureValue
 
        results.exceptions.length shouldBe 0
      }
    }
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
index 4e4b9201699..a7bc7873e08 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.scenariotesting.kafka
 
+import cats.effect.unsafe.IORuntime
 import cats.effect.unsafe.implicits.global
 import com.typesafe.config.ConfigFactory
 import com.typesafe.config.ConfigValueFactory.fromAnyRef
@@ -27,9 +28,10 @@ import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.testing.LocalModelData
 import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
 import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader}
-import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties}
+import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties, PatientScalaFutures}
 
 import java.util.Collections
+import scala.concurrent.ExecutionContext
 
 class KafkaScenarioTestingSpec
     extends AnyFunSuite
@@ -37,7 +39,8 @@ class KafkaScenarioTestingSpec
    with LazyLogging
    with EitherValuesDetailedMessage
    with OptionValues
-    with BeforeAndAfterAll {
+    with BeforeAndAfterAll
+    with PatientScalaFutures {
 
  private val config = ConfigFactory
    .empty()
@@ -52,7 +55,7 @@ class KafkaScenarioTestingSpec
    DeploymentManagersClassLoader
      .create(List.empty)
      .allocated
-      .unsafeRunSync()
+      .unsafeRunSync()(IORuntime.global)
 
  private val modelData: ModelData =
    LocalModelData(
@@ -120,7 +123,11 @@ class KafkaScenarioTestingSpec
      _.add("value", obj("id" -> fromString("fooId"), "field" -> fromString("fooField")))
    )
 
-    val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
+    val results = testRunner
+      .runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))(
+        ExecutionContext.global
+      )
+      .futureValue
 
    val testResultVars = results.nodeResults("end").head.variables
    testResultVars("extractedTimestamp").hcursor.downField("pretty").as[Long].rightValue shouldBe expectedTimestamp
@@ -149,7 +156,7 @@ class KafkaScenarioTestingSpec
        .add("value", obj("id" -> fromString("1234"), "field" -> fromString("abcd")))
    )
 
-    val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
+    val results = testRunner
+      .runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))(
+        ExecutionContext.global
+      )
+      .futureValue
 
    results.nodeResults shouldBe Symbol("nonEmpty")
  }
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
index e0ea1a5f612..e7d70299b08 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.scenariotesting.schemedkafka
 
+import cats.effect.unsafe.IORuntime
 import cats.effect.unsafe.implicits.global
 import com.typesafe.config.ConfigFactory
 import com.typesafe.config.ConfigValueFactory.fromAnyRef
@@ -40,9 +41,10 @@ import pl.touk.nussknacker.engine.testmode.TestProcess._
 import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
 import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader}
 import pl.touk.nussknacker.scenariotesting.schemedkafka.SchemedKafkaScenarioTestingSpec.sinkForInputMetaResultsHolder
-import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties}
+import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties, PatientScalaFutures}
 
 import java.util.Collections
+import scala.concurrent.ExecutionContext
 
 class SchemedKafkaScenarioTestingSpec
    extends AnyFunSuite
@@ -51,7 +53,8 @@ class SchemedKafkaScenarioTestingSpec
    with EitherValuesDetailedMessage
    with OptionValues
    with LoneElement
-    with BeforeAndAfterAll {
+    with BeforeAndAfterAll
+    with PatientScalaFutures {
 
  private val creator: KafkaAvroTestProcessConfigCreator =
    new KafkaAvroTestProcessConfigCreator(sinkForInputMetaResultsHolder) {
@@ -73,7 +76,7 @@ class SchemedKafkaScenarioTestingSpec
    DeploymentManagersClassLoader
      .create(List.empty)
      .allocated
-      .unsafeRunSync()
+      .unsafeRunSync()(IORuntime.global)
 
  private val modelData =
    LocalModelData(
@@ -145,7 +148,7 @@ class SchemedKafkaScenarioTestingSpec
    val testRecordJson = obj("keySchemaId" -> Null, "valueSchemaId" -> fromInt(id), "consumerRecord" -> consumerRecord)
    val scenarioTestData = ScenarioTestData(ScenarioTestJsonRecord("start", testRecordJson) :: Nil)
 
-    val results = testRunner.runTests(process, scenarioTestData)
+    val results = testRunner.runTests(process, scenarioTestData)(ExecutionContext.global).futureValue
 
    val testResultVars = results.nodeResults("end").head.variables
    testResultVars("extractedTimestamp").hcursor.downField("pretty").as[Long].rightValue shouldBe expectedTimestamp
@@ -177,7 +180,7 @@ class SchemedKafkaScenarioTestingSpec
    )
    val scenarioTestData = ScenarioTestData("start", parameterExpressions)
 
-    val results = testRunner.runTests(process, scenarioTestData)
+    val results = testRunner.runTests(process, scenarioTestData)(ExecutionContext.global).futureValue
 
    results
      .invocationResults("end")
      .head
@@ -199,7 +202,7 @@ class SchemedKafkaScenarioTestingSpec
      ParameterName("in") -> Expression.spel("'some-text-id'")
    )
    val scenarioTestData = ScenarioTestData("fragment1", parameterExpressions)
 
-    val results = testRunner.runTests(fragment, scenarioTestData)
+    val results = testRunner.runTests(fragment, scenarioTestData)(ExecutionContext.global).futureValue
 
    results.nodeResults("fragment1").loneElement shouldBe ResultContext(
      "fragment1-fragment1-0-0",
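
Note (not part of the diff): a minimal sketch of how the reworked scenario-testing API is consumed after this change. FlinkProcessTestRunner.runTests now returns a Future[TestResults[Json]] and takes an implicit ExecutionContext, so callers compose or await the result instead of blocking inside the runner. The object name, placeholder values and exact import locations below are illustrative assumptions, not code from this PR.

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

import io.circe.Json
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults

object ScenarioTestingUsageSketch {

  // Illustrative helper: the runner, scenario and test data are supplied by the caller.
  def runScenarioTest(
      testRunner: FlinkProcessTestRunner,
      scenario: CanonicalProcess,
      testData: ScenarioTestData
  )(implicit ec: ExecutionContext): Future[TestResults[Json]] =
    // runTests only schedules the job on the (shared or ad-hoc) MiniCluster;
    // the returned future completes when execution finishes, and the runner
    // releases its resources in an onComplete callback.
    testRunner.runTests(scenario, testData)

  // The specs above await the future with PatientScalaFutures' futureValue;
  // outside ScalaTest an explicit await works the same way.
  def awaitResults(resultFuture: Future[TestResults[Json]]): TestResults[Json] =
    Await.result(resultFuture, 1.minute)
}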