diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala index 7ed17267c4c..253b3de2f29 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/testsmechanism/FlinkProcessTestRunnerSpec.scala @@ -47,6 +47,7 @@ import java.util.{Date, UUID} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} +import scala.util.Using class FlinkProcessTestRunnerSpec extends AnyWordSpec @@ -78,6 +79,7 @@ class FlinkProcessTestRunnerSpec } private def runTests(useIOMonadInInterpreter: Boolean): Unit = { + val testRunner = prepareTestRunner(useIOMonadInInterpreter) "be able to return test results" in { val process = ScenarioBuilder @@ -92,15 +94,14 @@ class FlinkProcessTestRunnerSpec val input = SimpleRecord("0", 1, "2", new Date(3), Some(4), 5, "6") val input2 = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6") - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( List( ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")), ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6")) ) - ), - useIOMonadInInterpreter + ) ) val nodeResults = results.nodeResults @@ -150,10 +151,9 @@ class FlinkProcessTestRunnerSpec .source(sourceNodeId, "input") .split("splitId1", GraphBuilder.emptySink("out1", "monitor"), GraphBuilder.emptySink("out2", "monitor")) - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) results.nodeResults("splitId1") shouldBe List( @@ -184,10 +184,9 @@ class FlinkProcessTestRunnerSpec val aggregate = SimpleRecordWithPreviousValue(input, 0, "s") val aggregate2 = SimpleRecordWithPreviousValue(input2, 1, "s") - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults @@ -238,7 +237,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "monitor") val results = - runFlinkTest( + runTestsWithCustomModel( process, ScenarioTestData(createTestRecord() :: List.fill(4)(createTestRecord(value1 = 11))), useIOMonadInInterpreter @@ -258,7 +257,7 @@ class FlinkProcessTestRunnerSpec .filter("filter", "1 / #input.value1 >= 0".spel) .emptySink("out", "monitor") - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( List( @@ -268,7 +267,6 @@ class FlinkProcessTestRunnerSpec createTestRecord(id = "3", value1 = 4) ) ), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults @@ -315,7 +313,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "monitor") val exceptionConsumerId = UUID.randomUUID().toString - val results = runFlinkTest( + val results = runTestsWithCustomModel( process = process, scenarioTestData = ScenarioTestData( List( @@ -326,7 +324,7 @@ class FlinkProcessTestRunnerSpec ) ), useIOMonadInInterpreter, - enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId) + enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId), ) val nodeResults = results.nodeResults @@ -347,7 +345,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "monitor") val run = Future { - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) } intercept[JobExecutionException](Await.result(run, 10 seconds)) @@ -376,7 +374,7 @@ class FlinkProcessTestRunnerSpec ) ) - val results = runFlinkTest(process, testData, useIOMonadInInterpreter) + val results = testRunner.runTests(process, testData) results.nodeResults(sourceNodeId) should have size 3 results.externalInvocationResults("out") shouldBe @@ -406,7 +404,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "#additionalOne + '|' + #additionalTwo".spel) val testData = ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("abc")))) - val results = runFlinkTest(process, testData, useIOMonadInInterpreter) + val results = testRunner.runTests(process, testData) results.nodeResults(sourceNodeId) should have size 1 results.externalInvocationResults("out") shouldBe @@ -427,7 +425,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "sinkForInts", "Value" -> "15 / {0, 1}[0]".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) results.exceptions should have length 1 results.exceptions.head.nodeId shouldBe Some("out") @@ -447,7 +445,7 @@ class FlinkProcessTestRunnerSpec def recordWithSeconds(duration: FiniteDuration) = ScenarioTestJsonRecord(sourceNodeId, Json.fromString(s"0|0|0|${duration.toMillis}|0|0|0")) - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( List( @@ -457,8 +455,7 @@ class FlinkProcessTestRunnerSpec recordWithSeconds(9 second), recordWithSeconds(20 second) ) - ), - useIOMonadInInterpreter + ) ) val nodeResults = results.nodeResults @@ -478,15 +475,14 @@ class FlinkProcessTestRunnerSpec ) .emptySink("out", "valueMonitor", "Value" -> "#input.field1 + #input.field2".spel) - val results = runFlinkTest( + val results = testRunner.runTests( process, ScenarioTestData( ScenarioTestJsonRecord( sourceNodeId, Json.obj("field1" -> Json.fromString("abc"), "field2" -> Json.fromString("def")) ) :: Nil - ), - useIOMonadInInterpreter + ) ) results.invocationResults("out").map(_.value) shouldBe List(variable("abcdef")) @@ -510,7 +506,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "#parsed.size + ' ' + #parsed[0].field2".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(value1 = valueToReturn))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(value1 = valueToReturn)))) results.invocationResults("out").map(_.value) shouldBe List(variable(s"$countToPass $valueToReturn")) } @@ -533,7 +529,7 @@ class FlinkProcessTestRunnerSpec val recordTrue = createTestRecord(id = "ala") val recordFalse = createTestRecord(id = "bela") - val results = runFlinkTest(process, ScenarioTestData(List(recordTrue, recordFalse)), useIOMonadInInterpreter) + val results = testRunner.runTests(process, ScenarioTestData(List(recordTrue, recordFalse))) val invocationResults = results.invocationResults @@ -571,7 +567,7 @@ class FlinkProcessTestRunnerSpec val recB = createTestRecord(id = "b") val recC = createTestRecord(id = "c") - val results = runFlinkTest(process, ScenarioTestData(List(recA, recB, recC)), useIOMonadInInterpreter) + val results = testRunner.runTests(process, ScenarioTestData(List(recA, recB, recC))) results.invocationResults("proc2").map(_.contextId) should contain only ( s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-1-end1", @@ -580,7 +576,9 @@ class FlinkProcessTestRunnerSpec s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-2-end2" ) - results.externalInvocationResults("proc2").map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List( + results + .externalInvocationResults("proc2") + .map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List( "b", "a", "c", @@ -626,7 +624,7 @@ class FlinkProcessTestRunnerSpec val recordC = recordA.copy(id = "c") val recordD = recordA.copy(id = "d") - val results = runFlinkTest(process, scenarioTestData, useIOMonadInInterpreter) + val results = testRunner.runTests(process, scenarioTestData) val nodeResults = results.nodeResults nodeResults("source1") shouldBe List( @@ -673,7 +671,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "{#componentUseCaseService, #componentUseCaseCustomNode}".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(sourceId = "start"))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(sourceId = "start")))) results.invocationResults("out").map(_.value) shouldBe List( variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime)) @@ -694,7 +692,7 @@ class FlinkProcessTestRunnerSpec .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) val run = Future { - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) } val dictEditorException = intercept[IllegalStateException](Await.result(run, 10 seconds)) dictEditorException.getMessage shouldBe "DictKeyWithLabel expression can only be used with DictParameterEditor, got Some(DualParameterEditor(StringParameterEditor,RAW))" @@ -715,7 +713,7 @@ class FlinkProcessTestRunnerSpec ) .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) - val results = runFlinkTest( + val results = runTestsWithCustomModel( process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter, @@ -745,10 +743,9 @@ class FlinkProcessTestRunnerSpec val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario) - val results = runFlinkTest( + val results = testRunner.runTests( resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") }, ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))), - useIOMonadInInterpreter ) results.exceptions.length shouldBe 0 } @@ -761,7 +758,7 @@ class FlinkProcessTestRunnerSpec ): ScenarioTestJsonRecord = ScenarioTestJsonRecord(sourceId, Json.fromString(s"$id|$value1|2|3|4|5|6")) - private def runFlinkTest( + private def runTestsWithCustomModel( process: CanonicalProcess, scenarioTestData: ScenarioTestData, useIOMonadInInterpreter: Boolean, @@ -786,6 +783,21 @@ class FlinkProcessTestRunnerSpec .runTests(process, scenarioTestData) } + private def prepareTestRunner(useIOMonadInInterpreter: Boolean) = { + val config = ConfigFactory + .load("application.conf") + .withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter)) + + // We need to set context loader to avoid forking in sbt + val modelData = ModelData.duringExecution( + ModelConfigs(config, AdditionalModelConfigs(Map.empty)), + ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround), + resolveConfigs = false + ) + + new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration)) + } + private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] = nodeResult(count, sourceNodeId, vars: _*)