[NU-1979] Scenario testing classes moved to minicluster module + removed flinkExecutor -> miniCluster dependency
arkadius committed Jan 29, 2025
1 parent 3698088 commit 8b9c668
Showing 19 changed files with 203 additions and 126 deletions.
24 changes: 13 additions & 11 deletions build.sbt
@@ -618,13 +618,12 @@ lazy val flinkDeploymentManager = (project in flink("management"))
dependencyOverrides += "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV
)
.dependsOn(
flinkMiniCluster,
deploymentManagerApi % Provided,
flinkMiniCluster,
commonUtils % Provided,
utilsInternal % Provided,
httpUtils % Provided,
flinkScalaUtils % Provided,
// test->test dependency is needed to load SimpleProcessConfigCreator
flinkExecutor % "test,test->test",
flinkExecutor % Test,
flinkTestUtils % "it,test",
kafkaTestUtils % "it,test"
)
@@ -705,7 +704,6 @@ lazy val flinkTests = (project in flink("tests"))
)
.dependsOn(
defaultModel % Test,
flinkExecutor % Test,
flinkKafkaComponents % Test,
flinkBaseComponents % Test,
flinkBaseUnboundedComponents % Test,
@@ -715,6 +713,7 @@ lazy val flinkTests = (project in flink("tests"))
flinkComponentsTestkit % Test,
flinkDeploymentManager % Test,
// test->test dependencies are needed to load components from these modules
flinkExecutor % "test,test->test",
flinkKafkaComponentsUtils % "test->test",
flinkSchemedKafkaComponentsUtils % "test->test",
// for local development
@@ -752,12 +751,11 @@ lazy val flinkExecutor = (project in flink("executor"))
Seq(
// Dependencies below are provided by flink-dist jar in production flink or by flink DM for scenario testing/state verification purpose
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.apache.flink" % "flink-runtime" % flinkV % Provided,
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % Provided,
// This dependency must be provided, because some cloud providers, such as Ververica, already have it on their classpath, which may cause a conflict
"org.apache.flink" % "flink-metrics-dropwizard" % flinkV % Provided,
)
} ++ flinkLibScalaDeps(scalaVersion.value, Some(Provided)),
},
prepareItLibs := {
val workTarget = (ThisScope / baseDirectory).value / "target" / "it-libs"
val artifacts = (ThisScope / additionalBundledArtifacts).value
@@ -781,9 +779,8 @@ lazy val flinkExecutor = (project in flink("executor"))
.dependsOn(
flinkComponentsUtils,
flinkExtensionsApi,
// TODO: it can be removed when fully switch to shared mini cluster, see AdHocMiniClusterFallbackHandler
flinkMiniCluster,
scenarioCompiler,
utilsInternal,
// Various components uses one of library in stack: sttp -> async-http-client -> netty
// Different versions of netty which is on the bottom of this stack causes NoClassDefFoundError.
// To overcome this problem and reduce size of model jar bundle, we add http utils as a compile time dependency.
@@ -1208,12 +1205,17 @@ lazy val flinkMiniCluster = (project in flink("minicluster"))
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("com.esotericsoftware", "kryo-shaded"),
),
"org.apache.flink" % "flink-runtime" % flinkV,
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV % Provided,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV % Provided,
) ++ flinkLibScalaDeps(scalaVersion.value)
}
)
.dependsOn(
extensionsApi % Provided,
utilsInternal % Provided,
)

lazy val flinkTestUtils = (project in flink("test-utils"))
.settings(commonSettings)
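Taken together, these build.sbt hunks invert the old relationship: flinkExecutor no longer depends on flinkMiniCluster at compile time (the legacy ad-hoc fallback further down reaches the factory via reflection instead), while flinkDeploymentManager and the minicluster module itself now carry the scenario-testing wiring. A stripped-down sbt sketch of the resulting dependency shape, with all other settings and most dependencies elided, is shown below; it is illustrative only and not a compilable excerpt of the real build file.

// Illustrative only: module names as in this repository, settings and most dependencies omitted.
lazy val flinkMiniCluster = (project in flink("minicluster"))
  .dependsOn(
    extensionsApi % Provided,
    utilsInternal % Provided
  )

lazy val flinkDeploymentManager = (project in flink("management"))
  .dependsOn(
    flinkMiniCluster,    // scenario testing / state verification classes now live here
    flinkExecutor % Test // plain Test scope; the test->test classifier moved to flinkTests
  )

lazy val flinkExecutor = (project in flink("executor"))
  .dependsOn(
    flinkComponentsUtils,
    scenarioCompiler,
    utilsInternal // note: no flinkMiniCluster dependency anymore
  )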
@@ -15,8 +15,8 @@ import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterConfig
import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider}
import pl.touk.nussknacker.engine.process.minicluster.MiniClusterConfig
import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader}
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.utils.domain.TestFactory
@@ -85,7 +85,7 @@ class MockDeploymentManager private (
),
shouldVerifyBeforeDeploy = false,
mainClassName = "UNUSED",
scenarioTestingConfig = MiniClusterConfig()
scenarioTestingConfig = FlinkMiniClusterConfig()
) {

import MockDeploymentManager._
@@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import cats.data.{Validated, ValidatedNel}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
@@ -12,9 +13,9 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.process.minicluster.{MiniClusterConfig, MiniClusterFactory}

import java.util.UUID
import java.util.concurrent.TimeUnit
@@ -42,15 +43,15 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
private val random = new scala.util.Random()

private val miniClusterWithServices =
MiniClusterFactory
.createMiniClusterWithServicesIfConfigured(
FlinkMiniClusterFactory
.createMiniClusterWithServices(
modelData.modelClassLoader,
MiniClusterConfig()
new Configuration,
new Configuration
)
.get

private lazy val flinkTestRunner =
new FlinkProcessTestRunner(
new FlinkMiniClusterScenarioTestRunner(
modelData,
Some(miniClusterWithServices)
)
@@ -4,17 +4,18 @@ import cats.data.Validated.valid
import cats.data.ValidatedNel
import com.typesafe.config.Config
import io.circe.Json
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.newdeployment.DeploymentId
import pl.touk.nussknacker.engine.process.minicluster.{MiniClusterConfig, MiniClusterFactory}
import pl.touk.nussknacker.engine.testing.StubbingCommands
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults

@@ -58,16 +59,17 @@ object MockableDeploymentManagerProvider {
with ManagerSpecificScenarioActivitiesStoredByManager
with StubbingCommands {

private lazy val miniClusterWithServicesOpt = modelDataOpt.flatMap { modelData =>
MiniClusterFactory.createMiniClusterWithServicesIfConfigured(
private lazy val miniClusterWithServicesOpt = modelDataOpt.map { modelData =>
FlinkMiniClusterFactory.createMiniClusterWithServices(
modelData.modelClassLoader,
MiniClusterConfig()
new Configuration,
new Configuration
)
}

private lazy val testRunnerOpt =
modelDataOpt.map { modelData =>
new FlinkProcessTestRunner(
new FlinkMiniClusterScenarioTestRunner(
modelData,
miniClusterWithServicesOpt
)
@@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData}
import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster.LegacyAdHocMiniClusterFallbackHandler
import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.testmode.{
@@ -44,7 +45,7 @@ class FlinkTestMain(
) {

private val adHocMiniClusterFallbackHandler =
new AdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario testing")
new LegacyAdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario testing")

def testScenario(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData): Future[TestResults[Json]] = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
@@ -8,6 +8,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster.LegacyAdHocMiniClusterFallbackHandler
import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector}

@@ -23,7 +24,7 @@ object FlinkVerificationMain {
new FlinkVerificationMain(
sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithParallelismOverride.apply _ tupled),
modelData
).runTest(
).verifyScenarioState(
scenario,
processVersion,
savepointPath
@@ -37,9 +38,9 @@ class FlinkVerificationMain(
) {

private val adHocMiniClusterFallbackHandler =
new AdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario state verification")
new LegacyAdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario state verification")

def runTest(scenario: CanonicalProcess, processVersion: ProcessVersion, savepointPath: String): Unit = {
def verifyScenarioState(scenario: CanonicalProcess, processVersion: ProcessVersion, savepointPath: String): Unit = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
try {
adHocMiniClusterFallbackHandler.handleAdHocMniClusterFallback(
@@ -0,0 +1,25 @@
package pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.minicluster.MiniCluster
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

import java.net.URLClassLoader

object FlinkMiniClusterFactoryReflectiveInvoker {

private val factoryInvoker = new ReflectiveMethodInvoker[(MiniCluster, Boolean => StreamExecutionEnvironment)](
getClass.getClassLoader,
"pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory",
"createMiniClusterWithServicesRaw"
)

def createMiniClusterWithServicesRaw(
modelClassLoader: URLClassLoader,
miniClusterConfigOverrides: Configuration,
streamExecutionConfigOverrides: Configuration
): (MiniCluster, Boolean => StreamExecutionEnvironment) =
factoryInvoker.invokeStaticMethod(modelClassLoader, miniClusterConfigOverrides, streamExecutionConfigOverrides)

}
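For context, ReflectiveMethodInvoker (from utilsInternal) is what lets the executor reach FlinkMiniClusterFactory without a compile-time dependency on the minicluster module. A minimal, purely illustrative sketch of such an invoker is shown below; it is an assumption about the general technique, not the project's actual implementation.

import java.lang.reflect.Method

// Simplified stand-in for ReflectiveMethodInvoker: resolves a class by name inside the given
// class loader (here: the model class loader that contains flinkMiniCluster) and invokes one
// of its statically reachable methods, so the caller needs no compile-time dependency on it.
class StaticMethodInvokerSketch[Result](classLoader: ClassLoader, className: String, methodName: String) {

  private val method: Method = {
    val clazz = Class.forName(className, true, classLoader)
    // Assumption: exactly one matching public method; the real invoker may resolve it differently.
    clazz.getMethods
      .find(_.getName == methodName)
      .getOrElse(throw new IllegalArgumentException(s"No method $methodName on $className"))
  }

  def invokeStaticMethod(args: AnyRef*): Result =
    // Scala objects expose static forwarders for their public methods, so `null` works as the receiver.
    method.invoke(null, args: _*).asInstanceOf[Result]
}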
@@ -1,20 +1,20 @@
package pl.touk.nussknacker.engine.process.scenariotesting
package pl.touk.nussknacker.engine.process.scenariotesting.legacyadhocminicluster

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
import org.apache.flink.configuration.{Configuration, TaskManagerOptions}
import org.apache.flink.runtime.minicluster.MiniCluster
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory
import pl.touk.nussknacker.engine.process.scenariotesting.StreamExecutionEnvironmentWithParallelismOverride
import pl.touk.nussknacker.engine.util.MetaDataExtractor
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

import scala.concurrent.{ExecutionContext, Future}

// This class handles a legacy ad-hoc way to create minicluster.
// TODO: After we fully switch to shared mini cluster approach, it should be removed
class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging {
class LegacyAdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging {

def handleAdHocMniClusterFallbackAsync[R](
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithParallelismOverride],
@@ -63,15 +63,12 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
.getOrElse(1)
val legacyMiniClusterConfigOverrides = new Configuration
legacyMiniClusterConfigOverrides.set[java.lang.Integer](TaskManagerOptions.NUM_TASK_SLOTS, scenarioParallelism)
val miniClusterWithServices = MiniClusterFactory.createMiniClusterWithServices(
val (miniCluster, createStreamExecutionEnvironment) = FlinkMiniClusterFactoryReflectiveInvoker.createMiniClusterWithServicesRaw(
modelClassLoader,
legacyMiniClusterConfigOverrides,
new Configuration()
)
AdHocMiniClusterResources(
miniClusterWithServices.miniCluster,
miniClusterWithServices.createStreamExecutionEnvironment(attached = true)
)
AdHocMiniClusterResources(miniCluster, createStreamExecutionEnvironment(true))
}

private case class AdHocMiniClusterResources(miniCluster: MiniCluster, env: StreamExecutionEnvironment)
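The renamed LegacyAdHocMiniClusterFallbackHandler keeps the old behaviour alive: when the deployment manager supplies a shared mini cluster environment it is used as-is, otherwise a short-lived MiniCluster is created just for the one test/verification run and torn down afterwards. A generic sketch of that fallback pattern follows, with placeholder types standing in for the Flink-specific pieces; it is not the handler's actual code.

import scala.concurrent.{ExecutionContext, Future}

// Generic sketch of the fallback pattern: prefer a shared environment whose lifecycle is owned
// elsewhere; otherwise create ad-hoc resources for this single run and release them afterwards.
object AdHocFallbackSketch {

  // `Env` stands in for the Flink execution environment; `createAdHoc` returns the environment
  // plus a cleanup action (in the real handler: shutting down the ad-hoc MiniCluster).
  def withEnvironment[Env, R](sharedEnvOpt: Option[Env])(
      createAdHoc: () => (Env, () => Unit)
  )(run: Env => Future[R])(implicit ec: ExecutionContext): Future[R] =
    sharedEnvOpt match {
      case Some(shared) =>
        run(shared) // shared mini cluster path: nothing to clean up here
      case None =>
        val (env, cleanup) = createAdHoc()
        run(env).andThen { case _ => cleanup() } // legacy ad-hoc path
    }
}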
@@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.management
import net.ceedubs.ficus.Ficus
import net.ceedubs.ficus.readers.ValueReader
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine.process.minicluster.MiniClusterConfig
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterConfig

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._
@@ -25,7 +25,7 @@ final case class FlinkConfig(
FlinkWaitForDuringDeployFinishedConfig(enabled = true, Some(180), Some(1 second)),
scenarioStateRequestTimeout: FiniteDuration = 3 seconds,
jobConfigsCacheSize: Int = 1000,
miniCluster: MiniClusterConfig = MiniClusterConfig()
miniCluster: FlinkMiniClusterConfig = FlinkMiniClusterConfig()
)

object FlinkConfig {
@@ -11,9 +11,12 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId}
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.{
FlinkMiniClusterScenarioStateVerifier,
FlinkMiniClusterScenarioTestRunner
}
import pl.touk.nussknacker.engine.flink.minicluster.{FlinkMiniClusterConfig, FlinkMiniClusterFactory}
import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.prepareProgramArgs
import pl.touk.nussknacker.engine.management.scenariotesting.{FlinkProcessTestRunner, FlinkProcessVerifier}
import pl.touk.nussknacker.engine.process.minicluster.{MiniClusterConfig, MiniClusterFactory}
import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment}

import scala.concurrent.Future
@@ -23,26 +26,26 @@ abstract class FlinkDeploymentManager(
dependencies: DeploymentManagerDependencies,
shouldVerifyBeforeDeploy: Boolean,
mainClassName: String,
scenarioTestingConfig: MiniClusterConfig
scenarioTestingConfig: FlinkMiniClusterConfig
) extends DeploymentManager
with LazyLogging {

import dependencies._

private val miniClusterWithServicesOpt = {
MiniClusterFactory.createMiniClusterWithServicesIfConfigured(
FlinkMiniClusterFactory.createMiniClusterWithServicesIfConfigured(
modelData.modelClassLoader,
scenarioTestingConfig
)
}

private val testRunner = new FlinkProcessTestRunner(
private val testRunner = new FlinkMiniClusterScenarioTestRunner(
modelData,
miniClusterWithServicesOpt
.filter(_ => scenarioTestingConfig.useForScenarioTesting)
)

private val verification = new FlinkProcessVerifier(
private val verification = new FlinkMiniClusterScenarioStateVerifier(
modelData,
miniClusterWithServicesOpt
.filter(_ => scenarioTestingConfig.useForScenarioStateVerification)
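Based only on the calls visible in this hunk, the shared mini cluster is created once per deployment manager and handed to both the test runner and the state verifier, each gated by its own config flag. A condensed sketch of that wiring against the renamed classes — not a verbatim excerpt from the file:

import pl.touk.nussknacker.engine.BaseModelData
import pl.touk.nussknacker.engine.flink.minicluster.{FlinkMiniClusterConfig, FlinkMiniClusterFactory}
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.{
  FlinkMiniClusterScenarioStateVerifier,
  FlinkMiniClusterScenarioTestRunner
}

// Sketch: one optional shared mini cluster backs both scenario testing and pre-deploy
// state verification; each feature can be switched off independently via the config flags.
class ScenarioTestingWiringSketch(modelData: BaseModelData, config: FlinkMiniClusterConfig) {

  private val miniClusterWithServicesOpt =
    FlinkMiniClusterFactory.createMiniClusterWithServicesIfConfigured(modelData.modelClassLoader, config)

  val testRunner = new FlinkMiniClusterScenarioTestRunner(
    modelData,
    miniClusterWithServicesOpt.filter(_ => config.useForScenarioTesting)
  )

  val stateVerifier = new FlinkMiniClusterScenarioStateVerifier(
    modelData,
    miniClusterWithServicesOpt.filter(_ => config.useForScenarioStateVerification)
  )
}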
@@ -1,9 +1,9 @@
package pl.touk.nussknacker.engine.process.minicluster
package pl.touk.nussknacker.engine.flink.minicluster

import org.apache.flink.configuration.Configuration

final case class MiniClusterConfig(
// TODO: remove after fully migration, see AdHocMiniClusterFallbackHandler
final case class FlinkMiniClusterConfig(
// TODO: remove after fully migration, see LegacyAdHocMiniClusterFallbackHandler
useForScenarioTesting: Boolean = true,
useForScenarioStateVerification: Boolean = true,
config: Configuration = new Configuration,