From 3a07ecf716423efe62c2030346ac8b51e2f544cd Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Wed, 15 Jan 2025 14:23:21 +0100 Subject: [PATCH] [NU-1962] Flink test mechanism: caching flink mini cluster --- .../process/runner/FlinkStubbedRunner.scala | 38 +++++++++---------- .../engine/process/runner/FlinkTestMain.scala | 14 ++++--- .../runner/FlinkVerificationMain.scala | 14 +++++-- .../management/FlinkProcessTestRunner.scala | 18 ++++----- .../management/FlinkProcessVerifier.scala | 25 +++++++----- ...er.scala => ReflectiveMethodInvoker.scala} | 9 ++--- 6 files changed, 65 insertions(+), 53 deletions(-) rename utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/{StaticMethodRunner.scala => ReflectiveMethodInvoker.scala} (69%) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala index a51db93cf39..325ae48b154 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala @@ -21,15 +21,9 @@ import java.net.{MalformedURLException, URL} import scala.jdk.CollectionConverters._ import scala.util.Using -trait FlinkStubbedRunner { +class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) { - protected def modelData: ModelData - - protected def process: CanonicalProcess - - protected def configuration: Configuration - - protected def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( + def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( MetaDataExtractor .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) .parallelism @@ -37,8 +31,20 @@ trait FlinkStubbedRunner { configuration ) + def createMiniCluster(parallelism: Int): MiniCluster = { + val miniCluster = new MiniCluster( + new MiniClusterConfiguration.Builder() + .setNumSlotsPerTaskManager(parallelism) + .setConfiguration(configuration) + .build() + ) + miniCluster.start() + miniCluster + } + // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath... - protected def execute[T]( + def execute[T]( + miniCluster: MiniCluster, env: StreamExecutionEnvironment, savepointRestoreSettings: SavepointRestoreSettings ): Unit = { @@ -64,18 +70,8 @@ trait FlinkStubbedRunner { // it is required for proper working of HadoopFileSystem FileSystem.initialize(configuration, null) - Using.resource( - new MiniCluster( - new MiniClusterConfiguration.Builder() - .setNumSlotsPerTaskManager(env.getParallelism) - .setConfiguration(configuration) - .build() - ) - ) { exec => - exec.start() - val id = exec.submitJob(jobGraph).get().getJobID - exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader) - } + val id = miniCluster.submitJob(jobGraph).get().getJobID + miniCluster.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader) } private def classpathsFromModelWithFallbackToConfiguration = { diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index 62e2efce685..27c16053c8f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -4,9 +4,9 @@ import io.circe.Json import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.test.ScenarioTestData +import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData} import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory @@ -53,18 +53,22 @@ class FlinkTestMain( processVersion: ProcessVersion, deploymentData: DeploymentData, val configuration: Configuration -) extends FlinkStubbedRunner { +) { + + private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) def runTest: TestResults[Json] = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener try { val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar(collectingListener, scenarioTestData) - val env = createEnv + val env = stubbedRunner.createEnv registrar.register(env, process, processVersion, deploymentData, resultCollector) - execute(env, SavepointRestoreSettings.none()) - collectingListener.results + Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster => + stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.none()) + collectingListener.results + } } finally { collectingListener.clean() } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala index 3463bed6546..247ebcff80a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala @@ -9,7 +9,9 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} -import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestRunId, TestServiceInvocationCollector} +import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector} + +import scala.util.Using object FlinkVerificationMain extends FlinkRunner { @@ -33,16 +35,20 @@ class FlinkVerificationMain( deploymentData: DeploymentData, savepointPath: String, val configuration: Configuration -) extends FlinkStubbedRunner { +) { + + private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration) def runTest(): Unit = { val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar() - val env = createEnv + val env = stubbedRunner.createEnv registrar.register(env, process, processVersion, deploymentData, resultCollector) - execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) + Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster => + stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.forPath(savepointPath, true)) + } } protected def prepareRegistrar(): FlinkProcessRegistrar = { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala index 853187e57d4..f1522ecdfe6 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala @@ -6,24 +6,24 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults -import pl.touk.nussknacker.engine.util.StaticMethodRunner +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker import scala.concurrent.{ExecutionContext, Future} -class FlinkProcessTestRunner(modelData: ModelData) - extends StaticMethodRunner( - modelData.modelClassLoader.classLoader, - "pl.touk.nussknacker.engine.process.runner.FlinkTestMain", - "run" - ) { +class FlinkProcessTestRunner(modelData: ModelData) { + + private val methodInvoker = new ReflectiveMethodInvoker[TestResults[Json]]( + modelData.modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.runner.FlinkTestMain", + "run" + ) // NU-1455: We encode variable on the engine, because of classLoader's problems def test(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)( implicit ec: ExecutionContext ): Future[TestResults[Json]] = Future { - tryToInvoke(modelData, canonicalProcess, scenarioTestData, new Configuration()) - .asInstanceOf[TestResults[Json]] + methodInvoker.invokeStaticMethod(modelData, canonicalProcess, scenarioTestData, new Configuration()) } } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala index 29b9226343a..82691385dc1 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala @@ -6,18 +6,18 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.util.StaticMethodRunner +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker import scala.concurrent.Future import scala.util.control.NonFatal -class FlinkProcessVerifier(modelData: ModelData) - extends StaticMethodRunner( - modelData.modelClassLoader.classLoader, - "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain", - "run" - ) - with LazyLogging { +class FlinkProcessVerifier(modelData: ModelData) extends LazyLogging { + + private val methodInvoker = new ReflectiveMethodInvoker[Unit]( + modelData.modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain", + "run" + ) def verify( processVersion: ProcessVersion, @@ -27,7 +27,14 @@ class FlinkProcessVerifier(modelData: ModelData) val processId = processVersion.processName try { logger.info(s"Starting to verify $processId") - tryToInvoke(modelData, canonicalProcess, processVersion, DeploymentData.empty, savepointPath, new Configuration()) + methodInvoker.invokeStaticMethod( + modelData, + canonicalProcess, + processVersion, + DeploymentData.empty, + savepointPath, + new Configuration() + ) logger.info(s"Verification of $processId successful") Future.successful(()) } catch { diff --git a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala similarity index 69% rename from utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala rename to utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala index 5311746a49d..7592c65ed16 100644 --- a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala +++ b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala @@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.util import java.lang.reflect.InvocationTargetException -abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, methodName: String) { +final class ReflectiveMethodInvoker[Result](classLoader: ClassLoader, className: String, methodName: String) { import scala.reflect.runtime.{universe => ru} @@ -12,14 +12,13 @@ abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, m val im = m.reflectModule(module) val method = im.symbol.info.decl(ru.TermName(methodName)).asMethod val objMirror = m.reflect(im.instance) - val r = objMirror.reflectMethod(method) - r + objMirror.reflectMethod(method) } // we have to use context loader, as in UI we have don't have e.g. nussknacker-process or user model on classpath... - def tryToInvoke(args: Any*): Any = ThreadUtils.withThisAsContextClassLoader(classLoader) { + def invokeStaticMethod(args: Any*): Result = ThreadUtils.withThisAsContextClassLoader(classLoader) { try { - invoker(args: _*) + invoker(args: _*).asInstanceOf[Result] } catch { case e: InvocationTargetException => throw e.getTargetException }