From b14fbd2d8f04d8bbf3c00a074d1258474b60c5ce Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Mon, 20 Jan 2025 16:32:26 +0100 Subject: [PATCH] FlinkProcessTestRunnerSpec: reusing test runner --- .../FlinkProcessTestRunner.scala | 23 +++++- .../FlinkProcessTestRunnerSpec.scala | 79 +++++++++++-------- 2 files changed, 66 insertions(+), 36 deletions(-) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala index 84cf2585bdf..f620a53f220 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunner.scala @@ -3,10 +3,11 @@ package pl.touk.nussknacker.engine.management.testsmechanism import io.circe.Json import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.api.test.ScenarioTestData import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults -import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker +import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ReflectiveMethodInvoker} import scala.concurrent.{ExecutionContext, Future} @@ -39,10 +40,28 @@ class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecu miniCluster, streamExecutionEnvironment, modelData, - canonicalProcess, + rewriteParallelismIfHigherThanMaxParallelism(canonicalProcess), scenarioTestData ) + private def rewriteParallelismIfHigherThanMaxParallelism(canonicalProcess: CanonicalProcess): CanonicalProcess = { + val scenarioParallelism = MetaDataExtractor + .extractTypeSpecificDataOrDefault[StreamMetaData](canonicalProcess.metaData, StreamMetaData()) + .parallelism + .getOrElse(1) + if (scenarioParallelism > parallelism) { + canonicalProcess.copy(metaData = + canonicalProcess.metaData.copy(additionalFields = + canonicalProcess.metaData.additionalFields.copy(properties = + canonicalProcess.metaData.additionalFields.properties + (StreamMetaData.parallelismName -> parallelism.toString) + ) + ) + ) + } else { + canonicalProcess + } + } + def close(): Unit = { miniCluster.close() streamExecutionEnvironment.close() diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala index 7ed17267c4c..181f226cb5a 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala @@ -47,6 +47,7 @@ import java.util.{Date, UUID} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} +import scala.util.Using class FlinkProcessTestRunnerSpec extends AnyWordSpec @@ -78,6 +79,7 @@ class FlinkProcessTestRunnerSpec } private def runTests(useIOMonadInInterpreter: Boolean): Unit = { + val testRunner = prepareTestRunner(useIOMonadInInterpreter) "be able to return test results" in { val process = ScenarioBuilder @@ -92,15 +94,14 @@ class FlinkProcessTestRunnerSpec val input = SimpleRecord("0", 1, "2", new Date(3), Some(4), 5, "6") val input2 = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6") - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( List( ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")), ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6")) ) - ), - useIOMonadInInterpreter + ) ) val nodeResults = results.nodeResults @@ -150,10 +151,9 @@ class FlinkProcessTestRunnerSpec .source(sourceNodeId, "input") .split("splitId1", GraphBuilder.emptySink("out1", "monitor"), GraphBuilder.emptySink("out2", "monitor")) - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) results.nodeResults("splitId1") shouldBe List( @@ -184,10 +184,9 @@ class FlinkProcessTestRunnerSpec val aggregate = SimpleRecordWithPreviousValue(input, 0, "s") val aggregate2 = SimpleRecordWithPreviousValue(input2, 1, "s") - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults @@ -238,10 +237,9 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "monitor") val results = - runFlinkTest( + testRunner.runTests( process, ScenarioTestData(createTestRecord() :: List.fill(4)(createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults @@ -258,7 +256,7 @@ class FlinkProcessTestRunnerSpec .filter("filter", "1 / #input.value1 >= 0".spel) .emptySink("out", "monitor") - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( List( @@ -268,7 +266,6 @@ class FlinkProcessTestRunnerSpec createTestRecord(id = "3", value1 = 4) ) ), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults @@ -315,7 +312,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "monitor") val exceptionConsumerId = UUID.randomUUID().toString - val results = runFlinkTest( + val results = runTestsWithCustomModel( process = process, scenarioTestData = ScenarioTestData( List( @@ -326,7 +323,7 @@ class FlinkProcessTestRunnerSpec ) ), useIOMonadInInterpreter, - enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId) + enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId), ) val nodeResults = results.nodeResults @@ -347,7 +344,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "monitor") val run = Future { - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) } intercept[JobExecutionException](Await.result(run, 10 seconds)) @@ -376,7 +373,7 @@ class FlinkProcessTestRunnerSpec ) ) - val results = runFlinkTest(process, testData, useIOMonadInInterpreter) + val results = testRunner.runTests(process, testData) results.nodeResults(sourceNodeId) should have size 3 results.externalInvocationResults("out") shouldBe @@ -406,7 +403,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "#additionalOne + '|' + #additionalTwo".spel) val testData = ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("abc")))) - val results = runFlinkTest(process, testData, useIOMonadInInterpreter) + val results = testRunner.runTests(process, testData) results.nodeResults(sourceNodeId) should have size 1 results.externalInvocationResults("out") shouldBe @@ -427,7 +424,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "sinkForInts", "Value" -> "15 / {0, 1}[0]".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) results.exceptions should have length 1 results.exceptions.head.nodeId shouldBe Some("out") @@ -447,7 +444,7 @@ class FlinkProcessTestRunnerSpec def recordWithSeconds(duration: FiniteDuration) = ScenarioTestJsonRecord(sourceNodeId, Json.fromString(s"0|0|0|${duration.toMillis}|0|0|0")) - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( List( @@ -457,8 +454,7 @@ class FlinkProcessTestRunnerSpec recordWithSeconds(9 second), recordWithSeconds(20 second) ) - ), - useIOMonadInInterpreter + ) ) val nodeResults = results.nodeResults @@ -478,15 +474,14 @@ class FlinkProcessTestRunnerSpec ) .emptySink("out", "valueMonitor", "Value" -> "#input.field1 + #input.field2".spel) - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( ScenarioTestJsonRecord( sourceNodeId, Json.obj("field1" -> Json.fromString("abc"), "field2" -> Json.fromString("def")) ) :: Nil - ), - useIOMonadInInterpreter + ) ) results.invocationResults("out").map(_.value) shouldBe List(variable("abcdef")) @@ -510,7 +505,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "#parsed.size + ' ' + #parsed[0].field2".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(value1 = valueToReturn))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(value1 = valueToReturn)))) results.invocationResults("out").map(_.value) shouldBe List(variable(s"$countToPass $valueToReturn")) } @@ -533,7 +528,7 @@ class FlinkProcessTestRunnerSpec val recordTrue = createTestRecord(id = "ala") val recordFalse = createTestRecord(id = "bela") - val results = runFlinkTest(process, ScenarioTestData(List(recordTrue, recordFalse)), useIOMonadInInterpreter) + val results = testRunner.runTests(process, ScenarioTestData(List(recordTrue, recordFalse))) val invocationResults = results.invocationResults @@ -571,7 +566,7 @@ class FlinkProcessTestRunnerSpec val recB = createTestRecord(id = "b") val recC = createTestRecord(id = "c") - val results = runFlinkTest(process, ScenarioTestData(List(recA, recB, recC)), useIOMonadInInterpreter) + val results = testRunner.runTests(process, ScenarioTestData(List(recA, recB, recC))) results.invocationResults("proc2").map(_.contextId) should contain only ( s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-1-end1", @@ -580,7 +575,9 @@ class FlinkProcessTestRunnerSpec s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-2-end2" ) - results.externalInvocationResults("proc2").map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List( + results + .externalInvocationResults("proc2") + .map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List( "b", "a", "c", @@ -626,7 +623,7 @@ class FlinkProcessTestRunnerSpec val recordC = recordA.copy(id = "c") val recordD = recordA.copy(id = "d") - val results = runFlinkTest(process, scenarioTestData, useIOMonadInInterpreter) + val results = testRunner.runTests(process, scenarioTestData) val nodeResults = results.nodeResults nodeResults("source1") shouldBe List( @@ -673,7 +670,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "{#componentUseCaseService, #componentUseCaseCustomNode}".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(sourceId = "start"))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(sourceId = "start")))) results.invocationResults("out").map(_.value) shouldBe List( variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime)) @@ -694,7 +691,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) val run = Future { - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) } val dictEditorException = intercept[IllegalStateException](Await.result(run, 10 seconds)) dictEditorException.getMessage shouldBe "DictKeyWithLabel expression can only be used with DictParameterEditor, got Some(DualParameterEditor(StringParameterEditor,RAW))" @@ -715,7 +712,7 @@ class FlinkProcessTestRunnerSpec ) .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) - val results = runFlinkTest( + val results = runTestsWithCustomModel( process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter, @@ -745,10 +742,9 @@ class FlinkProcessTestRunnerSpec val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario) - val results = runFlinkTest( + val results = testRunner.runTests( resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") }, ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))), - useIOMonadInInterpreter ) results.exceptions.length shouldBe 0 } @@ -761,7 +757,7 @@ class FlinkProcessTestRunnerSpec ): ScenarioTestJsonRecord = ScenarioTestJsonRecord(sourceId, Json.fromString(s"$id|$value1|2|3|4|5|6")) - private def runFlinkTest( + private def runTestsWithCustomModel( process: CanonicalProcess, scenarioTestData: ScenarioTestData, useIOMonadInInterpreter: Boolean, @@ -786,6 +782,21 @@ class FlinkProcessTestRunnerSpec .runTests(process, scenarioTestData) } + private def prepareTestRunner(useIOMonadInInterpreter: Boolean) = { + val config = ConfigFactory + .load("application.conf") + .withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter)) + + // We need to set context loader to avoid forking in sbt + val modelData = ModelData.duringExecution( + ModelConfigs(config, AdditionalModelConfigs(Map.empty)), + ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround), + resolveConfigs = false + ) + + new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration)) + } + private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] = nodeResult(count, sourceNodeId, vars: _*)