diff --git a/build.sbt b/build.sbt index 48ce2637864..209d408d782 100644 --- a/build.sbt +++ b/build.sbt @@ -618,13 +618,12 @@ lazy val flinkDeploymentManager = (project in flink("management")) dependencyOverrides += "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV ) .dependsOn( - flinkMiniCluster, deploymentManagerApi % Provided, + flinkMiniCluster, + commonUtils % Provided, utilsInternal % Provided, httpUtils % Provided, - flinkScalaUtils % Provided, - // test->test dependency is needed to load SimpleProcessConfigCreator - flinkExecutor % "test,test->test", + flinkExecutor % Test, flinkTestUtils % "it,test", kafkaTestUtils % "it,test" ) @@ -752,12 +751,11 @@ lazy val flinkExecutor = (project in flink("executor")) Seq( // Dependencies below are provided by flink-dist jar in production flink or by flink DM for scenario testing/state verification purpose "org.apache.flink" % "flink-streaming-java" % flinkV % Provided, - "org.apache.flink" % "flink-runtime" % flinkV % Provided, "org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % Provided, // This dependency must be provided, because some cloud providers, such as Ververica, already have it on their classpath, which may cause a conflict "org.apache.flink" % "flink-metrics-dropwizard" % flinkV % Provided, ) - } ++ flinkLibScalaDeps(scalaVersion.value, Some(Provided)), + }, prepareItLibs := { val workTarget = (ThisScope / baseDirectory).value / "target" / "it-libs" val artifacts = (ThisScope / additionalBundledArtifacts).value @@ -781,9 +779,8 @@ lazy val flinkExecutor = (project in flink("executor")) .dependsOn( flinkComponentsUtils, flinkExtensionsApi, - // TODO: it can be removed when fully switch to shared mini cluster, see AdHocMiniClusterFallbackHandler - flinkMiniCluster, scenarioCompiler, + utilsInternal, // Various components uses one of library in stack: sttp -> async-http-client -> netty // Different versions of netty which is on the bottom of this stack causes NoClassDefFoundError. // To overcome this problem and reduce size of model jar bundle, we add http utils as a compile time dependency. @@ -1208,12 +1205,20 @@ lazy val flinkMiniCluster = (project in flink("minicluster")) ExclusionRule("org.slf4j", "slf4j-log4j12"), ExclusionRule("com.esotericsoftware", "kryo-shaded"), ), + "org.apache.flink" % "flink-runtime" % flinkV, "org.apache.flink" % "flink-statebackend-rocksdb" % flinkV, "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV, "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV, ) ++ flinkLibScalaDeps(scalaVersion.value) } ) + .dependsOn( + extensionsApi % Provided, + utilsInternal % Provided, + // test->test dependency is needed to load SimpleProcessConfigCreator + flinkExecutor % "test,test->test", + testUtils % Test, + ) lazy val flinkTestUtils = (project in flink("test-utils")) .settings(commonSettings) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala index a33b52d03c6..2cdb3b959ec 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala @@ -15,8 +15,8 @@ import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterConfig import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider} -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterConfig import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader} import pl.touk.nussknacker.test.config.ConfigWithScalaVersion import pl.touk.nussknacker.test.utils.domain.TestFactory @@ -85,7 +85,7 @@ class MockDeploymentManager private ( ), shouldVerifyBeforeDeploy = false, mainClassName = "UNUSED", - scenarioTestingConfig = MiniClusterConfig() + scenarioTestingConfig = FlinkMiniClusterConfig() ) { import MockDeploymentManager._ diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index 0f8ed48b004..0129c079481 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -4,6 +4,7 @@ import akka.actor.ActorSystem import cats.data.{Validated, ValidatedNel} import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging +import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig @@ -12,9 +13,9 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory +import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig -import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner -import pl.touk.nussknacker.engine.process.minicluster.{MiniClusterConfig, MiniClusterFactory} import java.util.UUID import java.util.concurrent.TimeUnit @@ -42,15 +43,15 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode private val random = new scala.util.Random() private val miniClusterWithServices = - MiniClusterFactory - .createMiniClusterWithServicesIfConfigured( + FlinkMiniClusterFactory + .createMiniClusterWithServices( modelData.modelClassLoader, - MiniClusterConfig() + new Configuration, + new Configuration ) - .get private lazy val flinkTestRunner = - new FlinkProcessTestRunner( + new FlinkMiniClusterScenarioTestRunner( modelData, Some(miniClusterWithServices) ) diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index a1cdc2fed03..3158e3d9fe0 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -4,6 +4,7 @@ import cats.data.Validated.valid import cats.data.ValidatedNel import com.typesafe.config.Config import io.circe.Json +import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig @@ -11,10 +12,10 @@ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory +import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig -import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner import pl.touk.nussknacker.engine.newdeployment.DeploymentId -import pl.touk.nussknacker.engine.process.minicluster.{MiniClusterConfig, MiniClusterFactory} import pl.touk.nussknacker.engine.testing.StubbingCommands import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults @@ -58,16 +59,17 @@ object MockableDeploymentManagerProvider { with ManagerSpecificScenarioActivitiesStoredByManager with StubbingCommands { - private lazy val miniClusterWithServicesOpt = modelDataOpt.flatMap { modelData => - MiniClusterFactory.createMiniClusterWithServicesIfConfigured( + private lazy val miniClusterWithServicesOpt = modelDataOpt.map { modelData => + FlinkMiniClusterFactory.createMiniClusterWithServices( modelData.modelClassLoader, - MiniClusterConfig() + new Configuration, + new Configuration ) } private lazy val testRunnerOpt = modelDataOpt.map { modelData => - new FlinkProcessTestRunner( + new FlinkMiniClusterScenarioTestRunner( modelData, miniClusterWithServicesOpt ) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala index 640d70d588b..0c602882e1d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData} import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar +import pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster.LegacyAdHocMiniClusterFallbackHandler import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults import pl.touk.nussknacker.engine.testmode.{ @@ -44,7 +45,7 @@ class FlinkTestMain( ) { private val adHocMiniClusterFallbackHandler = - new AdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario testing") + new LegacyAdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario testing") def testScenario(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData): Future[TestResults[Json]] = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala index 102458e89c7..6562d8e193f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala @@ -8,6 +8,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar +import pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster.LegacyAdHocMiniClusterFallbackHandler import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector} @@ -23,7 +24,7 @@ object FlinkVerificationMain { new FlinkVerificationMain( sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithParallelismOverride.apply _ tupled), modelData - ).runTest( + ).verifyScenarioState( scenario, processVersion, savepointPath @@ -37,9 +38,9 @@ class FlinkVerificationMain( ) { private val adHocMiniClusterFallbackHandler = - new AdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario state verification") + new LegacyAdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario state verification") - def runTest(scenario: CanonicalProcess, processVersion: ProcessVersion, savepointPath: String): Unit = { + def verifyScenarioState(scenario: CanonicalProcess, processVersion: ProcessVersion, savepointPath: String): Unit = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener try { adHocMiniClusterFallbackHandler.handleAdHocMniClusterFallback( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/legacyadhocminicluster/FlinkMiniClusterFactoryReflectiveInvoker.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/legacyadhocminicluster/FlinkMiniClusterFactoryReflectiveInvoker.scala new file mode 100644 index 00000000000..81b41fd822f --- /dev/null +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/legacyadhocminicluster/FlinkMiniClusterFactoryReflectiveInvoker.scala @@ -0,0 +1,25 @@ +package pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.MiniCluster +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker + +import java.net.URLClassLoader + +object FlinkMiniClusterFactoryReflectiveInvoker { + + private val factoryInvoker = new ReflectiveMethodInvoker[(MiniCluster, Boolean => StreamExecutionEnvironment)]( + getClass.getClassLoader, + "pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory", + "createMiniClusterWithServicesRaw" + ) + + def createMiniClusterWithServicesRaw( + modelClassLoader: URLClassLoader, + miniClusterConfigOverrides: Configuration, + streamExecutionConfigOverrides: Configuration + ): (MiniCluster, Boolean => StreamExecutionEnvironment) = + factoryInvoker.invokeStaticMethod(modelClassLoader, miniClusterConfigOverrides, streamExecutionConfigOverrides) + +} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/legacyadhocminicluster/LegacyAdHocMiniClusterFallbackHandler.scala similarity index 84% rename from engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala rename to engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/legacyadhocminicluster/LegacyAdHocMiniClusterFallbackHandler.scala index 1b943fb0c4a..81c2b2406d9 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/legacyadhocminicluster/LegacyAdHocMiniClusterFallbackHandler.scala @@ -1,12 +1,12 @@ -package pl.touk.nussknacker.engine.process.scenariotesting +package pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions} +import org.apache.flink.configuration.{Configuration, TaskManagerOptions} import org.apache.flink.runtime.minicluster.MiniCluster import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory +import pl.touk.nussknacker.engine.process.scenariotesting.StreamExecutionEnvironmentWithParallelismOverride import pl.touk.nussknacker.engine.util.MetaDataExtractor import pl.touk.nussknacker.engine.util.loader.ModelClassLoader @@ -14,7 +14,7 @@ import scala.concurrent.{ExecutionContext, Future} // This class handles a legacy ad-hoc way to create minicluster. // TODO: After we fully switch to shared mini cluster approach, it should be removed -class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging { +class LegacyAdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging { def handleAdHocMniClusterFallbackAsync[R]( sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride], @@ -63,15 +63,12 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas .getOrElse(1) val legacyMiniClusterConfigOverrides = new Configuration legacyMiniClusterConfigOverrides.set[java.lang.Integer](TaskManagerOptions.NUM_TASK_SLOTS, scenarioParallelism) - val miniClusterWithServices = MiniClusterFactory.createMiniClusterWithServices( + val (miniCluster, createStreamExecutionEnvironment) = FlinkMiniClusterFactoryReflectiveInvoker.createMiniClusterWithServicesRaw( modelClassLoader, legacyMiniClusterConfigOverrides, new Configuration() ) - AdHocMiniClusterResources( - miniClusterWithServices.miniCluster, - miniClusterWithServices.createStreamExecutionEnvironment(attached = true) - ) + AdHocMiniClusterResources(miniCluster, createStreamExecutionEnvironment(true)) } private case class AdHocMiniClusterResources(miniCluster: MiniCluster, env: StreamExecutionEnvironment) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala index 3e47fd1632c..3bb658e9f42 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.management import net.ceedubs.ficus.Ficus import net.ceedubs.ficus.readers.ValueReader import org.apache.flink.configuration.Configuration -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterConfig +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterConfig import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.jdk.CollectionConverters._ @@ -25,7 +25,7 @@ final case class FlinkConfig( FlinkWaitForDuringDeployFinishedConfig(enabled = true, Some(180), Some(1 second)), scenarioStateRequestTimeout: FiniteDuration = 3 seconds, jobConfigsCacheSize: Int = 1000, - miniCluster: MiniClusterConfig = MiniClusterConfig() + miniCluster: FlinkMiniClusterConfig = FlinkMiniClusterConfig() ) object FlinkConfig { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index 35a3c89a87a..2de6814d0c4 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -11,9 +11,12 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId} +import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.{ + FlinkMiniClusterScenarioStateVerifier, + FlinkMiniClusterScenarioTestRunner +} +import pl.touk.nussknacker.engine.flink.minicluster.{FlinkMiniClusterConfig, FlinkMiniClusterFactory} import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.prepareProgramArgs -import pl.touk.nussknacker.engine.management.scenariotesting.{FlinkProcessTestRunner, FlinkProcessVerifier} -import pl.touk.nussknacker.engine.process.minicluster.{MiniClusterConfig, MiniClusterFactory} import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} import scala.concurrent.Future @@ -23,26 +26,26 @@ abstract class FlinkDeploymentManager( dependencies: DeploymentManagerDependencies, shouldVerifyBeforeDeploy: Boolean, mainClassName: String, - scenarioTestingConfig: MiniClusterConfig + scenarioTestingConfig: FlinkMiniClusterConfig ) extends DeploymentManager with LazyLogging { import dependencies._ private val miniClusterWithServicesOpt = { - MiniClusterFactory.createMiniClusterWithServicesIfConfigured( + FlinkMiniClusterFactory.createMiniClusterWithServicesIfConfigured( modelData.modelClassLoader, scenarioTestingConfig ) } - private val testRunner = new FlinkProcessTestRunner( + private val testRunner = new FlinkMiniClusterScenarioTestRunner( modelData, miniClusterWithServicesOpt .filter(_ => scenarioTestingConfig.useForScenarioTesting) ) - private val verification = new FlinkProcessVerifier( + private val verification = new FlinkMiniClusterScenarioStateVerifier( modelData, miniClusterWithServicesOpt .filter(_ => scenarioTestingConfig.useForScenarioStateVerification) diff --git a/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterConfig.scala b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterConfig.scala similarity index 59% rename from engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterConfig.scala rename to engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterConfig.scala index d38edc2b4d8..a50931fa19f 100644 --- a/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterConfig.scala +++ b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterConfig.scala @@ -1,9 +1,9 @@ -package pl.touk.nussknacker.engine.process.minicluster +package pl.touk.nussknacker.engine.flink.minicluster import org.apache.flink.configuration.Configuration -final case class MiniClusterConfig( - // TODO: remove after fully migration, see AdHocMiniClusterFallbackHandler +final case class FlinkMiniClusterConfig( + // TODO: remove after fully migration, see LegacyAdHocMiniClusterFallbackHandler useForScenarioTesting: Boolean = true, useForScenarioStateVerification: Boolean = true, config: Configuration = new Configuration, diff --git a/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterFactory.scala similarity index 73% rename from engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala rename to engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterFactory.scala index a70e8fed082..9584490cd21 100644 --- a/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala +++ b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterFactory.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.process.minicluster +package pl.touk.nussknacker.engine.flink.minicluster import com.typesafe.scalalogging.LazyLogging import org.apache.flink.configuration._ @@ -8,10 +8,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import java.net.URLClassLoader -object MiniClusterFactory extends LazyLogging { +object FlinkMiniClusterFactory extends LazyLogging { + + private[minicluster] val DefaultTaskSlots = 8 // It is method instead of value because Configuration is mutable - private def defaultMiniClusterConfig: Configuration = { + private def DefaultMiniClusterConfig: Configuration = { val config = new Configuration // To avoid ports collisions config.set[Integer](JobManagerOptions.PORT, 0) @@ -25,13 +27,13 @@ object MiniClusterFactory extends LazyLogging { config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m")) config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("100m")) // Reasonable number of available parallel slots - config.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, 8) + config.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, DefaultTaskSlots) config } def createMiniClusterWithServicesIfConfigured( modelClassLoader: URLClassLoader, - config: MiniClusterConfig + config: FlinkMiniClusterConfig ): Option[MiniClusterWithServices] = { if (config.useForScenarioTesting || config.useForScenarioStateVerification) { Some(createMiniClusterWithServices(modelClassLoader, config.config, config.streamExecutionEnvConfig)) @@ -45,14 +47,27 @@ object MiniClusterFactory extends LazyLogging { miniClusterConfigOverrides: Configuration, streamExecutionConfigOverrides: Configuration ): MiniClusterWithServices = { - val miniClusterConfig = defaultMiniClusterConfig + val (miniCluster, createStreamExecutionEnv) = + createMiniClusterWithServicesRaw(modelClassLoader, miniClusterConfigOverrides, streamExecutionConfigOverrides) + new MiniClusterWithServices(miniCluster, createStreamExecutionEnv) + } + + // This class return raw objects (tuple, library classes) because is used by reflection and it simplify it + // TODO: Remove raw variant of this method after fully switch to shared mini cluster, see LegacyAdHocMiniClusterFallbackHandler + def createMiniClusterWithServicesRaw( + modelClassLoader: URLClassLoader, + miniClusterConfigOverrides: Configuration, + streamExecutionConfigOverrides: Configuration + ): (MiniCluster, Boolean => StreamExecutionEnvironment) = { + val miniClusterConfig = DefaultMiniClusterConfig miniClusterConfig.addAll(miniClusterConfigOverrides) logger.debug(s"Creating MiniCluster with configuration: $miniClusterConfig") val miniCluster = createMiniCluster(miniClusterConfig) + miniCluster.start() def createStreamExecutionEnv(attached: Boolean): StreamExecutionEnvironment = { - MiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment( + FlinkMiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment( miniCluster, modelClassLoader, streamExecutionConfigOverrides, @@ -60,21 +75,19 @@ object MiniClusterFactory extends LazyLogging { ) } - new MiniClusterWithServices(miniCluster, createStreamExecutionEnv) + (miniCluster, createStreamExecutionEnv) } private def createMiniCluster(configuration: Configuration) = { // it is required for proper working of HadoopFileSystem FileSystem.initialize(configuration, null) - val miniCluster = new MiniCluster( + new MiniCluster( new MiniClusterConfiguration.Builder() .setNumTaskManagers(configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS)) .setNumSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)) .setConfiguration(configuration) .build() ) - miniCluster.start() - miniCluster } } diff --git a/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterStreamExecutionEnvironmentFactory.scala b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterStreamExecutionEnvironmentFactory.scala similarity index 96% rename from engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterStreamExecutionEnvironmentFactory.scala rename to engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterStreamExecutionEnvironmentFactory.scala index 9ffc3a0cb2c..58b91447fa6 100644 --- a/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterStreamExecutionEnvironmentFactory.scala +++ b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/FlinkMiniClusterStreamExecutionEnvironmentFactory.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.process.minicluster +package pl.touk.nussknacker.engine.flink.minicluster import org.apache.flink.api.common.JobSubmissionResult import org.apache.flink.api.dag.Pipeline @@ -13,7 +13,7 @@ import java.net.URLClassLoader import java.util.{stream => jstream} import scala.jdk.CollectionConverters._ -object MiniClusterStreamExecutionEnvironmentFactory { +object FlinkMiniClusterStreamExecutionEnvironmentFactory { private val pipelineExecutorName = "minicluster" diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioStateVerifier.scala similarity index 78% rename from engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala rename to engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioStateVerifier.scala index 60fae610e78..fc1100cfe34 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala +++ b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioStateVerifier.scala @@ -1,23 +1,23 @@ -package pl.touk.nussknacker.engine.management.scenariotesting +package pl.touk.nussknacker.engine.flink.minicluster.scenariotesting import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.BaseModelData import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterWithServices +import pl.touk.nussknacker.engine.flink.minicluster.MiniClusterWithServices import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} -class FlinkProcessVerifier( +class FlinkMiniClusterScenarioStateVerifier( modelData: BaseModelData, miniClusterWithServicesOpt: Option[MiniClusterWithServices] ) extends LazyLogging { - // We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar + // We use reflection, because we don't want to bundle flinkExecutor.jar inside deployment manager assembly jar // because it is already in separate assembly for purpose of sending it to Flink during deployment. - // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded + // Other option would be to add flinkExecutor.jar to classpath from which DM is loaded private val mainRunner = new ReflectiveMethodInvoker[Unit]( modelData.modelClassLoader, "pl.touk.nussknacker.engine.process.scenariotesting.FlinkVerificationMain", @@ -30,13 +30,13 @@ class FlinkProcessVerifier( savepointPath: String ): Try[Unit] = { val processId = processVersion.processName - val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices => + val streamExecutionEnvWithParallelismOverride = miniClusterWithServicesOpt.map(miniClusterWithServices => (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), 1) ) try { logger.info(s"Starting to verify $processId") mainRunner.invokeStaticMethod( - streamExecutionEnvWithTotalSlots, + streamExecutionEnvWithParallelismOverride, modelData, canonicalProcess, processVersion, @@ -54,7 +54,7 @@ class FlinkProcessVerifier( ) ) } finally { - streamExecutionEnvWithTotalSlots.foreach(_._1.close()) + streamExecutionEnvWithParallelismOverride.foreach(_._1.close()) } } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunner.scala similarity index 75% rename from engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala rename to engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunner.scala index 7bb3cb73ade..3780561aaa8 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala +++ b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunner.scala @@ -1,16 +1,16 @@ -package pl.touk.nussknacker.engine.management.scenariotesting +package pl.touk.nussknacker.engine.flink.minicluster.scenariotesting import io.circe.Json import pl.touk.nussknacker.engine.BaseModelData import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterWithServices +import pl.touk.nussknacker.engine.flink.minicluster.MiniClusterWithServices import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker import scala.concurrent.{ExecutionContext, Future} -class FlinkProcessTestRunner( +class FlinkMiniClusterScenarioTestRunner( modelData: BaseModelData, miniClusterWithServicesOpt: Option[MiniClusterWithServices] ) { @@ -18,9 +18,9 @@ class FlinkProcessTestRunner( // TODO: configurable? private val ScenarioTestingParallelism = 1 - // We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar + // We use reflection, because we don't want to bundle flinkExecutor.jar inside deployment manager assembly jar // because it is already in separate assembly for purpose of sending it to Flink during deployment. - // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded + // Other option would be to add flinkExecutor.jar to classpath from which DM is loaded private val mainRunner = new ReflectiveMethodInvoker[Future[TestResults[Json]]]( modelData.modelClassLoader, "pl.touk.nussknacker.engine.process.scenariotesting.FlinkTestMain", @@ -31,17 +31,17 @@ class FlinkProcessTestRunner( def runTests(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)( implicit ec: ExecutionContext ): Future[TestResults[Json]] = { - val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices => + val streamExecutionEnvWithParallelismOverride = miniClusterWithServicesOpt.map(miniClusterWithServices => (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), ScenarioTestingParallelism) ) val resultFuture = mainRunner.invokeStaticMethod( - streamExecutionEnvWithTotalSlots, + streamExecutionEnvWithParallelismOverride, modelData, canonicalProcess, scenarioTestData ) resultFuture.onComplete { _ => - streamExecutionEnvWithTotalSlots.foreach(_._1.close()) + streamExecutionEnvWithParallelismOverride.foreach(_._1.close()) } resultFuture } diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala b/engine/flink/minicluster/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunnerSpec.scala similarity index 94% rename from engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala rename to engine/flink/minicluster/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunnerSpec.scala index 709f73401ae..515bb944cf3 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala +++ b/engine/flink/minicluster/src/test/scala/pl/touk/nussknacker/engine/flink/minicluster/scenariotesting/FlinkMiniClusterScenarioTestRunnerSpec.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.management.scenariotesting +package pl.touk.nussknacker.engine.flink.minicluster.scenariotesting import cats.effect.unsafe.IORuntime import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} @@ -26,6 +26,11 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode import pl.touk.nussknacker.engine.compile.FragmentResolver import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs +import pl.touk.nussknacker.engine.flink.minicluster.{FlinkMiniClusterConfig, FlinkMiniClusterFactory} +import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunnerSpec.{ + fragmentWithValidationName, + processWithFragmentParameterValidation +} import pl.touk.nussknacker.engine.flink.test.{ FlinkTestConfiguration, RecordingExceptionConsumer, @@ -34,12 +39,7 @@ import pl.touk.nussknacker.engine.flink.test.{ import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition} -import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunnerSpec.{ - fragmentWithValidationName, - processWithFragmentParameterValidation -} import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader} import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} @@ -49,7 +49,7 @@ import java.util.{Date, UUID} import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class FlinkProcessTestRunnerSpec +class FlinkMiniClusterScenarioTestRunnerSpec extends AnyWordSpec with Matchers with Inside @@ -77,7 +77,7 @@ class FlinkProcessTestRunnerSpec deploymentManagersClassLoaderInstance ) - private val miniClusterWithServices = MiniClusterFactory.createMiniClusterWithServices( + private val miniClusterWithServices = FlinkMiniClusterFactory.createMiniClusterWithServices( modelClassLoader, miniClusterConfigOverrides = new Configuration, streamExecutionConfigOverrides = new Configuration @@ -213,6 +213,43 @@ class FlinkProcessTestRunnerSpec runTestAndVerify() } + "be able to run tests with legacy ad-hoc mini cluster" in { + val process = + ScenarioBuilder + .streaming(scenarioName) + .source(sourceNodeId, "input") + .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) + + val input = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6") + + val testRunner = prepareTestRunner(useIOMonadInInterpreter, useLegacyAdHocMiniCluster = true) + + val results = testRunner + .runTests( + process, + ScenarioTestData( + List( + ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6")) + ) + ) + )(ExecutionContext.global) + .futureValue + + val nodeResults = results.nodeResults + + nodeResults(sourceNodeId) shouldBe List(nodeResult(0, "input" -> input)) + nodeResults("out") shouldBe List(nodeResult(0, "input" -> input)) + + val invocationResults = results.invocationResults + + invocationResults("out") shouldBe + List(ExpressionInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "Value", variable(11))) + + results.externalInvocationResults("out") shouldBe List( + ExternalInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "valueMonitor", variable(11)) + ) + } + "collect results for split" in { val process = ScenarioBuilder @@ -305,7 +342,7 @@ class FlinkProcessTestRunnerSpec val process = ScenarioBuilder .streaming(scenarioName) - .parallelism(4) + .parallelism(FlinkMiniClusterFactory.DefaultTaskSlots + 1) .source(sourceNodeId, "input") .emptySink("out", "monitor") @@ -877,8 +914,9 @@ class FlinkProcessTestRunnerSpec private def prepareTestRunner( useIOMonadInInterpreter: Boolean, enrichDefaultConfig: Config => Config = identity, - additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty - ): FlinkProcessTestRunner = { + additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty, + useLegacyAdHocMiniCluster: Boolean = false + ): FlinkMiniClusterScenarioTestRunner = { val config = enrichDefaultConfig(ConfigFactory.load("application.conf")) .withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter)) @@ -888,7 +926,7 @@ class FlinkProcessTestRunnerSpec modelClassLoader, resolveConfigs = false ) - new FlinkProcessTestRunner(modelData, Some(miniClusterWithServices)) + new FlinkMiniClusterScenarioTestRunner(modelData, Some(miniClusterWithServices).filterNot(_ => useLegacyAdHocMiniCluster)) } private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] = @@ -923,7 +961,7 @@ class FlinkProcessTestRunnerSpec } -object FlinkProcessTestRunnerSpec { +object FlinkMiniClusterScenarioTestRunnerSpec { private val fragmentWithValidationName = "fragmentWithValidation" private val processWithFragmentParameterValidation: CanonicalProcess = { diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala index 681edfd3297..c20e6dc814f 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala @@ -15,14 +15,14 @@ import org.scalatest.{BeforeAndAfterAll, OptionValues} import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory +import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName import pl.touk.nussknacker.engine.kafka.source.InputMeta import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders -import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.util.json.ToJsonEncoder @@ -65,14 +65,14 @@ class KafkaScenarioTestingSpec ModelClassLoader(FlinkTestConfiguration.classpathWorkaround, None, deploymentManagersClassLoaderInstance) ) - private val miniClusterWithServices = MiniClusterFactory.createMiniClusterWithServices( + private val miniClusterWithServices = FlinkMiniClusterFactory.createMiniClusterWithServices( modelData.modelClassLoader, miniClusterConfigOverrides = new Configuration, streamExecutionConfigOverrides = new Configuration ) private val testRunner = - new FlinkProcessTestRunner(modelData, Some(miniClusterWithServices)) + new FlinkMiniClusterScenarioTestRunner(modelData, Some(miniClusterWithServices)) override protected def afterAll(): Unit = { super.afterAll() diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala index 94a7766f468..2ad346dd24e 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala @@ -16,14 +16,14 @@ import org.scalatest.{BeforeAndAfterAll, LoneElement, OptionValues} import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory +import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName import pl.touk.nussknacker.engine.kafka.source.InputMeta -import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder -import pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory import pl.touk.nussknacker.engine.schemedkafka.KafkaAvroIntegrationMockSchemaRegistry.schemaRegistryMockClient import pl.touk.nussknacker.engine.schemedkafka.KafkaAvroTestProcessConfigCreator import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.{ @@ -86,14 +86,14 @@ class SchemedKafkaScenarioTestingSpec ModelClassLoader(FlinkTestConfiguration.classpathWorkaround, None, deploymentManagersClassLoaderInstance) ) - private val miniClusterWithServices = MiniClusterFactory.createMiniClusterWithServices( + private val miniClusterWithServices = FlinkMiniClusterFactory.createMiniClusterWithServices( modelData.modelClassLoader, miniClusterConfigOverrides = new Configuration, streamExecutionConfigOverrides = new Configuration ) private val testRunner = - new FlinkProcessTestRunner(modelData, Some(miniClusterWithServices)) + new FlinkMiniClusterScenarioTestRunner(modelData, Some(miniClusterWithServices)) override protected def afterAll(): Unit = { super.afterAll() diff --git a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala index 0e013712222..9e8dca7ea1e 100644 --- a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala +++ b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala @@ -1,19 +1,28 @@ package pl.touk.nussknacker.engine.util +import com.typesafe.scalalogging.LazyLogging + import java.lang.reflect.InvocationTargetException -final class ReflectiveMethodInvoker[Result](classLoader: ClassLoader, className: String, methodName: String) { +final class ReflectiveMethodInvoker[Result](classLoader: ClassLoader, className: String, methodName: String) + extends LazyLogging { import scala.reflect.runtime.{universe => ru} private val invoker: ru.MethodMirror = { - val m = ru.runtimeMirror(classLoader) - val module = m.staticModule(className) - val im = m.reflectModule(module) - val method = im.symbol.info.decl(ru.TermName(methodName)).asMethod - val objMirror = m.reflect(im.instance) - val r = objMirror.reflectMethod(method) - r + try { + val m = ru.runtimeMirror(classLoader) + val module = m.staticModule(className) + val im = m.reflectModule(module) + val method = im.symbol.info.decl(ru.TermName(methodName)).asMethod + val objMirror = m.reflect(im.instance) + val r = objMirror.reflectMethod(method) + r + } catch { + case e: ScalaReflectionException => + logger.error(s"Error while invoking method $className.$methodName with classloader: $classLoader", e) + throw e + } } // we have to use context loader, as in UI we have don't have e.g. nussknacker-process or user model on classpath...