[NU-1962] Flink test mechanism: caching flink mini cluster
arkadius committed Jan 15, 2025
1 parent 27073bb commit 3a07ecf
Showing 6 changed files with 65 additions and 53 deletions.

@@ -21,24 +21,30 @@ import java.net.{MalformedURLException, URL}
 import scala.jdk.CollectionConverters._
 import scala.util.Using
 
-trait FlinkStubbedRunner {
+class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) {
 
-  protected def modelData: ModelData
-
-  protected def process: CanonicalProcess
-
-  protected def configuration: Configuration
-
-  protected def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
+  def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
     MetaDataExtractor
       .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
       .parallelism
       .getOrElse(1),
     configuration
   )
 
+  def createMiniCluster(parallelism: Int): MiniCluster = {
+    val miniCluster = new MiniCluster(
+      new MiniClusterConfiguration.Builder()
+        .setNumSlotsPerTaskManager(parallelism)
+        .setConfiguration(configuration)
+        .build()
+    )
+    miniCluster.start()
+    miniCluster
+  }
+
   // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath...
-  protected def execute[T](
+  def execute[T](
+      miniCluster: MiniCluster,
       env: StreamExecutionEnvironment,
       savepointRestoreSettings: SavepointRestoreSettings
   ): Unit = {
@@ -64,18 +70,8 @@ trait FlinkStubbedRunner {
     // it is required for proper working of HadoopFileSystem
     FileSystem.initialize(configuration, null)
 
-    Using.resource(
-      new MiniCluster(
-        new MiniClusterConfiguration.Builder()
-          .setNumSlotsPerTaskManager(env.getParallelism)
-          .setConfiguration(configuration)
-          .build()
-      )
-    ) { exec =>
-      exec.start()
-      val id = exec.submitJob(jobGraph).get().getJobID
-      exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader)
-    }
+    val id = miniCluster.submitJob(jobGraph).get().getJobID
+    miniCluster.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader)
   }
 
   private def classpathsFromModelWithFallbackToConfiguration = {
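
The net effect of this file's change: FlinkStubbedRunner becomes a plain class, and the MiniCluster is no longer created and torn down inside execute but is passed in by the caller, which is what makes reuse (the caching from the commit title) possible. Below is a minimal usage sketch of the new API, not part of this commit; it assumes FlinkStubbedRunner and the Flink classes above are on the classpath, the wrapping object and method names are made up, and the scenario registration step is elided.

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess

import scala.util.Using

object FlinkStubbedRunnerUsageSketch {

  // Hypothetical caller: the cluster is created once, explicitly, and the caller decides
  // when to close it, so it can in principle outlive a single job submission.
  def runSingleJob(modelData: ModelData, process: CanonicalProcess, configuration: Configuration): Unit = {
    val runner = new FlinkStubbedRunner(modelData, process, configuration)
    val env    = runner.createEnv
    // register the scenario on `env` here, as FlinkTestMain / FlinkVerificationMain below do
    Using.resource(runner.createMiniCluster(env.getParallelism)) { miniCluster =>
      runner.execute(miniCluster, env, SavepointRestoreSettings.none())
    }
  }

}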

@@ -4,9 +4,9 @@ import io.circe.Json
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
 import pl.touk.nussknacker.engine.ModelData
-import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
 import pl.touk.nussknacker.engine.api.process.ProcessName
 import pl.touk.nussknacker.engine.api.test.ScenarioTestData
+import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData}
 import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory
@@ -53,18 +53,22 @@ class FlinkTestMain(
     processVersion: ProcessVersion,
     deploymentData: DeploymentData,
     val configuration: Configuration
-) extends FlinkStubbedRunner {
+) {
 
+  private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)
+
   def runTest: TestResults[Json] = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
     try {
       val resultCollector = new TestServiceInvocationCollector(collectingListener)
       val registrar = prepareRegistrar(collectingListener, scenarioTestData)
-      val env = createEnv
+      val env = stubbedRunner.createEnv
 
       registrar.register(env, process, processVersion, deploymentData, resultCollector)
-      execute(env, SavepointRestoreSettings.none())
-      collectingListener.results
+      Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster =>
+        stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.none())
+        collectingListener.results
+      }
     } finally {
       collectingListener.clean()
     }
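
FlinkTestMain still owns the MiniCluster lifecycle per run via Using.resource, but because the cluster is now an explicit value rather than a hidden local inside execute, a longer-lived holder could hand the same started cluster to consecutive test runs. The sketch below only illustrates that idea; FlinkMiniClusterCache is a hypothetical name and is not introduced by this commit, and it uses only the MiniCluster calls already visible in the diff above.

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}

// Hypothetical cache: keep one started MiniCluster per parallelism and hand it to
// consecutive runs instead of starting a fresh cluster for every scenario test.
class FlinkMiniClusterCache(configuration: Configuration) extends AutoCloseable {

  private var clusters: Map[Int, MiniCluster] = Map.empty

  def getOrCreate(parallelism: Int): MiniCluster = synchronized {
    clusters.getOrElse(
      parallelism, {
        val miniCluster = new MiniCluster(
          new MiniClusterConfiguration.Builder()
            .setNumSlotsPerTaskManager(parallelism)
            .setConfiguration(configuration)
            .build()
        )
        miniCluster.start()
        clusters += parallelism -> miniCluster
        miniCluster
      }
    )
  }

  override def close(): Unit = synchronized {
    clusters.values.foreach(_.close())
    clusters = Map.empty
  }
}

With such a holder, the execute call would receive cache.getOrCreate(env.getParallelism) instead of a freshly created cluster.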

@@ -9,7 +9,9 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData
 import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory
 import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
 import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
-import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestRunId, TestServiceInvocationCollector}
+import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector}
 
+import scala.util.Using
+
 object FlinkVerificationMain extends FlinkRunner {
 
@@ -33,16 +35,20 @@ class FlinkVerificationMain(
     deploymentData: DeploymentData,
     savepointPath: String,
     val configuration: Configuration
-) extends FlinkStubbedRunner {
+) {
 
+  private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)
+
   def runTest(): Unit = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
     val resultCollector = new TestServiceInvocationCollector(collectingListener)
     val registrar = prepareRegistrar()
-    val env = createEnv
+    val env = stubbedRunner.createEnv
 
     registrar.register(env, process, processVersion, deploymentData, resultCollector)
-    execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
+    Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster =>
+      stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.forPath(savepointPath, true))
+    }
  }
 
   protected def prepareRegistrar(): FlinkProcessRegistrar = {

@@ -6,24 +6,24 @@ import pl.touk.nussknacker.engine.ModelData
 import pl.touk.nussknacker.engine.api.test.ScenarioTestData
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
-import pl.touk.nussknacker.engine.util.StaticMethodRunner
+import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker
 
 import scala.concurrent.{ExecutionContext, Future}
 
-class FlinkProcessTestRunner(modelData: ModelData)
-    extends StaticMethodRunner(
-      modelData.modelClassLoader.classLoader,
-      "pl.touk.nussknacker.engine.process.runner.FlinkTestMain",
-      "run"
-    ) {
+class FlinkProcessTestRunner(modelData: ModelData) {
 
+  private val methodInvoker = new ReflectiveMethodInvoker[TestResults[Json]](
+    modelData.modelClassLoader.classLoader,
+    "pl.touk.nussknacker.engine.process.runner.FlinkTestMain",
+    "run"
+  )
+
   // NU-1455: We encode variable on the engine, because of classLoader's problems
   def test(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)(
       implicit ec: ExecutionContext
   ): Future[TestResults[Json]] =
     Future {
-      tryToInvoke(modelData, canonicalProcess, scenarioTestData, new Configuration())
-        .asInstanceOf[TestResults[Json]]
+      methodInvoker.invokeStaticMethod(modelData, canonicalProcess, scenarioTestData, new Configuration())
     }
 
 }

@@ -6,18 +6,18 @@ import pl.touk.nussknacker.engine.ModelData
 import pl.touk.nussknacker.engine.api.ProcessVersion
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment.DeploymentData
-import pl.touk.nussknacker.engine.util.StaticMethodRunner
+import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker
 
 import scala.concurrent.Future
 import scala.util.control.NonFatal
 
-class FlinkProcessVerifier(modelData: ModelData)
-    extends StaticMethodRunner(
-      modelData.modelClassLoader.classLoader,
-      "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain",
-      "run"
-    )
-    with LazyLogging {
+class FlinkProcessVerifier(modelData: ModelData) extends LazyLogging {
 
+  private val methodInvoker = new ReflectiveMethodInvoker[Unit](
+    modelData.modelClassLoader.classLoader,
+    "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain",
+    "run"
+  )
+
   def verify(
       processVersion: ProcessVersion,
@@ -27,7 +27,14 @@ class FlinkProcessVerifier(modelData: ModelData)
     val processId = processVersion.processName
     try {
       logger.info(s"Starting to verify $processId")
-      tryToInvoke(modelData, canonicalProcess, processVersion, DeploymentData.empty, savepointPath, new Configuration())
+      methodInvoker.invokeStaticMethod(
+        modelData,
+        canonicalProcess,
+        processVersion,
+        DeploymentData.empty,
+        savepointPath,
+        new Configuration()
+      )
       logger.info(s"Verification of $processId successful")
       Future.successful(())
     } catch {

@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.util
 
 import java.lang.reflect.InvocationTargetException
 
-abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, methodName: String) {
+final class ReflectiveMethodInvoker[Result](classLoader: ClassLoader, className: String, methodName: String) {
 
   import scala.reflect.runtime.{universe => ru}
 
@@ -12,14 +12,13 @@ abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, m
     val im = m.reflectModule(module)
     val method = im.symbol.info.decl(ru.TermName(methodName)).asMethod
     val objMirror = m.reflect(im.instance)
-    val r = objMirror.reflectMethod(method)
-    r
+    objMirror.reflectMethod(method)
   }
 
   // we have to use context loader, as in UI we have don't have e.g. nussknacker-process or user model on classpath...
-  def tryToInvoke(args: Any*): Any = ThreadUtils.withThisAsContextClassLoader(classLoader) {
+  def invokeStaticMethod(args: Any*): Result = ThreadUtils.withThisAsContextClassLoader(classLoader) {
     try {
-      invoker(args: _*)
+      invoker(args: _*).asInstanceOf[Result]
     } catch {
       case e: InvocationTargetException => throw e.getTargetException
     }
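
ReflectiveMethodInvoker is the renamed, typed replacement for StaticMethodRunner: callers now use composition instead of inheritance, and the Result type parameter moves the asInstanceOf cast out of call sites and into invokeStaticMethod. A small usage sketch, not from this commit; ExampleRunner, its package, and the wrapping object are hypothetical stand-ins for objects like FlinkTestMain that the real code loads through the model classloader.

package pl.touk.nussknacker.example // hypothetical package, for this sketch only

import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

// Hypothetical target: a Scala object with a static-like `run` method.
object ExampleRunner {
  def run(name: String, attempts: Int): String = s"$name was run $attempts time(s)"
}

object ReflectiveMethodInvokerUsageSketch {

  def main(args: Array[String]): Unit = {
    // The Result type parameter keeps the cast inside invokeStaticMethod,
    // so the call site stays cast-free (compare FlinkProcessTestRunner above).
    val invoker = new ReflectiveMethodInvoker[String](
      getClass.getClassLoader,
      "pl.touk.nussknacker.example.ExampleRunner",
      "run"
    )
    println(invoker.invokeStaticMethod("verification", 2))
  }
}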
