diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala index 0473bd6c0cc..9acb33b3e9d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/minicluster/MiniClusterFactory.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.engine.process.minicluster import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.configuration.{Configuration, TaskManagerOptions} +import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -9,18 +9,39 @@ import pl.touk.nussknacker.engine.util.loader.ModelClassLoader object MiniClusterFactory extends LazyLogging { + // It is method instead of value because Configuration is mutable + private def defaultMiniClusterConfig: Configuration = { + val config = new Configuration + // To avoid ports collisions + config.set[Integer](JobManagerOptions.PORT, 0) + config.set[Integer](RestOptions.PORT, 0) + // FIXME abr: verify if needed + // FIXME: reverse flink default order +// config.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") + // In some setups we create a few Flink DMs. Each of them creates its own mini cluster. + // To reduce footprint we decrease off-heap memory buffers size and managed memory + config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m")) + config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m")) + config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("100m")) + // Reasonable number of available parallel slots + config.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, 8) + config + } + def createMiniClusterWithStreamExecutionEnvironmentFactory( modelClassLoader: ModelClassLoader, - miniClusterConfig: Configuration, - streamExecutionConfig: Configuration + miniClusterConfigOverrides: Configuration, + streamExecutionConfigOverrides: Configuration ): (MiniCluster, Boolean => StreamExecutionEnvironment) = { + val miniClusterConfig = defaultMiniClusterConfig + miniClusterConfig.addAll(miniClusterConfigOverrides) logger.debug(s"Creating MiniCluster with configuration: $miniClusterConfig") val miniCluster = createMiniCluster(miniClusterConfig) def createStreamExecutionEnv(attached: Boolean): StreamExecutionEnvironment = { MiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment( miniCluster, modelClassLoader, - streamExecutionConfig, + streamExecutionConfigOverrides, attached ) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala index 855aad0085c..844c6a1489f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala @@ -17,9 +17,9 @@ import scala.concurrent.{ExecutionContext, Future} class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging { def handleAdHocMniClusterFallbackAsync[R]( - sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots], + sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride], scenario: CanonicalProcess - )(f: StreamExecutionEnvironmentWithTotalSlots => Future[R])(implicit ec: ExecutionContext): Future[R] = { + )(f: StreamExecutionEnvironmentWithParallelismOverride => Future[R])(implicit ec: ExecutionContext): Future[R] = { val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) = useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario) val resultFuture = f(streamEnvWithMaxParallelism) @@ -28,9 +28,9 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas } def handleAdHocMniClusterFallback[R]( - sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots], + sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride], scenario: CanonicalProcess - )(f: StreamExecutionEnvironmentWithTotalSlots => R): R = { + )(f: StreamExecutionEnvironmentWithParallelismOverride => R): R = { val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) = useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario) try { @@ -41,9 +41,9 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas } private def useSharedMiniClusterOrAdHoc[R]( - sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots], + sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride], scenario: CanonicalProcess - ): (Option[AdHocMiniClusterResources], StreamExecutionEnvironmentWithTotalSlots) = { + ): (Option[AdHocMiniClusterResources], StreamExecutionEnvironmentWithParallelismOverride) = { sharedMiniClusterServicesOpt .map { sharedMiniClusterServices => logger.debug(s"Shared MiniCluster used for $useCaseForDebug") @@ -52,7 +52,7 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas .getOrElse { logger.debug(s"Shared MiniCluster not used for $useCaseForDebug. Creating ad-hoc MiniCluster") val resources = createAdHocMiniClusterResources(scenario) - (Some(resources), StreamExecutionEnvironmentWithTotalSlots.withoutMaxParallelism(resources.env)) + (Some(resources), StreamExecutionEnvironmentWithParallelismOverride.withoutParallelismOverriding(resources.env)) } } @@ -61,14 +61,13 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) .parallelism .getOrElse(1) - val legacyMiniClusterConfig = new Configuration - legacyMiniClusterConfig.set[Integer](RestOptions.PORT, 0) - // FIXME: reversing flink default order - legacyMiniClusterConfig.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") - legacyMiniClusterConfig.set[java.lang.Integer](TaskManagerOptions.NUM_TASK_SLOTS, scenarioParallelism) + val legacyMiniClusterConfigOverride = new Configuration + // It is left as it was before + legacyMiniClusterConfigOverride.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") + legacyMiniClusterConfigOverride.set[java.lang.Integer](TaskManagerOptions.NUM_TASK_SLOTS, scenarioParallelism) val (miniCluster, createEnv) = MiniClusterFactory.createMiniClusterWithStreamExecutionEnvironmentFactory( modelClassLoader, - legacyMiniClusterConfig, + legacyMiniClusterConfigOverride, new Configuration() ) AdHocMiniClusterResources(miniCluster, createEnv(true)) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala index b4d73f917c5..640d70d588b 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala @@ -30,7 +30,7 @@ object FlinkTestMain { scenarioTestData: ScenarioTestData ): Future[TestResults[Json]] = { new FlinkTestMain( - sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithTotalSlots.apply _ tupled), + sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithParallelismOverride.apply _ tupled), modelData ) .testScenario(scenario, scenarioTestData) @@ -39,7 +39,7 @@ object FlinkTestMain { } class FlinkTestMain( - sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots], + sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride], modelData: ModelData ) { @@ -53,18 +53,19 @@ class FlinkTestMain( scenario, ) { streamExecutionEnvWithMaxParallelism => import streamExecutionEnvWithMaxParallelism._ - val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario) - val resultCollector = new TestServiceInvocationCollector(collectingListener) + val scenarioWithOverrodeParallelism = streamExecutionEnvWithMaxParallelism.overrideParallelismIfNeeded(scenario) + val resultCollector = new TestServiceInvocationCollector(collectingListener) // ProcessVersion can't be passed from DM because testing mechanism can be used with not saved scenario - val processVersion = ProcessVersion.empty.copy(processName = alignedScenario.name) + val processVersion = ProcessVersion.empty.copy(processName = scenarioWithOverrodeParallelism.name) val deploymentData = DeploymentData.empty.copy(additionalModelConfigs = AdditionalModelConfigs(modelData.additionalConfigsFromProvider) ) - val registrar = prepareRegistrar(collectingListener, alignedScenario, scenarioTestData, processVersion) + val registrar = + prepareRegistrar(collectingListener, scenarioWithOverrodeParallelism, scenarioTestData, processVersion) streamExecutionEnv.getCheckpointConfig.disableCheckpointing() registrar.register( streamExecutionEnv, - alignedScenario, + scenarioWithOverrodeParallelism, processVersion, deploymentData, resultCollector @@ -72,7 +73,7 @@ class FlinkTestMain( // TODO: Non-blocking future periodically checking if job is finished val resultFuture = Future { blocking { - streamExecutionEnv.execute(alignedScenario.name.value) + streamExecutionEnv.execute(scenarioWithOverrodeParallelism.name.value) collectingListener.results } } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala index 065fe51ad4d..102458e89c7 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala @@ -21,7 +21,7 @@ object FlinkVerificationMain { savepointPath: String ): Unit = new FlinkVerificationMain( - sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithTotalSlots.apply _ tupled), + sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithParallelismOverride.apply _ tupled), modelData ).runTest( scenario, @@ -32,7 +32,7 @@ object FlinkVerificationMain { } class FlinkVerificationMain( - sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots], + sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride], modelData: ModelData, ) { @@ -47,15 +47,15 @@ class FlinkVerificationMain( scenario, ) { streamExecutionEnvWithMaxParallelism => import streamExecutionEnvWithMaxParallelism._ - val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario) - val resultCollector = new TestServiceInvocationCollector(collectingListener) - val registrar = prepareRegistrar(alignedScenario) - val deploymentData = DeploymentData.empty + val scenarioWithOverrodeParallelism = streamExecutionEnvWithMaxParallelism.overrideParallelismIfNeeded(scenario) + val resultCollector = new TestServiceInvocationCollector(collectingListener) + val registrar = prepareRegistrar(scenarioWithOverrodeParallelism) + val deploymentData = DeploymentData.empty streamExecutionEnv.getCheckpointConfig.disableCheckpointing() registrar.register( streamExecutionEnv, - alignedScenario, + scenarioWithOverrodeParallelism, processVersion, deploymentData, resultCollector diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioParallelismAligner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioParallelismAligner.scala deleted file mode 100644 index 8284904a22a..00000000000 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioParallelismAligner.scala +++ /dev/null @@ -1,7 +0,0 @@ -package pl.touk.nussknacker.engine.process.scenariotesting - -import pl.touk.nussknacker.engine.api.StreamMetaData -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.util.MetaDataExtractor - -class ScenarioParallelismAligner(maxParallelism: Int) {} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/StreamExecutionEnvironmentWithTotalSlots.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/StreamExecutionEnvironmentWithParallelismOverride.scala similarity index 65% rename from engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/StreamExecutionEnvironmentWithTotalSlots.scala rename to engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/StreamExecutionEnvironmentWithParallelismOverride.scala index 0c55fc29551..03e768b6b5c 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/StreamExecutionEnvironmentWithTotalSlots.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/StreamExecutionEnvironmentWithParallelismOverride.scala @@ -5,13 +5,13 @@ import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.util.MetaDataExtractor -class StreamExecutionEnvironmentWithTotalSlots private ( +class StreamExecutionEnvironmentWithParallelismOverride private( val streamExecutionEnv: StreamExecutionEnvironment, - maxParallelismOpt: Option[Int] + parallelismOverride: Option[Int] ) { - def alignParallelismIfNeeded(canonicalProcess: CanonicalProcess): CanonicalProcess = { - maxParallelismOpt.map { maxParallelism => + def overrideParallelismIfNeeded(canonicalProcess: CanonicalProcess): CanonicalProcess = { + parallelismOverride.map { maxParallelism => val scenarioParallelism = MetaDataExtractor .extractTypeSpecificDataOrDefault[StreamMetaData](canonicalProcess.metaData, StreamMetaData()) .parallelism @@ -31,12 +31,12 @@ class StreamExecutionEnvironmentWithTotalSlots private ( } -object StreamExecutionEnvironmentWithTotalSlots { +object StreamExecutionEnvironmentWithParallelismOverride { - def apply(env: StreamExecutionEnvironment, maxParallelism: Int): StreamExecutionEnvironmentWithTotalSlots = - new StreamExecutionEnvironmentWithTotalSlots(env, Some(maxParallelism)) + def apply(env: StreamExecutionEnvironment, maxParallelism: Int): StreamExecutionEnvironmentWithParallelismOverride = + new StreamExecutionEnvironmentWithParallelismOverride(env, Some(maxParallelism)) - def withoutMaxParallelism(env: StreamExecutionEnvironment) = - new StreamExecutionEnvironmentWithTotalSlots(env, None) + def withoutParallelismOverriding(env: StreamExecutionEnvironment) = + new StreamExecutionEnvironmentWithParallelismOverride(env, None) } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala index 80ea0c4d313..6307c944ca1 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.management import net.ceedubs.ficus.Ficus import net.ceedubs.ficus.readers.ValueReader -import org.apache.flink.configuration.{Configuration, CoreOptions, MemorySize, RestOptions, TaskManagerOptions} +import org.apache.flink.configuration.Configuration import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.jdk.CollectionConverters._ @@ -60,7 +60,7 @@ final case class MiniClusterConfig( // TODO: remove after fully migration, see AdHocMiniClusterFallbackHandler useForScenarioTesting: Boolean = true, useForScenarioStateVerification: Boolean = true, - config: Configuration = MiniClusterConfig.defaultMiniClusterConfig, + config: Configuration = new Configuration, streamExecutionEnvConfig: Configuration = new Configuration ) @@ -71,17 +71,4 @@ object MiniClusterConfig { implicit val flinkConfigurationValueReader: ValueReader[Configuration] = Ficus.mapValueReader[String].map(map => Configuration.fromMap(map.asJava)) - private[nussknacker] val defaultMiniClusterConfig: Configuration = { - val config = new Configuration - config.set[Integer](RestOptions.PORT, 0) - // FIXME: reversing flink default order - config.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") - // In some setups we create a few Flink DMs. Each of them creates its own mini cluster. - // To reduce footprint we decrease off-heap memory buffers size and managed memory - config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m")) - config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m")) - config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("50m")) - config - } - } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala index 9e96cb35bf8..576cc065b79 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/minicluster/MiniClusterFactoryReflectiveInvoker.scala @@ -35,15 +35,11 @@ object MiniClusterFactoryReflectiveInvoker { ) val (miniCluster, streamExecutionEnvironmentFactory) = methodInvoker.invokeStaticMethod(modelClassLoader, miniClusterConfig, streamExecutionEnvConfig) - val totalSlots = - miniClusterConfig.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS) * - miniClusterConfig.get(TaskManagerOptions.NUM_TASK_SLOTS) - new MiniClusterWithServices(miniCluster, totalSlots, streamExecutionEnvironmentFactory) + new MiniClusterWithServices(miniCluster, streamExecutionEnvironmentFactory) } class MiniClusterWithServices( miniCluster: MiniCluster, - val totalSlots: Int, streamExecutionEnvironmentFactory: Boolean => StreamExecutionEnvironment ) extends AutoCloseable { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala index c9e83ede196..70a0a95855f 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.management.scenariotesting import io.circe.Json -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -16,6 +15,9 @@ class FlinkProcessTestRunner( miniClusterWithServicesOpt: Option[MiniClusterWithServices] ) { + // TODO: configurable? + private val ScenarioTestingParallelism = 1 + // We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar // because it is already in separate assembly for purpose of sending it to Flink during deployment. // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded @@ -30,7 +32,7 @@ class FlinkProcessTestRunner( implicit ec: ExecutionContext ): Future[TestResults[Json]] = { val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices => - (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots) + (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), ScenarioTestingParallelism) ) val resultFuture = mainRunner.invokeStaticMethod( streamExecutionEnvWithTotalSlots, diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala index e9812dbc5d3..95c681e74fa 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala @@ -31,7 +31,7 @@ class FlinkProcessVerifier( ): Try[Unit] = { val processId = processVersion.processName val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices => - (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots) + (miniClusterWithServices.createStreamExecutionEnvironment(attached = true), 1) ) try { logger.info(s"Starting to verify $processId") diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala index 563f6e99c6a..c49db3b5b11 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala @@ -34,7 +34,6 @@ import pl.touk.nussknacker.engine.flink.test.{ import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition} -import pl.touk.nussknacker.engine.management.MiniClusterConfig import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunnerSpec.{ fragmentWithValidationName, @@ -80,7 +79,7 @@ class FlinkProcessTestRunnerSpec private val miniClusterWithServices = MiniClusterFactoryReflectiveInvoker.create( modelClassLoader, - miniClusterConfig = MiniClusterConfig.defaultMiniClusterConfig, + miniClusterConfig = new Configuration, streamExecutionEnvConfig = new Configuration ) diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala index a7bc7873e08..c21757a8d20 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala @@ -21,7 +21,6 @@ import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName import pl.touk.nussknacker.engine.kafka.source.InputMeta import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders -import pl.touk.nussknacker.engine.management.MiniClusterConfig import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner import pl.touk.nussknacker.engine.spel.SpelExtension._ @@ -68,7 +67,7 @@ class KafkaScenarioTestingSpec private val miniClusterWithServices = MiniClusterFactoryReflectiveInvoker.create( modelData.modelClassLoader, - miniClusterConfig = MiniClusterConfig.defaultMiniClusterConfig, + miniClusterConfig = new Configuration, streamExecutionEnvConfig = new Configuration ) diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala index e7d70299b08..2384b2fbfbf 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala @@ -21,7 +21,6 @@ import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleV import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName import pl.touk.nussknacker.engine.kafka.source.InputMeta -import pl.touk.nussknacker.engine.management.MiniClusterConfig import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder @@ -89,7 +88,7 @@ class SchemedKafkaScenarioTestingSpec private val miniClusterWithServices = MiniClusterFactoryReflectiveInvoker.create( modelData.modelClassLoader, - miniClusterConfig = MiniClusterConfig.defaultMiniClusterConfig, + miniClusterConfig = new Configuration, streamExecutionEnvConfig = new Configuration )