Skip to content

Commit

Permalink
[NU-1979] Scenario testing parallelism not related to mini cluster …
Browse files Browse the repository at this point in the history
…total slots
  • Loading branch information
arkadius committed Jan 29, 2025
1 parent 854e024 commit 526c5e5
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 77 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,47 @@
package pl.touk.nussknacker.engine.process.minicluster

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.configuration.{Configuration, TaskManagerOptions}
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

object MiniClusterFactory extends LazyLogging {

// It is a method instead of a value because Configuration is mutable
private def defaultMiniClusterConfig: Configuration = {
val config = new Configuration
// To avoid ports collisions
config.set[Integer](JobManagerOptions.PORT, 0)
config.set[Integer](RestOptions.PORT, 0)
// FIXME abr: verify if needed
// FIXME: reverse flink default order
// config.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
// In some setups we create a few Flink DMs. Each of them creates its own mini cluster.
// To reduce footprint we decrease off-heap memory buffers size and managed memory
config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m"))
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m"))
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("100m"))
// Reasonable number of available parallel slots
config.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, 8)
config
}

def createMiniClusterWithStreamExecutionEnvironmentFactory(
modelClassLoader: ModelClassLoader,
miniClusterConfig: Configuration,
streamExecutionConfig: Configuration
miniClusterConfigOverrides: Configuration,
streamExecutionConfigOverrides: Configuration
): (MiniCluster, Boolean => StreamExecutionEnvironment) = {
val miniClusterConfig = defaultMiniClusterConfig
miniClusterConfig.addAll(miniClusterConfigOverrides)
logger.debug(s"Creating MiniCluster with configuration: $miniClusterConfig")
val miniCluster = createMiniCluster(miniClusterConfig)
def createStreamExecutionEnv(attached: Boolean): StreamExecutionEnvironment = {
MiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
miniCluster,
modelClassLoader,
streamExecutionConfig,
streamExecutionConfigOverrides,
attached
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import scala.concurrent.{ExecutionContext, Future}
class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging {

def handleAdHocMniClusterFallbackAsync[R](
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride],
scenario: CanonicalProcess
)(f: StreamExecutionEnvironmentWithTotalSlots => Future[R])(implicit ec: ExecutionContext): Future[R] = {
)(f: StreamExecutionEnvironmentWithParallelismOverride => Future[R])(implicit ec: ExecutionContext): Future[R] = {
val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) =
useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario)
val resultFuture = f(streamEnvWithMaxParallelism)
Expand All @@ -28,9 +28,9 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
}

def handleAdHocMniClusterFallback[R](
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride],
scenario: CanonicalProcess
)(f: StreamExecutionEnvironmentWithTotalSlots => R): R = {
)(f: StreamExecutionEnvironmentWithParallelismOverride => R): R = {
val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) =
useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario)
try {
Expand All @@ -41,9 +41,9 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
}

private def useSharedMiniClusterOrAdHoc[R](
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride],
scenario: CanonicalProcess
): (Option[AdHocMiniClusterResources], StreamExecutionEnvironmentWithTotalSlots) = {
): (Option[AdHocMiniClusterResources], StreamExecutionEnvironmentWithParallelismOverride) = {
sharedMiniClusterServicesOpt
.map { sharedMiniClusterServices =>
logger.debug(s"Shared MiniCluster used for $useCaseForDebug")
Expand All @@ -52,7 +52,7 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
.getOrElse {
logger.debug(s"Shared MiniCluster not used for $useCaseForDebug. Creating ad-hoc MiniCluster")
val resources = createAdHocMiniClusterResources(scenario)
(Some(resources), StreamExecutionEnvironmentWithTotalSlots.withoutMaxParallelism(resources.env))
(Some(resources), StreamExecutionEnvironmentWithParallelismOverride.withoutParallelismOverriding(resources.env))
}
}

Expand All @@ -61,14 +61,13 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
val legacyMiniClusterConfig = new Configuration
legacyMiniClusterConfig.set[Integer](RestOptions.PORT, 0)
// FIXME: reversing flink default order
legacyMiniClusterConfig.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
legacyMiniClusterConfig.set[java.lang.Integer](TaskManagerOptions.NUM_TASK_SLOTS, scenarioParallelism)
val legacyMiniClusterConfigOverride = new Configuration
// It is left as it was before
legacyMiniClusterConfigOverride.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
legacyMiniClusterConfigOverride.set[java.lang.Integer](TaskManagerOptions.NUM_TASK_SLOTS, scenarioParallelism)
val (miniCluster, createEnv) = MiniClusterFactory.createMiniClusterWithStreamExecutionEnvironmentFactory(
modelClassLoader,
legacyMiniClusterConfig,
legacyMiniClusterConfigOverride,
new Configuration()
)
AdHocMiniClusterResources(miniCluster, createEnv(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object FlinkTestMain {
scenarioTestData: ScenarioTestData
): Future[TestResults[Json]] = {
new FlinkTestMain(
sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithTotalSlots.apply _ tupled),
sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithParallelismOverride.apply _ tupled),
modelData
)
.testScenario(scenario, scenarioTestData)
Expand All @@ -39,7 +39,7 @@ object FlinkTestMain {
}

class FlinkTestMain(
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride],
modelData: ModelData
) {

Expand All @@ -53,26 +53,27 @@ class FlinkTestMain(
scenario,
) { streamExecutionEnvWithMaxParallelism =>
import streamExecutionEnvWithMaxParallelism._
val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario)
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val scenarioWithOverrodeParallelism = streamExecutionEnvWithMaxParallelism.overrideParallelismIfNeeded(scenario)
val resultCollector = new TestServiceInvocationCollector(collectingListener)
// ProcessVersion can't be passed from DM because testing mechanism can be used with not saved scenario
val processVersion = ProcessVersion.empty.copy(processName = alignedScenario.name)
val processVersion = ProcessVersion.empty.copy(processName = scenarioWithOverrodeParallelism.name)
val deploymentData = DeploymentData.empty.copy(additionalModelConfigs =
AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
)
val registrar = prepareRegistrar(collectingListener, alignedScenario, scenarioTestData, processVersion)
val registrar =
prepareRegistrar(collectingListener, scenarioWithOverrodeParallelism, scenarioTestData, processVersion)
streamExecutionEnv.getCheckpointConfig.disableCheckpointing()
registrar.register(
streamExecutionEnv,
alignedScenario,
scenarioWithOverrodeParallelism,
processVersion,
deploymentData,
resultCollector
)
// TODO: Non-blocking future periodically checking if job is finished
val resultFuture = Future {
blocking {
streamExecutionEnv.execute(alignedScenario.name.value)
streamExecutionEnv.execute(scenarioWithOverrodeParallelism.name.value)
collectingListener.results
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object FlinkVerificationMain {
savepointPath: String
): Unit =
new FlinkVerificationMain(
sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithTotalSlots.apply _ tupled),
sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithParallelismOverride.apply _ tupled),
modelData
).runTest(
scenario,
Expand All @@ -32,7 +32,7 @@ object FlinkVerificationMain {
}

class FlinkVerificationMain(
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride],
modelData: ModelData,
) {

Expand All @@ -47,15 +47,15 @@ class FlinkVerificationMain(
scenario,
) { streamExecutionEnvWithMaxParallelism =>
import streamExecutionEnvWithMaxParallelism._
val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario)
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val registrar = prepareRegistrar(alignedScenario)
val deploymentData = DeploymentData.empty
val scenarioWithOverrodeParallelism = streamExecutionEnvWithMaxParallelism.overrideParallelismIfNeeded(scenario)
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val registrar = prepareRegistrar(scenarioWithOverrodeParallelism)
val deploymentData = DeploymentData.empty

streamExecutionEnv.getCheckpointConfig.disableCheckpointing()
registrar.register(
streamExecutionEnv,
alignedScenario,
scenarioWithOverrodeParallelism,
processVersion,
deploymentData,
resultCollector
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.util.MetaDataExtractor

class StreamExecutionEnvironmentWithTotalSlots private (
class StreamExecutionEnvironmentWithParallelismOverride private(
val streamExecutionEnv: StreamExecutionEnvironment,
maxParallelismOpt: Option[Int]
parallelismOverride: Option[Int]
) {

def alignParallelismIfNeeded(canonicalProcess: CanonicalProcess): CanonicalProcess = {
maxParallelismOpt.map { maxParallelism =>
def overrideParallelismIfNeeded(canonicalProcess: CanonicalProcess): CanonicalProcess = {
parallelismOverride.map { maxParallelism =>
val scenarioParallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](canonicalProcess.metaData, StreamMetaData())
.parallelism
Expand All @@ -31,12 +31,12 @@ class StreamExecutionEnvironmentWithTotalSlots private (

}

object StreamExecutionEnvironmentWithTotalSlots {
object StreamExecutionEnvironmentWithParallelismOverride {

def apply(env: StreamExecutionEnvironment, maxParallelism: Int): StreamExecutionEnvironmentWithTotalSlots =
new StreamExecutionEnvironmentWithTotalSlots(env, Some(maxParallelism))
def apply(env: StreamExecutionEnvironment, maxParallelism: Int): StreamExecutionEnvironmentWithParallelismOverride =
new StreamExecutionEnvironmentWithParallelismOverride(env, Some(maxParallelism))

def withoutMaxParallelism(env: StreamExecutionEnvironment) =
new StreamExecutionEnvironmentWithTotalSlots(env, None)
def withoutParallelismOverriding(env: StreamExecutionEnvironment) =
new StreamExecutionEnvironmentWithParallelismOverride(env, None)

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.management

import net.ceedubs.ficus.Ficus
import net.ceedubs.ficus.readers.ValueReader
import org.apache.flink.configuration.{Configuration, CoreOptions, MemorySize, RestOptions, TaskManagerOptions}
import org.apache.flink.configuration.Configuration

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -60,7 +60,7 @@ final case class MiniClusterConfig(
// TODO: remove after fully migration, see AdHocMiniClusterFallbackHandler
useForScenarioTesting: Boolean = true,
useForScenarioStateVerification: Boolean = true,
config: Configuration = MiniClusterConfig.defaultMiniClusterConfig,
config: Configuration = new Configuration,
streamExecutionEnvConfig: Configuration = new Configuration
)

Expand All @@ -71,17 +71,4 @@ object MiniClusterConfig {
implicit val flinkConfigurationValueReader: ValueReader[Configuration] =
Ficus.mapValueReader[String].map(map => Configuration.fromMap(map.asJava))

private[nussknacker] val defaultMiniClusterConfig: Configuration = {
val config = new Configuration
config.set[Integer](RestOptions.PORT, 0)
// FIXME: reversing flink default order
config.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
// In some setups we create a few Flink DMs. Each of them creates its own mini cluster.
// To reduce footprint we decrease off-heap memory buffers size and managed memory
config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m"))
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m"))
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("50m"))
config
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,11 @@ object MiniClusterFactoryReflectiveInvoker {
)
val (miniCluster, streamExecutionEnvironmentFactory) =
methodInvoker.invokeStaticMethod(modelClassLoader, miniClusterConfig, streamExecutionEnvConfig)
val totalSlots =
miniClusterConfig.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS) *
miniClusterConfig.get(TaskManagerOptions.NUM_TASK_SLOTS)
new MiniClusterWithServices(miniCluster, totalSlots, streamExecutionEnvironmentFactory)
new MiniClusterWithServices(miniCluster, streamExecutionEnvironmentFactory)
}

class MiniClusterWithServices(
miniCluster: MiniCluster,
val totalSlots: Int,
streamExecutionEnvironmentFactory: Boolean => StreamExecutionEnvironment
) extends AutoCloseable {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.management.scenariotesting

import io.circe.Json
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
Expand All @@ -16,6 +15,9 @@ class FlinkProcessTestRunner(
miniClusterWithServicesOpt: Option[MiniClusterWithServices]
) {

// TODO: configurable?
private val ScenarioTestingParallelism = 1

// We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar
// because it is already in separate assembly for purpose of sending it to Flink during deployment.
// Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded
Expand All @@ -30,7 +32,7 @@ class FlinkProcessTestRunner(
implicit ec: ExecutionContext
): Future[TestResults[Json]] = {
val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices =>
(miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots)
(miniClusterWithServices.createStreamExecutionEnvironment(attached = true), ScenarioTestingParallelism)
)
val resultFuture = mainRunner.invokeStaticMethod(
streamExecutionEnvWithTotalSlots,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class FlinkProcessVerifier(
): Try[Unit] = {
val processId = processVersion.processName
val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices =>
(miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots)
(miniClusterWithServices.createStreamExecutionEnvironment(attached = true), 1)
)
try {
logger.info(s"Starting to verify $processId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import pl.touk.nussknacker.engine.flink.test.{
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition}
import pl.touk.nussknacker.engine.management.MiniClusterConfig
import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunnerSpec.{
fragmentWithValidationName,
Expand Down Expand Up @@ -80,7 +79,7 @@ class FlinkProcessTestRunnerSpec

private val miniClusterWithServices = MiniClusterFactoryReflectiveInvoker.create(
modelClassLoader,
miniClusterConfig = MiniClusterConfig.defaultMiniClusterConfig,
miniClusterConfig = new Configuration,
streamExecutionEnvConfig = new Configuration
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
import pl.touk.nussknacker.engine.kafka.source.InputMeta
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator
import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders
import pl.touk.nussknacker.engine.management.MiniClusterConfig
import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.spel.SpelExtension._
Expand Down Expand Up @@ -68,7 +67,7 @@ class KafkaScenarioTestingSpec

private val miniClusterWithServices = MiniClusterFactoryReflectiveInvoker.create(
modelData.modelClassLoader,
miniClusterConfig = MiniClusterConfig.defaultMiniClusterConfig,
miniClusterConfig = new Configuration,
streamExecutionEnvConfig = new Configuration
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleV
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
import pl.touk.nussknacker.engine.kafka.source.InputMeta
import pl.touk.nussknacker.engine.management.MiniClusterConfig
import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder
Expand Down Expand Up @@ -89,7 +88,7 @@ class SchemedKafkaScenarioTestingSpec

private val miniClusterWithServices = MiniClusterFactoryReflectiveInvoker.create(
modelData.modelClassLoader,
miniClusterConfig = MiniClusterConfig.defaultMiniClusterConfig,
miniClusterConfig = new Configuration,
streamExecutionEnvConfig = new Configuration
)

Expand Down

0 comments on commit 526c5e5

Please sign in to comment.