From 23cfd43f63bfeaebbe2a05c71e17eadbb64f04ac Mon Sep 17 00:00:00 2001
From: Arek Burdach
Date: Mon, 20 Jan 2025 15:49:06 +0100
Subject: [PATCH] FlinkDeploymentManager: close test runner + reuse test runner in some tests

---
 .../management/FlinkDeploymentManager.scala   |  5 +++
 .../FlinkProcessTestRunner.scala              |  8 +++-
 .../FlinkProcessTestRunnerSpec.scala          | 39 ++++++++++++-------
 .../kafka/TestFromFileSpec.scala              | 32 +++++++--------
 4 files changed, 53 insertions(+), 31 deletions(-)

diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala
index 9b995c45791..c59d906e682 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala
@@ -265,6 +265,11 @@ abstract class FlinkDeploymentManager(
 
   override def processStateDefinitionManager: ProcessStateDefinitionManager = FlinkProcessStateDefinitionManager
 
+  override def close(): Unit = {
+    super.close()
+    testRunner.close()
+  }
+
 }
 
 object FlinkDeploymentManager {
diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala
index 1bc413a1909..84cf2585bdf 100644
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala
+++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala
@@ -10,7 +10,8 @@ import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker
 
 import scala.concurrent.{ExecutionContext, Future}
 
-class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecutionConfig: Configuration) {
+class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecutionConfig: Configuration)
+    extends AutoCloseable {
 
   private val streamExecutionEnvironment =
     TestsMechanismStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(parallelism, streamExecutionConfig)
@@ -42,4 +43,9 @@ class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecu
       scenarioTestData
     )
 
+  def close(): Unit = {
+    miniCluster.close()
+    streamExecutionEnvironment.close()
+  }
+
 }
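
Note: with FlinkProcessTestRunner now extending AutoCloseable, a short-lived caller can scope the runner with Scala 2.13's scala.util.Using so that the underlying MiniCluster and StreamExecutionEnvironment are released even when runTests throws. This is a minimal sketch, not part of the patch; it assumes modelData, process and scenarioTestData are already in scope, as they are in the specs below.

    import org.apache.flink.configuration.Configuration
    import scala.util.Using

    import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
    import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner

    // Sketch: build the runner, run the scenario test, and close the runner afterwards,
    // whether or not runTests succeeds (Using.resource closes any AutoCloseable).
    val results = Using.resource(
      new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration))
    ) { runner =>
      runner.runTests(process, scenarioTestData)
    }
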
diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala
index 5e53e6e7223..7ed17267c4c 100644
--- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala
+++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala
@@ -7,8 +7,16 @@ import org.apache.flink.runtime.client.JobExecutionException
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
 import org.scalatest.{BeforeAndAfterEach, Inside, OptionValues}
-import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId, ParameterAdditionalUIConfig}
-import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ParameterValueCompileTimeValidation, ValueInputWithDictEditor}
+import pl.touk.nussknacker.engine.api.component.{
+  ComponentAdditionalConfig,
+  DesignerWideComponentId,
+  ParameterAdditionalUIConfig
+}
+import pl.touk.nussknacker.engine.api.parameter.{
+  ParameterName,
+  ParameterValueCompileTimeValidation,
+  ValueInputWithDictEditor
+}
 import pl.touk.nussknacker.engine.api.process.ComponentUseCase
 import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
 import pl.touk.nussknacker.engine.api.{DisplayJsonWithEncoder, FragmentSpecificData, MetaData, StreamMetaData}
@@ -17,11 +25,18 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode
 import pl.touk.nussknacker.engine.compile.FragmentResolver
 import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs
-import pl.touk.nussknacker.engine.flink.test.{FlinkTestConfiguration, RecordingExceptionConsumer, RecordingExceptionConsumerProvider}
+import pl.touk.nussknacker.engine.flink.test.{
+  FlinkTestConfiguration,
+  RecordingExceptionConsumer,
+  RecordingExceptionConsumerProvider
+}
 import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
 import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition}
-import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunnerSpec.{fragmentWithValidationName, processWithFragmentParameterValidation}
+import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunnerSpec.{
+  fragmentWithValidationName,
+  processWithFragmentParameterValidation
+}
 import pl.touk.nussknacker.engine.process.helpers.SampleNodes._
 import pl.touk.nussknacker.engine.testmode.TestProcess._
 import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils}
@@ -762,15 +777,13 @@ class FlinkProcessTestRunnerSpec
       ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround),
       resolveConfigs = false
     )
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      // TODO: reuse this instance between all test cases
-      val parallelism = MetaDataExtractor
-        .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
-        .parallelism
-        .getOrElse(1)
-      new FlinkProcessTestRunner(modelData, parallelism, FlinkTestConfiguration.setupMemory(new Configuration))
-        .runTests(process, scenarioTestData)
-    }
+    // TODO: reuse this instance between all test cases
+    val parallelism = MetaDataExtractor
+      .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
+      .parallelism
+      .getOrElse(1)
+    new FlinkProcessTestRunner(modelData, parallelism, FlinkTestConfiguration.setupMemory(new Configuration))
+      .runTests(process, scenarioTestData)
   }
 
   private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] =
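
Note: the TODO above stays in place because FlinkProcessTestRunnerSpec derives the parallelism from each scenario's StreamMetaData, so a single shared runner is not a drop-in change there. If the parallelism and modelData were fixed for the whole suite, the reuse could mirror the TestFromFileSpec change below. A hedged sketch only, not part of the patch; sharedTestRunner is a hypothetical field name and the suite would additionally need the BeforeAndAfterAll mixin.

    // Sketch only: one runner shared by every test case and closed after the suite,
    // assuming modelData and parallelism do not vary per scenario.
    private lazy val sharedTestRunner =
      new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration))

    override protected def afterAll(): Unit = {
      super.afterAll()
      sharedTestRunner.close()
    }
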
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka/TestFromFileSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka/TestFromFileSpec.scala
index 1eeed0a4a3c..49db5f3e0dc 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka/TestFromFileSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/testsmechanism/kafka/TestFromFileSpec.scala
@@ -7,13 +7,12 @@ import io.circe.Json
 import io.circe.Json.{Null, fromString, obj}
 import org.apache.flink.configuration.Configuration
 import org.apache.kafka.common.record.TimestampType
-import org.scalatest.OptionValues
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, OptionValues}
 import pl.touk.nussknacker.engine.ModelData
 import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
 import pl.touk.nussknacker.engine.build.ScenarioBuilder
-import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
 import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName
 import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
@@ -23,8 +22,6 @@ import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessCo
 import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.testing.LocalModelData
-import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
-import pl.touk.nussknacker.engine.util.ThreadUtils
 import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
 import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties}
@@ -36,9 +33,10 @@ class TestFromFileSpec
     with Matchers
     with LazyLogging
     with EitherValuesDetailedMessage
-    with OptionValues {
+    with OptionValues
+    with BeforeAndAfterAll {
 
-  private lazy val config = ConfigFactory
+  private val config = ConfigFactory
     .empty()
     .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef("kafka_should_not_be_used:9092"))
     .withValue(
@@ -47,7 +45,7 @@ class TestFromFileSpec
     )
     .withValue("kafka.topicsExistenceValidationConfig.enabled", fromAnyRef(false))
 
-  protected lazy val modelData: ModelData =
+  private val modelData: ModelData =
     LocalModelData(
       inputConfig = config,
       components = List.empty,
@@ -55,6 +53,14 @@ class TestFromFileSpec
       modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround)
     )
 
+  private val testRunner =
+    new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration))
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    testRunner.close()
+  }
+
   test("Should pass correct timestamp from test data") {
     val topic = "simple"
     val expectedTimestamp = System.currentTimeMillis()
@@ -97,7 +103,7 @@ class TestFromFileSpec
         _.add("value", obj("id" -> fromString("fooId"), "field" -> fromString("fooField")))
       )
 
-    val results = run(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
+    val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
 
     val testResultVars = results.nodeResults("end").head.variables
     testResultVars("extractedTimestamp").hcursor.downField("pretty").as[Long].rightValue shouldBe expectedTimestamp
@@ -126,19 +132,11 @@ class TestFromFileSpec
         .add("value", obj("id" -> fromString("1234"), "field" -> fromString("abcd")))
       )
 
-    val results = run(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
+    val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
 
     results.nodeResults shouldBe Symbol("nonEmpty")
   }
 
-  private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      // TODO: reuse this instance between all test cases
-      new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration))
-        .runTests(process, scenarioTestData)
-    }
-  }
-
 }
 
 object TestFromFileSpec extends Serializable {