Skip to content

Commit

Permalink
[NU-1962] Flink test mechanism refactoring: inheritance replaced with…
Browse files Browse the repository at this point in the history
… composition
  • Loading branch information
arkadius committed Jan 16, 2025
1 parent 8494ff4 commit 98ace54
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,9 @@ import java.net.{MalformedURLException, URL}
import scala.jdk.CollectionConverters._
import scala.util.Using

trait FlinkStubbedRunner {
final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) {

protected def modelData: ModelData

protected def process: CanonicalProcess

protected def configuration: Configuration

protected def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
Expand All @@ -38,7 +32,7 @@ trait FlinkStubbedRunner {
)

// we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath...
protected def execute[T](
def execute[T](
env: StreamExecutionEnvironment,
savepointRestoreSettings: SavepointRestoreSettings
): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,19 @@ class FlinkTestMain(
processVersion: ProcessVersion,
deploymentData: DeploymentData,
val configuration: Configuration
) extends FlinkStubbedRunner {
) {

private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)

def runTest: TestResults[Json] = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
try {
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val registrar = prepareRegistrar(collectingListener, scenarioTestData)
val env = createEnv
val env = stubbedRunner.createEnv

registrar.register(env, process, processVersion, deploymentData, resultCollector)
execute(env, SavepointRestoreSettings.none())
stubbedRunner.execute(env, SavepointRestoreSettings.none())
collectingListener.results
} finally {
collectingListener.clean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ class FlinkVerificationMain(
deploymentData: DeploymentData,
savepointPath: String,
val configuration: Configuration
) extends FlinkStubbedRunner {
) {

private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)

def runTest(): Unit = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val registrar = prepareRegistrar()
val env = createEnv
val env = stubbedRunner.createEnv

registrar.register(env, process, processVersion, deploymentData, resultCollector)
execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
stubbedRunner.execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
}

protected def prepareRegistrar(): FlinkProcessRegistrar = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@ import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.util.StaticMethodRunner
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

import scala.concurrent.{ExecutionContext, Future}

class FlinkProcessTestRunner(modelData: ModelData)
extends StaticMethodRunner(
modelData.modelClassLoader.classLoader,
"pl.touk.nussknacker.engine.process.runner.FlinkTestMain",
"run"
) {
class FlinkProcessTestRunner(modelData: ModelData) {

private val methodInvoker = new ReflectiveMethodInvoker[TestResults[Json]](
modelData.modelClassLoader.classLoader,
"pl.touk.nussknacker.engine.process.runner.FlinkTestMain",
"run"
)

// NU-1455: We encode variable on the engine, because of classLoader's problems
def test(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)(
implicit ec: ExecutionContext
): Future[TestResults[Json]] =
Future {
tryToInvoke(modelData, canonicalProcess, scenarioTestData, new Configuration())
.asInstanceOf[TestResults[Json]]
methodInvoker.invokeStaticMethod(modelData, canonicalProcess, scenarioTestData, new Configuration())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.util.StaticMethodRunner
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

import scala.concurrent.Future
import scala.util.control.NonFatal

class FlinkProcessVerifier(modelData: ModelData)
extends StaticMethodRunner(
modelData.modelClassLoader.classLoader,
"pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain",
"run"
)
with LazyLogging {
class FlinkProcessVerifier(modelData: ModelData) extends LazyLogging {

private val methodInvoker = new ReflectiveMethodInvoker[Unit](
modelData.modelClassLoader.classLoader,
"pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain",
"run"
)

def verify(
processVersion: ProcessVersion,
Expand All @@ -27,7 +27,14 @@ class FlinkProcessVerifier(modelData: ModelData)
val processId = processVersion.processName
try {
logger.info(s"Starting to verify $processId")
tryToInvoke(modelData, canonicalProcess, processVersion, DeploymentData.empty, savepointPath, new Configuration())
methodInvoker.invokeStaticMethod(
modelData,
canonicalProcess,
processVersion,
DeploymentData.empty,
savepointPath,
new Configuration()
)
logger.info(s"Verification of $processId successful")
Future.successful(())
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.util

import java.lang.reflect.InvocationTargetException

abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, methodName: String) {
final class ReflectiveMethodInvoker[Result](classLoader: ClassLoader, className: String, methodName: String) {

import scala.reflect.runtime.{universe => ru}

Expand All @@ -17,9 +17,9 @@ abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, m
}

// we have to use context loader, as in UI we have don't have e.g. nussknacker-process or user model on classpath...
def tryToInvoke(args: Any*): Any = ThreadUtils.withThisAsContextClassLoader(classLoader) {
def invokeStaticMethod(args: Any*): Result = ThreadUtils.withThisAsContextClassLoader(classLoader) {
try {
invoker(args: _*)
invoker(args: _*).asInstanceOf[Result]
} catch {
case e: InvocationTargetException => throw e.getTargetException
}
Expand Down

0 comments on commit 98ace54

Please sign in to comment.