[NU-1979] StreamExecutionEnv attached job execution optional
arkadius committed Jan 28, 2025
1 parent 02b3168 commit 854e024
Showing 13 changed files with 263 additions and 182 deletions.
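
The changed signatures below make Flink's "attached" flag a caller decision instead of a hard-coded true. A minimal sketch of what that flag controls, using only the configuration call visible in the MiniClusterStreamExecutionEnvironmentFactory hunk (configureExecution is a hypothetical wrapper added for illustration):

import org.apache.flink.configuration.{Configuration, DeploymentOptions}

// Hypothetical helper, for illustration only. Attached execution means execute() blocks
// until the job finishes; detached execution returns as soon as the job is submitted.
def configureExecution(configuration: Configuration, attached: Boolean): Configuration = {
  configuration.set[java.lang.Boolean](DeploymentOptions.ATTACHED, attached)
  configuration
}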
@@ -89,7 +89,7 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
case command: DMRunOffScheduleCommand => runOffSchedule(command)
case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult(""))
case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) =>
flinkTestRunner.runTestsAsync(
flinkTestRunner.runTests(
canonicalProcess,
scenarioTestData
) // it's just for streaming e2e tests from file purposes
@@ -115,7 +115,7 @@ object MockableDeploymentManagerProvider {
.get()
.get(processVersion.processName.value)
.map(Future.successful)
.orElse(testRunnerOpt.map(_.runTestsAsync(scenario, testData)))
.orElse(testRunnerOpt.map(_.runTests(scenario, testData)))
.getOrElse(
throw new IllegalArgumentException(
s"Tests results not mocked for scenario [${processVersion.processName.value}] and no model data provided"
@@ -13,18 +13,18 @@ object MiniClusterFactory extends LazyLogging {
modelClassLoader: ModelClassLoader,
miniClusterConfig: Configuration,
streamExecutionConfig: Configuration
): (MiniCluster, () => StreamExecutionEnvironment) = {
): (MiniCluster, Boolean => StreamExecutionEnvironment) = {
logger.debug(s"Creating MiniCluster with configuration: $miniClusterConfig")
val miniCluster = createMiniCluster(miniClusterConfig)
logger.debug(
s"Creating local StreamExecutionEnvironment with configuration = $streamExecutionConfig"
)
def createStreamExecutionEnv(): StreamExecutionEnvironment =
def createStreamExecutionEnv(attached: Boolean): StreamExecutionEnvironment = {
MiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
miniCluster,
modelClassLoader,
streamExecutionConfig
streamExecutionConfig,
attached
)
}

(miniCluster, createStreamExecutionEnv)
}

@@ -20,13 +20,13 @@ object MiniClusterStreamExecutionEnvironmentFactory {
def createStreamExecutionEnvironment(
miniCluster: MiniCluster,
modelClassLoader: ModelClassLoader,
configuration: Configuration
configuration: Configuration,
attached: Boolean
): StreamExecutionEnvironment = {
val pipelineExecutorServiceLoader = createPipelineExecutorServiceLoader(miniCluster, modelClassLoader)
configuration.set(DeploymentOptions.TARGET, pipelineExecutorName)
configuration.set(PipelineOptions.CLASSPATHS, modelClassLoader.urls.map(_.toString).asJava)
// FIXME abr: move to FlinkTestMain?
configuration.set[java.lang.Boolean](DeploymentOptions.ATTACHED, true)
configuration.set[java.lang.Boolean](DeploymentOptions.ATTACHED, attached)
new StreamExecutionEnvironment(pipelineExecutorServiceLoader, configuration, modelClassLoader)
}

@@ -10,15 +10,41 @@ import pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory
import pl.touk.nussknacker.engine.util.MetaDataExtractor
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

import scala.concurrent.{ExecutionContext, Future}

// This class handles a legacy ad-hoc way to create minicluster.
// TODO: After we fully switch to shared mini cluster approach, it should be removed
class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCaseForDebug: String) extends LazyLogging {

def handleAdHocMniClusterFallbackAsync[R](
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
scenario: CanonicalProcess
)(f: StreamExecutionEnvironmentWithTotalSlots => Future[R])(implicit ec: ExecutionContext): Future[R] = {
val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) =
useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario)
val resultFuture = f(streamEnvWithMaxParallelism)
resultFuture.onComplete(_ => allocatedMiniClusterResourcesOpt.foreach(_.close()))
resultFuture
}

def handleAdHocMniClusterFallback[R](
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
scenario: CanonicalProcess
)(f: StreamExecutionEnvironmentWithTotalSlots => R): R = {
val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) = sharedMiniClusterServicesOpt
val (allocatedMiniClusterResourcesOpt, streamEnvWithMaxParallelism) =
useSharedMiniClusterOrAdHoc(sharedMiniClusterServicesOpt, scenario)
try {
f(streamEnvWithMaxParallelism)
} finally {
allocatedMiniClusterResourcesOpt.foreach(_.close())
}
}

private def useSharedMiniClusterOrAdHoc[R](
sharedMiniClusterServicesOpt: Option[StreamExecutionEnvironmentWithTotalSlots],
scenario: CanonicalProcess
): (Option[AdHocMiniClusterResources], StreamExecutionEnvironmentWithTotalSlots) = {
sharedMiniClusterServicesOpt
.map { sharedMiniClusterServices =>
logger.debug(s"Shared MiniCluster used for $useCaseForDebug")
(None, sharedMiniClusterServices)
@@ -28,11 +28,6 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
val resources = createAdHocMiniClusterResources(scenario)
(Some(resources), StreamExecutionEnvironmentWithTotalSlots.withoutMaxParallelism(resources.env))
}
try {
f(streamEnvWithMaxParallelism)
} finally {
allocatedMiniClusterResourcesOpt.foreach(_.close())
}
}

private def createAdHocMiniClusterResources(process: CanonicalProcess) = {
@@ -50,7 +71,7 @@ class AdHocMiniClusterFallbackHandler(modelClassLoader: ModelClassLoader, useCas
legacyMiniClusterConfig,
new Configuration()
)
AdHocMiniClusterResources(miniCluster, createEnv())
AdHocMiniClusterResources(miniCluster, createEnv(true))
}

private case class AdHocMiniClusterResources(miniCluster: MiniCluster, env: StreamExecutionEnvironment)
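
The new handleAdHocMniClusterFallbackAsync above ties the lifetime of ad-hoc MiniCluster resources to completion of the returned Future instead of a synchronous finally block. A minimal sketch of that pattern under the same assumptions, with AutoCloseable standing in for AdHocMiniClusterResources:

import scala.concurrent.{ExecutionContext, Future}

// Sketch only: open optional resources, run the asynchronous body, and close the resources
// once the Future completes (whether it succeeded or failed).
def withOptionalResources[R](openIfNeeded: () => Option[AutoCloseable])(
    body: () => Future[R]
)(implicit ec: ExecutionContext): Future[R] = {
  val resourcesOpt = openIfNeeded()
  val resultFuture = body()
  resultFuture.onComplete(_ => resourcesOpt.foreach(_.close()))
  resultFuture
}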
@@ -17,6 +17,9 @@ import pl.touk.nussknacker.engine.testmode.{
TestServiceInvocationCollector
}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

object FlinkTestMain {

// This method is invoked via reflection from module (Flink DM) without shared API classes, so simple types should be used
@@ -25,7 +28,7 @@
modelData: ModelData,
scenario: CanonicalProcess,
scenarioTestData: ScenarioTestData
): TestResults[Json] = {
): Future[TestResults[Json]] = {
new FlinkTestMain(
sharedMiniClusterServicesOpt.map(StreamExecutionEnvironmentWithTotalSlots.apply _ tupled),
modelData
@@ -43,35 +46,40 @@ class FlinkTestMain(
private val adHocMiniClusterFallbackHandler =
new AdHocMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario testing")

def testScenario(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
def testScenario(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData): Future[TestResults[Json]] = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
try {
adHocMiniClusterFallbackHandler.handleAdHocMniClusterFallback(
sharedMiniClusterServicesOpt,
scenario,
) { streamExecutionEnvWithMaxParallelism =>
import streamExecutionEnvWithMaxParallelism._
val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario)
val resultCollector = new TestServiceInvocationCollector(collectingListener)
// ProcessVersion can't be passed from DM because testing mechanism can be used with not saved scenario
val processVersion = ProcessVersion.empty.copy(processName = alignedScenario.name)
val deploymentData = DeploymentData.empty.copy(additionalModelConfigs =
AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
)
val registrar = prepareRegistrar(collectingListener, alignedScenario, scenarioTestData, processVersion)
streamExecutionEnv.getCheckpointConfig.disableCheckpointing()
registrar.register(
streamExecutionEnv,
alignedScenario,
processVersion,
deploymentData,
resultCollector
)
streamExecutionEnv.execute(alignedScenario.name.value)
collectingListener.results
adHocMiniClusterFallbackHandler.handleAdHocMniClusterFallbackAsync(
sharedMiniClusterServicesOpt,
scenario,
) { streamExecutionEnvWithMaxParallelism =>
import streamExecutionEnvWithMaxParallelism._
val alignedScenario = streamExecutionEnvWithMaxParallelism.alignParallelismIfNeeded(scenario)
val resultCollector = new TestServiceInvocationCollector(collectingListener)
// ProcessVersion can't be passed from DM because testing mechanism can be used with not saved scenario
val processVersion = ProcessVersion.empty.copy(processName = alignedScenario.name)
val deploymentData = DeploymentData.empty.copy(additionalModelConfigs =
AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
)
val registrar = prepareRegistrar(collectingListener, alignedScenario, scenarioTestData, processVersion)
streamExecutionEnv.getCheckpointConfig.disableCheckpointing()
registrar.register(
streamExecutionEnv,
alignedScenario,
processVersion,
deploymentData,
resultCollector
)
// TODO: Non-blocking future periodically checking if job is finished
val resultFuture = Future {
blocking {
streamExecutionEnv.execute(alignedScenario.name.value)
collectingListener.results
}
}
resultFuture.onComplete { _ =>
collectingListener.clean()
}
} finally {
collectingListener.clean()
resultFuture
}
}
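
Because attached execution blocks in execute() until the job finishes, the call above is wrapped in Future { blocking { ... } }: the blocking marker tells the global fork-join ExecutionContext that the task will occupy its thread for a long time, so the pool may spawn a compensating thread instead of starving (the TODO tracks replacing this with a non-blocking poll). A sketch of the pattern in isolation:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

// Sketch: run a long, synchronous call off the caller's thread, marked as blocking so the
// default ExecutionContext may temporarily grow beyond its normal parallelism.
def runAttached[T](executeJob: () => T): Future[T] =
  Future {
    blocking {
      executeJob()
    }
  }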

@@ -132,7 +132,7 @@ abstract class FlinkDeploymentManager(
makeSavepoint(_, savepointDir)
}
case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) =>
testRunner.runTestsAsync(canonicalProcess, scenarioTestData)
testRunner.runTests(canonicalProcess, scenarioTestData)
case _: DMRunOffScheduleCommand => notImplemented
}

@@ -241,7 +241,7 @@
processVersion: ProcessVersion
): Future[Unit] =
if (shouldVerifyBeforeDeploy)
verification.verify(processVersion, canonicalProcess, savepointPath)
Future.fromTry(verification.verify(processVersion, canonicalProcess, savepointPath))
else Future.successful(())

private def stopSavingSavepoint(
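
FlinkProcessVerifier.verify (changed at the end of this commit) now reports synchronously via Try, and the deployment manager lifts that result back into its Future-based API with Future.fromTry. A minimal sketch of the lifting, with verifySync as a hypothetical stand-in for the verifier call:

import scala.concurrent.Future
import scala.util.Try

// Hypothetical stand-in: verification runs synchronously and reports the outcome as a Try.
def verifySync(): Try[Unit] = Try(())

// Future.fromTry turns Success into an already-completed Future and Failure into a failed one,
// so callers keep the asynchronous signature they relied on before this change.
def verifyIfNeeded(shouldVerifyBeforeDeploy: Boolean): Future[Unit] =
  if (shouldVerifyBeforeDeploy) Future.fromTry(verifySync())
  else Future.successful(())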
@@ -28,7 +28,7 @@ object MiniClusterFactoryReflectiveInvoker {
miniClusterConfig: Configuration,
streamExecutionEnvConfig: Configuration
): MiniClusterWithServices = {
val methodInvoker = new ReflectiveMethodInvoker[(MiniCluster, () => StreamExecutionEnvironment)](
val methodInvoker = new ReflectiveMethodInvoker[(MiniCluster, Boolean => StreamExecutionEnvironment)](
modelClassLoader,
"pl.touk.nussknacker.engine.process.minicluster.MiniClusterFactory",
"createMiniClusterWithStreamExecutionEnvironmentFactory"
@@ -38,15 +38,18 @@
val totalSlots =
miniClusterConfig.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS) *
miniClusterConfig.get(TaskManagerOptions.NUM_TASK_SLOTS)
MiniClusterWithServices(miniCluster, totalSlots, streamExecutionEnvironmentFactory)
new MiniClusterWithServices(miniCluster, totalSlots, streamExecutionEnvironmentFactory)
}

case class MiniClusterWithServices(
class MiniClusterWithServices(
miniCluster: MiniCluster,
totalSlots: Int,
streamExecutionEnvironmentFactory: () => StreamExecutionEnvironment
val totalSlots: Int,
streamExecutionEnvironmentFactory: Boolean => StreamExecutionEnvironment
) extends AutoCloseable {

def createStreamExecutionEnvironment(attached: Boolean): StreamExecutionEnvironment =
streamExecutionEnvironmentFactory(attached)

override def close(): Unit = miniCluster.close()

}
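
MiniClusterWithServices is no longer a case class exposing the raw factory function; the call sites further down go through createStreamExecutionEnvironment(attached). A short sketch of the caller side, assuming miniClusterWithServices is already in scope:

// Sketch only: request an attached environment and release it when the work is done,
// mirroring how the test runner and verifier below close the environment they created.
val env = miniClusterWithServices.createStreamExecutionEnvironment(attached = true)
try {
  // register and execute the job here
} finally {
  env.close()
}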
@@ -19,36 +19,29 @@ class FlinkProcessTestRunner(
// We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar
// because it is already in separate assembly for purpose of sending it to Flink during deployment.
// Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded
private val mainRunner = new ReflectiveMethodInvoker[TestResults[Json]](
private val mainRunner = new ReflectiveMethodInvoker[Future[TestResults[Json]]](
modelData.modelClassLoader,
"pl.touk.nussknacker.engine.process.scenariotesting.FlinkTestMain",
"run"
)

def runTestsAsync(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)(
// NU-1455: We encode variable on the engine, because of classLoader's problems
def runTests(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)(
implicit ec: ExecutionContext
): Future[TestResults[Json]] = {
// FIXME: make only execution asynchronous
Future {
runTests(canonicalProcess, scenarioTestData)
}
}

// NU-1455: We encode variable on the engine, because of classLoader's problems
def runTests(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices =>
(miniClusterWithServices.streamExecutionEnvironmentFactory(), miniClusterWithServices.totalSlots)
(miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots)
)
val resultFuture = mainRunner.invokeStaticMethod(
streamExecutionEnvWithTotalSlots,
modelData,
canonicalProcess,
scenarioTestData
)
try {
mainRunner.invokeStaticMethod(
streamExecutionEnvWithTotalSlots,
modelData,
canonicalProcess,
scenarioTestData
)
} finally {
resultFuture.onComplete { _ =>
streamExecutionEnvWithTotalSlots.foreach(_._1.close())
}
resultFuture
}

}
@@ -7,8 +7,8 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.management.minicluster.MiniClusterFactoryReflectiveInvoker.MiniClusterWithServices
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

import scala.concurrent.Future
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

class FlinkProcessVerifier(
modelData: ModelData,
@@ -28,10 +28,10 @@
processVersion: ProcessVersion,
canonicalProcess: CanonicalProcess,
savepointPath: String
): Future[Unit] = {
): Try[Unit] = {
val processId = processVersion.processName
val streamExecutionEnvWithTotalSlots = miniClusterWithServicesOpt.map(miniClusterWithServices =>
(miniClusterWithServices.streamExecutionEnvironmentFactory(), miniClusterWithServices.totalSlots)
(miniClusterWithServices.createStreamExecutionEnvironment(attached = true), miniClusterWithServices.totalSlots)
)
try {
logger.info(s"Starting to verify $processId")
@@ -43,11 +43,11 @@
savepointPath
)
logger.info(s"Verification of $processId successful")
Future.successful(())
Success(())
} catch {
case NonFatal(e) =>
logger.info(s"Failed to verify $processId", e)
Future.failed(
Failure(
new IllegalArgumentException(
"State is incompatible, please stop scenario and start again with clean state",
e