Skip to content

Commit

Permalink
StreamExecutionEnvironment and MiniCluster created once
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 20, 2025
1 parent a2f1899 commit 8118f73
Show file tree
Hide file tree
Showing 20 changed files with 409 additions and 305 deletions.
31 changes: 18 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,8 @@ lazy val flinkDeploymentManager = (project in flink("management"))
componentsApi % Provided,
httpUtils % Provided,
flinkScalaUtils % Provided,
flinkTestUtils % IntegrationTest,
flinkExecutor % Test,
flinkTestUtils % "it,test",
kafkaTestUtils % "it,test"
)

Expand Down Expand Up @@ -742,18 +743,22 @@ lazy val flinkTests = (project in flink("tests"))
}
)
.dependsOn(
defaultModel % Test,
flinkExecutor % Test,
flinkKafkaComponents % Test,
flinkBaseComponents % Test,
flinkBaseUnboundedComponents % Test,
flinkTableApiComponents % Test,
flinkTestUtils % Test,
kafkaTestUtils % Test,
flinkComponentsTestkit % Test,
defaultModel % Test,
flinkExecutor % Test,
flinkKafkaComponents % Test,
flinkBaseComponents % Test,
flinkBaseUnboundedComponents % Test,
flinkTableApiComponents % Test,
flinkTestUtils % Test,
kafkaTestUtils % Test,
flinkComponentsTestkit % Test,
flinkDeploymentManager % Test,
    // TODO: clean up the kafka testsmechanism tests in order to remove the test->test dependency
flinkKafkaComponentsUtils % "test->test",
flinkSchemedKafkaComponentsUtils % "test->test",
// for local development
designer % Test,
deploymentManagerApi % Test
designer % Test,
deploymentManagerApi % Test
)

lazy val defaultModel = (project in (file("defaultModel")))
Expand Down Expand Up @@ -997,7 +1002,7 @@ lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-com
componentsUtils % Provided,
kafkaTestUtils % Test,
flinkTestUtils % Test,
flinkExecutor % Test
flinkExecutor % Test,
)

lazy val flinkKafkaComponentsUtils = (project in flink("kafka-components-utils"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import cats.data.{Validated, ValidatedNel}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.development.manager.DevelopmentStateStatus._
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.ProcessVersion
Expand All @@ -19,7 +20,8 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig}
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner

import java.net.URI
import java.util.UUID
Expand Down Expand Up @@ -48,7 +50,8 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
private val memory: TrieMap[ProcessName, StatusDetails] = TrieMap[ProcessName, StatusDetails]()
private val random = new scala.util.Random()

private lazy val flinkTestRunner = new FlinkProcessTestRunner(modelData.asInvokableModelData)
private lazy val flinkTestRunner =
new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration())

implicit private class ProcessStateExpandable(processState: StatusDetails) {

Expand Down Expand Up @@ -83,7 +86,10 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
case command: DMRunOffScheduleCommand => runOffSchedule(command)
case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult(""))
case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) =>
flinkTestRunner.test(canonicalProcess, scenarioTestData) // it's just for streaming e2e tests from file purposes
flinkTestRunner.runTestsAsync(
canonicalProcess,
scenarioTestData
        ) // used only for file-based streaming e2e test purposes
}

private def description(canonicalProcess: CanonicalProcess) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import cats.data.Validated.valid
import cats.data.ValidatedNel
import com.typesafe.config.Config
import io.circe.Json
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.api.definition.{NotBlankParameterValidator, StringParameterEditor}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig}
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.newdeployment.DeploymentId
import pl.touk.nussknacker.engine.testing.StubbingCommands
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
Expand Down Expand Up @@ -59,7 +60,9 @@ object MockableDeploymentManagerProvider {
with StubbingCommands {

private lazy val testRunnerOpt =
modelDataOpt.map(modelData => new FlinkProcessTestRunner(modelData.asInvokableModelData))
modelDataOpt.map(modelData =>
new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration())
)

override def resolve(
idWithName: ProcessIdWithName,
Expand Down Expand Up @@ -102,7 +105,7 @@ object MockableDeploymentManagerProvider {
.get()
.get(processVersion.processName.value)
.map(Future.successful)
.orElse(testRunnerOpt.map(_.test(scenario, testData)))
.orElse(testRunnerOpt.map(_.runTestsAsync(scenario, testData)))
.getOrElse(
throw new IllegalArgumentException(
s"Tests results not mocked for scenario [${processVersion.processName.value}] and no model data provided"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,74 +1,73 @@
package pl.touk.nussknacker.engine.process.runner

import io.circe.Json
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import org.apache.flink.runtime.minicluster.MiniCluster
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData}
import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.process.testmechanism.FlinkStubbedRunner
import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.testmode.{
ResultsCollectingListener,
ResultsCollectingListenerHolder,
TestServiceInvocationCollector
}
import pl.touk.nussknacker.engine.util.MetaDataExtractor

object FlinkTestMain extends FlinkRunner {

def run(
miniCluster: MiniCluster,
env: StreamExecutionEnvironment,
modelData: ModelData,
process: CanonicalProcess,
scenarioTestData: ScenarioTestData,
configuration: Configuration,
scenarioTestData: ScenarioTestData
): TestResults[Json] = {
val processVersion = ProcessVersion.empty.copy(processName =
ProcessName("snapshot version")
    ) // the process under test may be unreleased, so it has no version
new FlinkTestMain(
miniCluster,
env,
modelData,
process,
scenarioTestData,
processVersion,
DeploymentData.empty.copy(additionalModelConfigs =
AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
),
configuration
)
).runTest
}

}

class FlinkTestMain(
miniCluster: MiniCluster,
env: StreamExecutionEnvironment,
modelData: ModelData,
process: CanonicalProcess,
scenarioTestData: ScenarioTestData,
processVersion: ProcessVersion,
deploymentData: DeploymentData,
configuration: Configuration
deploymentData: DeploymentData
) {

private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration)
private val stubbedRunner = new FlinkStubbedRunner(miniCluster, env)

def runTest: TestResults[Json] = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
try {
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val registrar = prepareRegistrar(collectingListener, scenarioTestData)
val parallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
val env = stubbedRunner.createEnv(parallelism)

registrar.register(env, process, processVersion, deploymentData, resultCollector)
stubbedRunner.execute(env, parallelism, process.name, SavepointRestoreSettings.none())
stubbedRunner.execute(process.name, SavepointRestoreSettings.none(), modelData.modelClassLoader)
collectingListener.results
} finally {
collectingListener.clean()
Expand Down
Loading

0 comments on commit 8118f73

Please sign in to comment.