From 98ace545dfec93f7d1bd7d1ff3e88eb319c02e9e Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Thu, 16 Jan 2025 12:11:07 +0100 Subject: [PATCH] [NU-1962] Flink test mechanism refactoring: inheritance replaced with composition --- .../process/runner/FlinkStubbedRunner.scala | 12 +++------ .../engine/process/runner/FlinkTestMain.scala | 8 +++--- .../runner/FlinkVerificationMain.scala | 8 +++--- .../management/FlinkProcessTestRunner.scala | 18 ++++++------- .../management/FlinkProcessVerifier.scala | 25 ++++++++++++------- ...er.scala => ReflectiveMethodInvoker.scala} | 6 ++--- 6 files changed, 41 insertions(+), 36 deletions(-) rename utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/{StaticMethodRunner.scala => ReflectiveMethodInvoker.scala} (73%) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala index a51db93cf39..c30464fa795 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala @@ -21,15 +21,9 @@ import java.net.{MalformedURLException, URL} import scala.jdk.CollectionConverters._ import scala.util.Using -trait FlinkStubbedRunner { +final class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) { - protected def modelData: ModelData - - protected def process: CanonicalProcess - - protected def configuration: Configuration - - protected def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( + def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( MetaDataExtractor .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) .parallelism @@ -38,7 +32,7 @@ trait FlinkStubbedRunner { ) // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath... - protected def execute[T]( + def execute[T]( env: StreamExecutionEnvironment, savepointRestoreSettings: SavepointRestoreSettings ): Unit = { diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index 62e2efce685..75ba6546e70 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -53,17 +53,19 @@ class FlinkTestMain( processVersion: ProcessVersion, deploymentData: DeploymentData, val configuration: Configuration -) extends FlinkStubbedRunner { +) { + + private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) def runTest: TestResults[Json] = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener try { val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar(collectingListener, scenarioTestData) - val env = createEnv + val env = stubbedRunner.createEnv registrar.register(env, process, processVersion, deploymentData, resultCollector) - execute(env, SavepointRestoreSettings.none()) + stubbedRunner.execute(env, SavepointRestoreSettings.none()) collectingListener.results } finally { collectingListener.clean() diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala index 3463bed6546..b5795736780 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala @@ -33,16 +33,18 @@ class FlinkVerificationMain( deploymentData: DeploymentData, savepointPath: String, val configuration: Configuration -) extends FlinkStubbedRunner { +) { + + private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) def runTest(): Unit = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar() - val env = createEnv + val env = stubbedRunner.createEnv registrar.register(env, process, processVersion, deploymentData, resultCollector) - execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) + stubbedRunner.execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) } protected def prepareRegistrar(): FlinkProcessRegistrar = { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala index 853187e57d4..f1522ecdfe6 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala @@ -6,24 +6,24 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults -import pl.touk.nussknacker.engine.util.StaticMethodRunner +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker import scala.concurrent.{ExecutionContext, Future} -class FlinkProcessTestRunner(modelData: ModelData) - extends StaticMethodRunner( - modelData.modelClassLoader.classLoader, - "pl.touk.nussknacker.engine.process.runner.FlinkTestMain", - "run" - ) { +class FlinkProcessTestRunner(modelData: ModelData) { + + private val methodInvoker = new ReflectiveMethodInvoker[TestResults[Json]]( + modelData.modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.runner.FlinkTestMain", + "run" + ) // NU-1455: We encode variable on the engine, because of classLoader's problems def test(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)( implicit ec: ExecutionContext ): Future[TestResults[Json]] = Future { - tryToInvoke(modelData, canonicalProcess, scenarioTestData, new Configuration()) - .asInstanceOf[TestResults[Json]] + methodInvoker.invokeStaticMethod(modelData, canonicalProcess, scenarioTestData, new Configuration()) } } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala index 29b9226343a..82691385dc1 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala @@ -6,18 +6,18 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.util.StaticMethodRunner +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker import scala.concurrent.Future import scala.util.control.NonFatal -class FlinkProcessVerifier(modelData: ModelData) - extends StaticMethodRunner( - modelData.modelClassLoader.classLoader, - "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain", - "run" - ) - with LazyLogging { +class FlinkProcessVerifier(modelData: ModelData) extends LazyLogging { + + private val methodInvoker = new ReflectiveMethodInvoker[Unit]( + modelData.modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain", + "run" + ) def verify( processVersion: ProcessVersion, @@ -27,7 +27,14 @@ class FlinkProcessVerifier(modelData: ModelData) val processId = processVersion.processName try { logger.info(s"Starting to verify $processId") - tryToInvoke(modelData, canonicalProcess, processVersion, DeploymentData.empty, savepointPath, new Configuration()) + methodInvoker.invokeStaticMethod( + modelData, + canonicalProcess, + processVersion, + DeploymentData.empty, + savepointPath, + new Configuration() + ) logger.info(s"Verification of $processId successful") Future.successful(()) } catch { diff --git a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala similarity index 73% rename from utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala rename to utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala index 5311746a49d..0e013712222 100644 --- a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala +++ b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.util import java.lang.reflect.InvocationTargetException -abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, methodName: String) { +final class ReflectiveMethodInvoker[Result](classLoader: ClassLoader, className: String, methodName: String) { import scala.reflect.runtime.{universe => ru} @@ -17,9 +17,9 @@ abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, m } // we have to use context loader, as in UI we have don't have e.g. nussknacker-process or user model on classpath... - def tryToInvoke(args: Any*): Any = ThreadUtils.withThisAsContextClassLoader(classLoader) { + def invokeStaticMethod(args: Any*): Result = ThreadUtils.withThisAsContextClassLoader(classLoader) { try { - invoker(args: _*) + invoker(args: _*).asInstanceOf[Result] } catch { case e: InvocationTargetException => throw e.getTargetException }