From a2c8db714a5da24862882b5494fbe5675bab7763 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Fri, 31 Jan 2025 18:07:19 +0100 Subject: [PATCH] [NU-1979] MiniCluster refactor: MiniClusterJobStatusCheckingOps + using flinkMiniCluster module in scenario unit tests --- build.sbt | 28 +-- .../OpenApiScenarioIntegrationTest.scala | 13 +- docs/Changelog.md | 1 + docs/MigrationGuide.md | 14 +- .../common/components/DecisionTableSpec.scala | 18 +- .../flink/SpelTemplateLazyParameterTest.scala | 16 +- .../aggregate/TableAggregationTest.scala | 20 +- .../flink/table/join/TableJoinTest.scala | 17 +- .../flink/table/sink/TableFileSinkTest.scala | 13 +- .../table/sink/TableSinkParametersTest.scala | 11 +- .../flink/table/source/TableSourceTest.scala | 19 +- .../EventGeneratorSourceFactorySpec.scala | 39 ++-- .../transformer/ForEachTransformerSpec.scala | 74 +++---- .../transformer/UnionTransformerSpec.scala | 25 ++- ...ionTransformersExceptionHandlingSpec.scala | 7 +- .../UnionTransformersTestModeSpec.scala | 9 +- .../UnionWithMemoTransformerSpec.scala | 9 +- .../FullOuterJoinTransformerSpec.scala | 53 ++--- .../JavaCollectionsSerializationTest.scala | 9 +- .../ModelUtilExceptionHandlingSpec.scala | 38 ++-- .../SingleSideJoinTransformerSpec.scala | 58 +++--- .../aggregate/TransformersTest.scala | 9 +- ...r.scala => FlinkScenarioUnitTestJob.scala} | 16 +- ...xception.FlinkEspExceptionConsumerProvider | 1 - .../process/api/EvictableStateTest.scala | 21 +- .../process/functional/MetricsSpec.scala | 3 +- .../process/helpers/ProcessTestHelpers.scala | 30 +-- .../registrar/FlinkStreamGraphSpec.scala | 11 +- .../FlinkStreamingProcessMainSpec.scala | 8 +- .../KafkaExceptionConsumerSpec.scala | 31 +-- .../KafkaSourceFactoryProcessMixin.scala | 9 +- .../process/SampleComponentProviderTest.scala | 9 +- .../MiniClusterJobStatusCheckingOps.scala | 144 +++++++++++++ .../ConsumerRecordSerializerSpec.scala | 5 +- .../helpers/KafkaAvroSpecMixin.scala | 9 +- ...kaUniversalSinkExceptionHandlingSpec.scala | 7 +- .../test/CorrectExceptionHandlingSpec.scala | 21 +- .../flink/test/FlinkMiniClusterHolder.scala | 123 +++-------- .../engine/flink/test/FlinkSpec.scala | 21 +- .../flink/test/FlinkTestConfiguration.scala | 42 ---- .../MiniClusterExecutionEnvironment.scala | 191 ++++-------------- .../defaultmodel/FlinkWithKafkaSuite.scala | 8 +- .../defaultmodel/SerializationTest.scala | 20 +- .../defaultmodel/StateCompatibilityTest.scala | 49 ++--- .../util/test/FlinkTestScenarioRunner.scala | 55 +++-- .../test/FlinkTestScenarioRunnerSpec.scala | 31 ++- 46 files changed, 708 insertions(+), 657 deletions(-) rename engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/{UnitTestsFlinkRunner.scala => FlinkScenarioUnitTestJob.scala} (71%) delete mode 100644 engine/flink/executor/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionConsumerProvider create mode 100644 engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/MiniClusterJobStatusCheckingOps.scala delete mode 100644 engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala diff --git a/build.sbt b/build.sbt index a1d1eec9a11..e2ff9847e85 100644 --- a/build.sbt +++ b/build.sbt @@ -694,11 +694,9 @@ lazy val flinkTests = (project in flink("tests")) name := "nussknacker-flink-tests", libraryDependencies ++= { Seq( - "org.apache.flink" % "flink-connector-base" % flinkV % Test, - "org.apache.flink" % "flink-streaming-java" % flinkV 
% Test, - "org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % Test, - "org.apache.flink" % "flink-connector-kafka" % flinkConnectorKafkaV % Test, - "org.apache.flink" % "flink-json" % flinkV % Test + "org.apache.flink" % "flink-connector-base" % flinkV % Test, + "org.apache.flink" % "flink-connector-kafka" % flinkConnectorKafkaV % Test, + "org.apache.flink" % "flink-json" % flinkV % Test ) } ) @@ -785,7 +783,7 @@ lazy val flinkExecutor = (project in flink("executor")) // Different versions of netty which is on the bottom of this stack causes NoClassDefFoundError. // To overcome this problem and reduce size of model jar bundle, we add http utils as a compile time dependency. httpUtils, - flinkTestUtils % Test + flinkTestUtils % Test, ) lazy val scenarioCompiler = (project in file("scenario-compiler")) @@ -1033,14 +1031,14 @@ lazy val flinkComponentsTestkit = (project in utils("flink-components-testkit")) name := "nussknacker-flink-components-testkit", libraryDependencies ++= { Seq( - "org.apache.flink" % "flink-streaming-java" % flinkV exclude ("com.esotericsoftware", "kryo-shaded"), + "org.apache.flink" % "flink-metrics-dropwizard" % flinkV ) } ) .dependsOn( componentsTestkit, flinkExecutor, - flinkTestUtils, + flinkMiniCluster, flinkBaseComponents, flinkBaseUnboundedComponents, defaultModel @@ -1208,6 +1206,7 @@ lazy val flinkMiniCluster = (project in flink("minicluster")) "org.apache.flink" % "flink-statebackend-rocksdb" % flinkV, "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV % Provided, "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV % Provided, + "com.softwaremill.retry" %% "retry" % retryV, ) ++ flinkLibScalaDeps(scalaVersion.value) } ) @@ -1222,21 +1221,13 @@ lazy val flinkTestUtils = (project in flink("test-utils")) name := "nussknacker-flink-test-utils", libraryDependencies ++= { Seq( - "org.apache.flink" % "flink-streaming-java" % flinkV % Provided, - // intellij has some problems with provided... 
- "org.apache.flink" % "flink-statebackend-rocksdb" % flinkV, - "org.apache.flink" % "flink-test-utils" % flinkV excludeAll ( - // we use logback in NK - ExclusionRule("org.apache.logging.log4j", "log4j-slf4j-impl") - ), - "org.apache.flink" % "flink-runtime" % flinkV % Compile classifier "tests", "org.apache.flink" % "flink-metrics-dropwizard" % flinkV, "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV, "com.dimafeng" %% "testcontainers-scala-kafka" % testContainersScalaV, - ) ++ flinkLibScalaDeps(scalaVersion.value) + ) } ) - .dependsOn(testUtils, flinkComponentsUtils, flinkExtensionsApi, componentsUtils, scenarioCompiler) + .dependsOn(testUtils, flinkComponentsUtils, flinkExtensionsApi, scenarioCompiler, flinkMiniCluster) lazy val requestResponseComponentsUtils = (project in lite("request-response/components-utils")) .settings(commonSettings) @@ -1825,6 +1816,7 @@ lazy val flinkBaseComponentsTests = (project in flink("components/base-tests")) ) .dependsOn( flinkComponentsTestkit % Test, + flinkTestUtils % Test, flinkTableApiComponents % Test ) diff --git a/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala b/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala index 0e8290c1789..b3ae22022e8 100644 --- a/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala +++ b/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala @@ -10,9 +10,8 @@ import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.typed.TypedMap import pl.touk.nussknacker.engine.build.ScenarioBuilder -import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.graph.expression.Expression -import pl.touk.nussknacker.engine.spel import pl.touk.nussknacker.engine.util.test.{ClassBasedTestScenarioRunner, RunResult, TestScenarioRunner} import pl.touk.nussknacker.openapi.enrichers.SwaggerEnricher import pl.touk.nussknacker.openapi.parser.SwaggerParser @@ -30,7 +29,6 @@ class OpenApiScenarioIntegrationTest extends AnyFlatSpec with BeforeAndAfterAll with Matchers - with FlinkSpec with LazyLogging with VeryPatientScalaFutures with ValidatedValuesDetailedMessage { @@ -59,6 +57,13 @@ class OpenApiScenarioIntegrationTest test(prepareScenarioRunner(port, sttpBackend, _.copy(allowedMethods = List("POST")))) } + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() + } + val stubbedBackend: SttpBackendStub[Future, Any] = SttpBackendStub.asynchronousFuture.whenRequestMatchesPartial { case request => request.headers match { @@ -150,7 +155,7 @@ class OpenApiScenarioIntegrationTest val stubComponent = prepareStubbedComponent(sttpBackend, openAPIsConfig, url) // TODO: switch to liteBased after adding ability to override components there (currently there is only option to append not conflicting once) and rename class to *FunctionalTest TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExtraComponents(List(stubComponent)) .build() } diff --git 
a/docs/Changelog.md b/docs/Changelog.md
index 96a026d3d04..a056bdd54b9 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -76,6 +76,7 @@
 * For `aggregate-session` - default `endSessionCondition` is now false
 * Improved scenario visualization loading time
 * [#7516](https://github.com/TouK/nussknacker/pull/7516) Scenario testing endpoints no longer perform full scenario compilation and validation
+* [#7511](https://github.com/TouK/nussknacker/pull/7511) `flink-components-testkit` rework: easier `ScenarioTestRunner` creation - see [Migration guide](MigrationGuide.md) for details

 ## 1.18

diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md
index ccc23ebeb44..79842ba28f5 100644
--- a/docs/MigrationGuide.md
+++ b/docs/MigrationGuide.md
@@ -4,7 +4,6 @@ To see the biggest differences please consult the [changelog](Changelog.md).

 ## In version 1.19.0 (Not released yet)

-
 ### Configuration changes

 * [#7181](https://github.com/TouK/nussknacker/pull/7181) Added designer configuration: stickyNotesSettings
@@ -12,7 +11,6 @@ To see the biggest differences please consult the [changelog](Changelog.md).
   * maxNotesCount - max count of sticky notes inside one scenario/fragment
   * enabled - if set to false stickyNotes feature is disabled, stickyNotes cant be created, they are also not loaded to graph

-
 ### Other changes

 * [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation
@@ -84,6 +82,18 @@ To see the biggest differences please consult the [changelog](Changelog.md).

 ### Code API changes
 * [#7368](https://github.com/TouK/nussknacker/pull/7368) [#7502](https://github.com/TouK/nussknacker/pull/7502) Renamed `PeriodicSourceFactory` to `EventGeneratorSourceFactory`
 * [#7364](https://github.com/TouK/nussknacker/pull/7364) The DeploymentManager must implement `def schedulingSupport: SchedulingSupport`. If support not added, then `NoSchedulingSupport` should be used.
+* [#7511](https://github.com/TouK/nussknacker/pull/7511) Changes around flink-based scenario testing
+  * `TestScenarioRunner.flinkBased` now takes `FlinkMiniClusterWithServices` instead of `FlinkMiniClusterHolder`.
+    The `flink-tests` module no longer depends on the `flink-test-utils` module. To create a `FlinkMiniClusterWithServices`, follow these steps (see the sketches after this list):
+    * Remove the `FlinkSpec` inheritance from your test class
+    * Mix `BeforeAndAfterAll` into your test class
+    * Create a `FlinkMiniClusterWithServices` using `val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()`
+    * Close the `FlinkMiniClusterWithServices` in the `afterAll` block
+  * Changes in the `flink-test-utils` module
+    * `FlinkMiniClusterHolder` no longer exposes `MiniCluster` methods. If you need them, use `FlinkMiniClusterHolder.miniCluster`
+    * The `FlinkMiniClusterHolder.createExecutionEnvironment` method is no longer available. Use `FlinkMiniClusterHolder.withExecutionEnvironment`,
+      which properly closes the created environment
+    * The `MiniClusterExecutionEnvironment` created by `FlinkMiniClusterHolder` is no longer a `StreamExecutionEnvironment`; to access the underlying one, use the nested `.env` method
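+
+    A minimal sketch of a test migrated to the new API, following the patterns used by the test suites changed in this PR (the class name `MyScenarioSpec` and the `runner` member are illustrative):
+
+    ```scala
+    import com.typesafe.config.ConfigFactory
+    import org.scalatest.BeforeAndAfterAll
+    import org.scalatest.funsuite.AnyFunSuite
+    import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
+    import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
+    import pl.touk.nussknacker.engine.util.test.TestScenarioRunner
+
+    class MyScenarioSpec extends AnyFunSuite with BeforeAndAfterAll {
+
+      // Created once per suite instead of being inherited from FlinkSpec
+      private lazy val flinkMiniClusterWithServices =
+        FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()
+
+      private lazy val runner = TestScenarioRunner
+        .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices)
+        .build()
+
+      override protected def afterAll(): Unit = {
+        super.afterAll()
+        flinkMiniClusterWithServices.close()
+      }
+    }
+    ```
+
+    For `flink-test-utils` users, a sketch of running a job through `withExecutionEnvironment`, mirroring the usage in the tests below (here `model: LocalModelData` and `scenario: CanonicalProcess` are assumed to be defined elsewhere):
+
+    ```scala
+    flinkMiniCluster.withExecutionEnvironment { stoppableEnv =>
+      // stoppableEnv is a MiniClusterExecutionEnvironment; the underlying
+      // StreamExecutionEnvironment is available as stoppableEnv.env
+      val executionResult = new FlinkScenarioUnitTestJob(model).run(scenario, stoppableEnv.env)
+      stoppableEnv.waitForFinished(executionResult.getJobID)
+    }
+    ```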

 ## In version 1.18.0

diff --git a/engine/common/components-tests/src/test/scala/pl/touk/nussknacker/engine/common/components/DecisionTableSpec.scala b/engine/common/components-tests/src/test/scala/pl/touk/nussknacker/engine/common/components/DecisionTableSpec.scala
index e71e4882a18..71c3947f078 100644
--- a/engine/common/components-tests/src/test/scala/pl/touk/nussknacker/engine/common/components/DecisionTableSpec.scala
+++ b/engine/common/components-tests/src/test/scala/pl/touk/nussknacker/engine/common/components/DecisionTableSpec.scala
@@ -1,7 +1,8 @@
 package pl.touk.nussknacker.engine.common.components

 import cats.data.{NonEmptyList, Validated, ValidatedNel}
-import org.scalatest.Inside
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, Inside}
 import org.scalatest.concurrent.Eventually
 import org.scalatest.freespec.AnyFreeSpec
 import org.scalatest.matchers.should.Matchers
@@ -15,7 +16,7 @@ import pl.touk.nussknacker.engine.api.generics.ExpressionParseError.{
 import pl.touk.nussknacker.engine.api.parameter.ParameterName
 import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
-import pl.touk.nussknacker.engine.flink.test.FlinkSpec
+import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
 import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner
 import pl.touk.nussknacker.engine.lite.util.test.LiteTestScenarioRunner._
 import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
@@ -272,11 +273,20 @@ trait DecisionTableSpec

 private final case class TestMessage(id: String, minAge: Int)

-class FlinkEngineRunDecisionTableSpec extends DecisionTableSpec with FlinkSpec {
+class FlinkEngineRunDecisionTableSpec extends DecisionTableSpec with BeforeAndAfterAll {
+
+  private val config = ConfigFactory.empty()
+
+  private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    flinkMiniClusterWithServices.close()
+  }

   override protected lazy val testScenarioRunner: FlinkTestScenarioRunner =
     TestScenarioRunner
-      .flinkBased(config, flinkMiniCluster)
+      .flinkBased(config, flinkMiniClusterWithServices)
       .build()

   override protected def execute[DATA: ClassTag, RESULT](
diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala
index 0ece0cb1173..85d35bd3c7c 100644
--- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala
+++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/SpelTemplateLazyParameterTest.scala
@@ -5,6 +5,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.connector.source.Boundedness
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.util.Collector
+import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 import
pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, RenderedSubExpression} @@ -30,7 +31,7 @@ import pl.touk.nussknacker.engine.flink.api.process.{ FlinkCustomNodeContext, FlinkCustomStreamTransformation } -import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode @@ -40,18 +41,25 @@ import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage class SpelTemplateLazyParameterTest extends AnyFunSuite - with FlinkSpec with Matchers - with ValidatedValuesDetailedMessage { + with ValidatedValuesDetailedMessage + with BeforeAndAfterAll { + + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() private lazy val runner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents( List(ComponentDefinition("spelTemplatePartsCustomTransformer", SpelTemplatePartsCustomTransformer)) ) .build() + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() + } + test("flink custom transformer using spel template rendered parts") { val scenario = ScenarioBuilder .streaming("test") diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregationTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregationTest.scala index 2c93a025552..e2d84eb3530 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregationTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/aggregate/TableAggregationTest.scala @@ -2,16 +2,17 @@ package pl.touk.nussknacker.engine.flink.table.aggregate import com.typesafe.config.ConfigFactory import org.apache.flink.api.connector.source.Boundedness -import org.scalatest.Inside import org.scalatest.LoneElement._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks +import org.scalatest.{BeforeAndAfterAll, Inside} import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider import pl.touk.nussknacker.engine.flink.table.SpelValues._ import pl.touk.nussknacker.engine.flink.table.aggregate.TableAggregationTest.{ @@ -19,7 +20,6 @@ import pl.touk.nussknacker.engine.flink.table.aggregate.TableAggregationTest.{ TestRecord, buildMultipleAggregationsScenario } -import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.graph.expression.Expression import 
pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode @@ -32,17 +32,29 @@ import java.time.{LocalDate, OffsetDateTime, ZonedDateTime} import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag -class TableAggregationTest extends AnyFunSuite with TableDrivenPropertyChecks with FlinkSpec with Matchers with Inside { +class TableAggregationTest + extends AnyFunSuite + with TableDrivenPropertyChecks + with Matchers + with Inside + with BeforeAndAfterAll { private lazy val additionalComponents: List[ComponentDefinition] = FlinkTableComponentProvider.configIndependentComponents + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + private lazy val runner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents(additionalComponents) .build() + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() + } + test("should be able to aggregate by number types, string and boolean declared in spel") { val aggregationParameters = (spelBoolean :: spelStr :: spelBigDecimal :: numberPrimitiveLiteralExpressions).map { expr => diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/join/TableJoinTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/join/TableJoinTest.scala index 2404d01dfe2..7ecd19768c0 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/join/TableJoinTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/join/TableJoinTest.scala @@ -5,16 +5,16 @@ import org.apache.flink.api.connector.source.Boundedness import org.apache.flink.types.Row import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import org.scalatest.{Inside, LoneElement} +import org.scalatest.{BeforeAndAfterAll, Inside, LoneElement} import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.table.{FlinkTableComponentProvider, SpelValues} import pl.touk.nussknacker.engine.flink.table.join.TableJoinTest.OrderOrProduct import pl.touk.nussknacker.engine.flink.table.join.TableJoinTest.OrderOrProduct._ -import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.transformer.join.BranchType import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode @@ -23,11 +23,11 @@ import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage class TableJoinTest extends AnyFunSuite - with FlinkSpec with Matchers with Inside with ValidatedValuesDetailedMessage - with LoneElement { + with LoneElement + with BeforeAndAfterAll { import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.SpelExtension._ @@ -37,12 +37,19 @@ class TableJoinTest private lazy val additionalComponents: 
List[ComponentDefinition] = FlinkTableComponentProvider.configIndependentComponents ::: Nil + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + private lazy val runner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents(additionalComponents) .build() + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() + } + private val mainBranchId = "main" private val joinedBranchId = "joined" diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala index 2e001094bca..2f461ae5fce 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala @@ -5,7 +5,7 @@ import io.circe.Json import org.apache.commons.io.FileUtils import org.apache.flink.api.connector.source.Boundedness import org.apache.flink.table.api.DataTypes -import org.scalatest.LoneElement +import org.scalatest.{BeforeAndAfterAll, LoneElement} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.component.ComponentDefinition @@ -13,10 +13,10 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNode import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider import pl.touk.nussknacker.engine.flink.table.SpelValues._ import pl.touk.nussknacker.engine.flink.table.utils.NotConvertibleResultOfAlignmentException -import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode @@ -31,11 +31,11 @@ import scala.jdk.CollectionConverters._ class TableFileSinkTest extends AnyFunSuite - with FlinkSpec with Matchers with PatientScalaFutures with LoneElement - with ValidatedValuesDetailedMessage { + with ValidatedValuesDetailedMessage + with BeforeAndAfterAll { import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.SpelExtension._ @@ -238,8 +238,10 @@ class TableFileSinkTest ProcessObjectDependencies.withConfig(tableComponentsConfig) ) + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + private lazy val runner: FlinkTestScenarioRunner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents(tableComponents) .build() @@ -256,6 +258,7 @@ class TableFileSinkTest FileUtils.deleteQuietly(datetimeExpressionOutputDirectory.toFile) FileUtils.deleteQuietly(oneColumnOutputDirectory.toFile) 
FileUtils.deleteQuietly(virtualColumnOutputDirectory.toFile) + flinkMiniClusterWithServices.close() super.afterAll() } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala index d562d63e193..d2212519b1f 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala @@ -4,6 +4,7 @@ import cats.data.NonEmptyList import cats.data.Validated.Invalid import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfterAll import org.scalatest.Inside.inside import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -14,20 +15,19 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{ } import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider -import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode import pl.touk.nussknacker.engine.util.test.TestScenarioRunner import pl.touk.nussknacker.test.PatientScalaFutures -import scala.jdk.CollectionConverters._ import java.io.File import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path} import scala.jdk.CollectionConverters._ -class TableSinkParametersTest extends AnyFunSuite with FlinkSpec with Matchers with PatientScalaFutures { +class TableSinkParametersTest extends AnyFunSuite with Matchers with PatientScalaFutures with BeforeAndAfterAll { import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.SpelExtension._ @@ -99,8 +99,10 @@ class TableSinkParametersTest extends AnyFunSuite with FlinkSpec with Matchers w ProcessObjectDependencies.withConfig(tableComponentsConfig) ) + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + private lazy val runner: FlinkTestScenarioRunner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents(tableComponents) .build() @@ -108,6 +110,7 @@ class TableSinkParametersTest extends AnyFunSuite with FlinkSpec with Matchers w override protected def afterAll(): Unit = { FileUtils.deleteQuietly(outputDirectory.toFile) FileUtils.deleteQuietly(virtualColumnOutputDirectory.toFile) + flinkMiniClusterWithServices.close() super.afterAll() } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala index 36e2b17f08f..1a3f784dca1 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala +++ 
b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala @@ -3,16 +3,16 @@ package pl.touk.nussknacker.engine.flink.table.source import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.io.FileUtils import org.apache.flink.types.Row -import org.scalatest.LoneElement +import org.scalatest.{BeforeAndAfterAll, LoneElement} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.NodeId import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData, SqlFilteringExpression} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider import pl.touk.nussknacker.engine.flink.table.definition.{FlinkDataDefinition, StubbedCatalogFactory} -import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode import pl.touk.nussknacker.engine.util.test.TestScenarioRunner @@ -23,11 +23,11 @@ import java.nio.charset.StandardCharsets class TableSourceTest extends AnyFunSuite - with FlinkSpec with Matchers with PatientScalaFutures with LoneElement - with ValidatedValuesDetailedMessage { + with ValidatedValuesDetailedMessage + with BeforeAndAfterAll { import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.SpelExtension._ @@ -61,12 +61,19 @@ class TableSourceTest ProcessObjectDependencies.withConfig(tableComponentsConfig) ) + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + private lazy val runner: FlinkTestScenarioRunner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents(tableComponents) .build() + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() + } + test("be possible to use table declared inside a database other than the default one") { val scenario = ScenarioBuilder .streaming("test") @@ -108,7 +115,7 @@ class TableSourceTest ) val runnerWithCatalogConfiguration: FlinkTestScenarioRunner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) .withExecutionMode(ExecutionMode.Batch) .withExtraComponents(tableComponentsBasedOnCatalogConfiguration) .build() diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactorySpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactorySpec.scala index 29cdc2e817a..4172a5768c4 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactorySpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/EventGeneratorSourceFactorySpec.scala @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.build.ScenarioBuilder import 
pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner +import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.ResultsCollectingListenerHolder @@ -42,17 +42,14 @@ class EventGeneratorSourceFactorySpec ) .emptySink(sinkId, "dead-end") - val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() - UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(scenario) - - val id = stoppableEnv.executeAndWaitForStart(scenario.name.value) - try { - eventually { - val results = collectingListener.results.nodeResults.get(sinkId) - results.flatMap(_.headOption).flatMap(_.variableTyped("input")) shouldBe Some(input) + flinkMiniCluster.withExecutionEnvironment { stoppableEnv => + val executionResult = new FlinkScenarioUnitTestJob(model).run(scenario, stoppableEnv.env) + stoppableEnv.withJobRunning(executionResult.getJobID) { + eventually { + val results = collectingListener.results.nodeResults.get(sinkId) + results.flatMap(_.headOption).flatMap(_.variableTyped("input")) shouldBe Some(input) + } } - } finally { - stoppableEnv.cancel(id.getJobID) } } @@ -77,19 +74,17 @@ class EventGeneratorSourceFactorySpec ) .emptySink(sinkId, "dead-end") - val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() - UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(scenario) + flinkMiniCluster.withExecutionEnvironment { stoppableEnv => + val executionResult = new FlinkScenarioUnitTestJob(model).run(scenario, stoppableEnv.env) - val id = stoppableEnv.executeAndWaitForStart(scenario.name.value) - try { - eventually { - val results = collectingListener.results.nodeResults.get(sinkId) - val emittedResults = results.toList.flatten.flatMap(_.variableTyped("input")) - emittedResults.size should be > 1 - emittedResults.distinct.size shouldBe emittedResults.size + stoppableEnv.withJobRunning(executionResult.getJobID) { + eventually { + val results = collectingListener.results.nodeResults.get(sinkId) + val emittedResults = results.toList.flatten.flatMap(_.variableTyped("input")) + emittedResults.size should be > 1 + emittedResults.distinct.size shouldBe emittedResults.size + } } - } finally { - stoppableEnv.cancel(id.getJobID) } } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/ForEachTransformerSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/ForEachTransformerSpec.scala index 41219de0f30..672ab169d52 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/ForEachTransformerSpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/ForEachTransformerSpec.scala @@ -16,12 +16,13 @@ import pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass.CaseClassTypeInfo import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.source.EmitWatermarkAfterEachElementCollectionSource import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import 
pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner +import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode._ import java.time.Duration +import scala.util.Using class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers with Inside { @@ -31,54 +32,56 @@ class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers wi private val forEachNodeResultId = "for-each-result" test("should produce results for each element in list") { - val collectingListener = initializeListener - val model = modelData(List(TestRecord()), collectingListener) + Using.resource(ResultsCollectingListenerHolder.registerListener) { collectingListener => + val model = modelData(List(TestRecord()), collectingListener) - val testProcess = - aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'") + val testProcess = + aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'") - val results = collectTestResults(model, testProcess, collectingListener) - extractResultValues(results) shouldBe List("one_1", "other_1") + val results = collectTestResults(model, testProcess, collectingListener) + extractResultValues(results) shouldBe List("one_1", "other_1") + } } test("should produce unique contextId for each element in list") { - val collectingListener = initializeListener - val model = modelData(List(TestRecord()), collectingListener) + Using.resource(ResultsCollectingListenerHolder.registerListener) { collectingListener => + val model = modelData(List(TestRecord()), collectingListener) - val testProcess = - aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'") + val testProcess = + aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'") - val results = collectTestResults(model, testProcess, collectingListener) - extractContextIds(results) shouldBe List("forEachProcess-start-0-0-0", "forEachProcess-start-0-0-1") + val results = collectTestResults(model, testProcess, collectingListener) + extractContextIds(results) shouldBe List("forEachProcess-start-0-0-0", "forEachProcess-start-0-0-1") + } } test("should set return type based on element types") { - val collectingListener = initializeListener - val model = modelData(List(TestRecord()), collectingListener) - - val testProcess = - aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'") - val processValidator = ProcessValidator.default(model) - implicit val jobData: JobData = - JobData(testProcess.metaData, ProcessVersion.empty.copy(processName = testProcess.metaData.name)) - - val forEachResultValidationContext = - processValidator.validate(testProcess, isFragment = false).typing(forEachNodeResultId) - forEachResultValidationContext.inputValidationContext.get(forEachOutputVariableName) shouldBe Some(Typed[String]) + Using.resource(ResultsCollectingListenerHolder.registerListener) { collectingListener => + val model = modelData(List(TestRecord()), collectingListener) + + val testProcess = + aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'") + val processValidator = ProcessValidator.default(model) + implicit val jobData: JobData 
= + JobData(testProcess.metaData, ProcessVersion.empty.copy(processName = testProcess.metaData.name)) + + val forEachResultValidationContext = + processValidator.validate(testProcess, isFragment = false).typing(forEachNodeResultId) + forEachResultValidationContext.inputValidationContext.get(forEachOutputVariableName) shouldBe Some(Typed[String]) + } } test("should not produce any results when elements list is empty") { - val collectingListener = initializeListener - val model = modelData(List(TestRecord()), collectingListener) + Using.resource(ResultsCollectingListenerHolder.registerListener) { collectingListener => + val model = modelData(List(TestRecord()), collectingListener) - val testProcess = aProcessWithForEachNode(elements = "{}") + val testProcess = aProcessWithForEachNode(elements = "{}") - val results = collectTestResults(model, testProcess, collectingListener) - results.nodeResults shouldNot contain key sinkId + val results = collectTestResults(model, testProcess, collectingListener) + results.nodeResults shouldNot contain key sinkId + } } - private def initializeListener = ResultsCollectingListenerHolder.registerListener - private def modelData( list: List[TestRecord] = List(), collectingListener: ResultsCollectingListener[Any] @@ -124,9 +127,10 @@ class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers wi .map(_.id) private def runProcess(model: LocalModelData, testProcess: CanonicalProcess): Unit = { - val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() - UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(testProcess) - stoppableEnv.executeAndWaitForFinished(testProcess.name.value)() + flinkMiniCluster.withExecutionEnvironment { stoppableEnv => + val executionResult = new FlinkScenarioUnitTestJob(model).run(testProcess, stoppableEnv.env) + stoppableEnv.waitForFinished(executionResult.getJobID) + } } } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformerSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformerSpec.scala index ef784063ff1..5dc2dcb9a50 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformerSpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformerSpec.scala @@ -1,21 +1,21 @@ package pl.touk.nussknacker.engine.flink.util.transformer +import com.typesafe.config.ConfigFactory import com.typesafe.scalalogging.LazyLogging -import org.scalatest.BeforeAndAfterEach +import org.scalatest.BeforeAndAfterAll import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CannotCreateObjectError import pl.touk.nussknacker.engine.api.typed.CustomNodeValidationException import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} -import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.util.test.{RunResult, TestScenarioRunner} import pl.touk.nussknacker.test.{ValidatedValuesDetailedMessage, VeryPatientScalaFutures} class UnionTransformerSpec extends AnyFunSuite - with BeforeAndAfterEach + with BeforeAndAfterAll with Matchers - with FlinkSpec with LazyLogging with VeryPatientScalaFutures { @@ -33,8 +33,16 @@ class 
UnionTransformerSpec private val data = List("10", "20", "30", "40") - override def afterEach(): Unit = { - super.afterEach() + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + + private lazy val testScenarioRunner = + TestScenarioRunner + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) + .build() + + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() } test("should unify streams with union-memo") { @@ -186,9 +194,4 @@ class UnionTransformerSpec ) } - private def testScenarioRunner = - TestScenarioRunner - .flinkBased(config, flinkMiniCluster) - .build() - } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersExceptionHandlingSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersExceptionHandlingSpec.scala index 11ee644c2b3..0d926008dfa 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersExceptionHandlingSpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersExceptionHandlingSpec.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer import cats.data.NonEmptyList +import org.apache.flink.api.common.JobExecutionResult import org.scalatest.funsuite.AnyFunSuite import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.build.GraphBuilder @@ -8,16 +9,16 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider import pl.touk.nussknacker.engine.flink.test.{CorrectExceptionHandlingSpec, MiniClusterExecutionEnvironment} import pl.touk.nussknacker.engine.graph.expression.Expression -import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner +import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ class UnionTransformersExceptionHandlingSpec extends AnyFunSuite with CorrectExceptionHandlingSpec { - override protected def registerInEnvironment( + override protected def runScenario( env: MiniClusterExecutionEnvironment, modelData: ModelData, scenario: CanonicalProcess - ): Unit = UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(scenario) + ): JobExecutionResult = new FlinkScenarioUnitTestJob(modelData).run(scenario, env.env) private val durationExpression = "T(java.time.Duration).parse('PT1M')" diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersTestModeSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersTestModeSpec.scala index 270650bb2ea..db22d4373f6 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersTestModeSpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionTransformersTestModeSpec.scala @@ -15,7 +15,7 @@ import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import pl.touk.nussknacker.engine.graph.node import 
pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner +import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode._ import pl.touk.nussknacker.test.VeryPatientScalaFutures @@ -135,9 +135,10 @@ class UnionTransformersTestModeSpec .map(_.id) private def runProcess(modelData: LocalModelData, scenario: CanonicalProcess): Unit = { - val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() - UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, modelData)(scenario) - stoppableEnv.executeAndWaitForFinished(scenario.name.value)() + flinkMiniCluster.withExecutionEnvironment { stoppableEnv => + val executionResult = new FlinkScenarioUnitTestJob(modelData).run(scenario, stoppableEnv.env) + stoppableEnv.waitForFinished(executionResult.getJobID) + } } } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionWithMemoTransformerSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionWithMemoTransformerSpec.scala index 54201768b52..b2f75aeaad6 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionWithMemoTransformerSpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/UnionWithMemoTransformerSpec.scala @@ -16,7 +16,7 @@ import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.source.BlockingQueueSource import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner +import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder} import pl.touk.nussknacker.test.VeryPatientScalaFutures @@ -215,9 +215,10 @@ class UnionWithMemoTransformerSpec extends AnyFunSuite with FlinkSpec with Match prepareComponents(sourceFoo, sourceBar), configCreator = new ConfigCreatorWithCollectingListener(collectingListener), ) - val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() - UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(testProcess) - stoppableEnv.withJobRunning(testProcess.name.value)(action) + flinkMiniCluster.withExecutionEnvironment { stoppableEnv => + val executionResult = new FlinkScenarioUnitTestJob(model).run(testProcess, stoppableEnv.env) + stoppableEnv.withJobRunning(executionResult.getJobID)(action) + } } def prepareComponents( diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/FullOuterJoinTransformerSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/FullOuterJoinTransformerSpec.scala index 80462174f8a..34a49810490 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/FullOuterJoinTransformerSpec.scala +++ 
b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/FullOuterJoinTransformerSpec.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate import com.typesafe.config.ConfigFactory +import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.runtime.execution.ExecutionState @@ -15,14 +16,14 @@ import pl.touk.nussknacker.engine.api.typed.typing.TypingResult import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.compile.ProcessValidator -import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.test.{FlinkSpec, MiniClusterExecutionEnvironment} import pl.touk.nussknacker.engine.flink.util.function.ProcessFunctionInterceptor import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyedValue import pl.touk.nussknacker.engine.flink.util.sink.EmptySink import pl.touk.nussknacker.engine.flink.util.source.BlockingQueueSource import pl.touk.nussknacker.engine.flink.util.transformer.join.FullOuterJoinTransformer import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner +import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder} import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap @@ -104,27 +105,27 @@ class FullOuterJoinTransformerSpec extends AnyFunSuite with FlinkSpec with Match } val collectingListener = ResultsCollectingListenerHolder.registerListener - val (id, stoppableEnv) = runProcess(process, input1, input2, collectingListener) - - input.foreach { - case Left(x) => addTo1(x) - case Right(x) => addTo2(x) - } + withRunningScenario(process, input1, input2, collectingListener) { (id, stoppableEnv) => + input.foreach { + case Left(x) => addTo1(x) + case Right(x) => addTo2(x) + } - input1.finish() - input2.finish() + input1.finish() + input2.finish() - stoppableEnv.waitForJobStateWithNotFailingCheck(id.getJobID, process.name.value, ExecutionState.FINISHED)() + stoppableEnv.waitForFinished(id.getJobID) - val outValues = collectingListener.results - .nodeResults(EndNodeId) - .map(_.variableTyped[java.util.Map[String, AnyRef]](OutVariableName).get.asScala.toMap) - .map(_.mapValuesNow { - case x: java.util.Map[String @unchecked, AnyRef @unchecked] => x.asScala.asInstanceOf[AnyRef] - case x => x - }) + val outValues = collectingListener.results + .nodeResults(EndNodeId) + .map(_.variableTyped[java.util.Map[String, AnyRef]](OutVariableName).get.asScala.toMap) + .map(_.mapValuesNow { + case x: java.util.Map[String @unchecked, AnyRef @unchecked] => x.asScala.asInstanceOf[AnyRef] + case x => x + }) - outValues shouldEqual expected + outValues shouldEqual expected + } } test("simple join") { @@ -508,17 +509,17 @@ class FullOuterJoinTransformerSpec extends AnyFunSuite with FlinkSpec with Match assert(validationResult.isInvalid) } - private def runProcess( + private def withRunningScenario( testProcess: CanonicalProcess, input1: BlockingQueueSource[OneRecord], input2: BlockingQueueSource[OneRecord], collectingListener: ResultsCollectingListener[Any] - ) 
= { - val model = modelData(input1, input2, collectingListener) - val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() - UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(testProcess) - val id = stoppableEnv.executeAndWaitForStart(testProcess.name.value) - (id, stoppableEnv) + )(action: (JobExecutionResult, MiniClusterExecutionEnvironment) => Unit): Unit = { + val model = modelData(input1, input2, collectingListener) + flinkMiniCluster.withExecutionEnvironment { stoppableEnv => + val result = new FlinkScenarioUnitTestJob(model).run(testProcess, stoppableEnv.env) + stoppableEnv.withJobRunning(result.getJobID)(action(result, stoppableEnv)) + } } private def modelData( diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala index a12056153d8..6a98953fdfb 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/JavaCollectionsSerializationTest.scala @@ -14,7 +14,7 @@ import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener -import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner +import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder} @@ -80,9 +80,10 @@ class JavaCollectionsSerializationTest extends AnyFunSuite with FlinkSpec with M model: LocalModelData, testProcess: CanonicalProcess ): Unit = { - val stoppableEnv = flinkMiniCluster.createExecutionEnvironment() - UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(testProcess) - stoppableEnv.executeAndWaitForFinished(testProcess.name.value)() + flinkMiniCluster.withExecutionEnvironment { stoppableEnv => + val executionResult = new FlinkScenarioUnitTestJob(model).run(testProcess, stoppableEnv.env) + stoppableEnv.waitForFinished(executionResult.getJobID) + } } } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala index 6eb9630d79b..0ce4240633b 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate import cats.data.NonEmptyList import com.typesafe.config.ConfigFactory +import org.apache.flink.api.common.JobExecutionResult import 
diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala
index 6eb9630d79b..0ce4240633b 100644
--- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala
+++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/ModelUtilExceptionHandlingSpec.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate
 
 import cats.data.NonEmptyList
 import com.typesafe.config.ConfigFactory
+import org.apache.flink.api.common.JobExecutionResult
 import org.scalatest.funsuite.AnyFunSuite
 import pl.touk.nussknacker.engine.ModelData
 import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodeComponentInfo}
@@ -12,7 +13,7 @@ import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider
 import pl.touk.nussknacker.engine.flink.test._
 import pl.touk.nussknacker.engine.flink.util.transformer.FlinkBaseComponentProvider
 import pl.touk.nussknacker.engine.flink.util.transformer.join.BranchType
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.spel.SpelExpressionEvaluationException
 import pl.touk.nussknacker.engine.testing.LocalModelData
@@ -21,11 +22,11 @@ import java.util.UUID
 
 class ModelUtilExceptionHandlingSpec extends AnyFunSuite with CorrectExceptionHandlingSpec {
 
-  override protected def registerInEnvironment(
+  override protected def runScenario(
       env: MiniClusterExecutionEnvironment,
       modelData: ModelData,
       scenario: CanonicalProcess
-  ): Unit = UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(scenario)
+  ): JobExecutionResult = new FlinkScenarioUnitTestJob(modelData).run(scenario, env.env)
 
   private val durationExpression = "T(java.time.Duration).parse('PT1M')"
 
@@ -136,23 +137,22 @@ class ModelUtilExceptionHandlingSpec extends AnyFunSuite with CorrectExceptionHa
     val sourceComponentDefinition = ComponentDefinition("source", SamplesComponent.create(generator.count))
     val enrichedComponents =
       sourceComponentDefinition :: FlinkBaseComponentProvider.Components ::: FlinkBaseUnboundedComponentProvider.Components
-    val env = flinkMiniCluster.createExecutionEnvironment()
-    registerInEnvironment(env, LocalModelData(config, enrichedComponents), scenario)
-
-    env.executeAndWaitForFinished("test")()
-
-    // A bit more complex check, since there are errors from both join sides...
-    RecordingExceptionConsumer
-      .exceptionsFor(runId)
-      .collect { case NuExceptionInfo(Some(NodeComponentInfo("join", _)), e: SpelExpressionEvaluationException, _) =>
-        e.expression
-      }
-      .toSet shouldBe Set(
-      "'right' + '' + (1 / #input[0])",
-      "'left' + '' + (1 / #input[0])",
-      "'aggregate' + '' + (1 / #input[1])"
-    )
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      val executionResult = runScenario(env, LocalModelData(config, enrichedComponents), scenario)
+      env.waitForFinished(executionResult.getJobID)
+      // A bit more complex check, since there are errors from both join sides...
+      RecordingExceptionConsumer
+        .exceptionsFor(runId)
+        .collect { case NuExceptionInfo(Some(NodeComponentInfo("join", _)), e: SpelExpressionEvaluationException, _) =>
+          e.expression
+        }
+        .toSet shouldBe Set(
+        "'right' + '' + (1 / #input[0])",
+        "'left' + '' + (1 / #input[0])",
+        "'aggregate' + '' + (1 / #input[1])"
+      )
+    }
   }
 
 }
diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/SingleSideJoinTransformerSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/SingleSideJoinTransformerSpec.scala
index 959c528c1eb..b7137f4f48a 100644
--- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/SingleSideJoinTransformerSpec.scala
+++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/SingleSideJoinTransformerSpec.scala
@@ -1,7 +1,7 @@
 package pl.touk.nussknacker.engine.flink.util.transformer.aggregate
 
 import com.typesafe.config.ConfigFactory
-import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.{JobExecutionResult, JobID}
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.runtime.execution.ExecutionState
@@ -22,7 +22,7 @@ import pl.touk.nussknacker.engine.flink.util.sink.EmptySink
 import pl.touk.nussknacker.engine.flink.util.source.{BlockingQueueSource, EmitWatermarkAfterEachElementCollectionSource}
 import pl.touk.nussknacker.engine.flink.util.transformer.join.{BranchType, SingleSideJoinTransformer}
 import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.testing.LocalModelData
 import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder}
 import pl.touk.nussknacker.test.VeryPatientScalaFutures
@@ -32,6 +32,7 @@ import java.util.Collections.{emptyList, singletonList}
 import java.util.concurrent.ConcurrentLinkedQueue
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
+import scala.util.Using
 
 class SingleSideJoinTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers with VeryPatientScalaFutures {
 
@@ -89,41 +90,42 @@ class SingleSideJoinTransformerSpec extends AnyFunSuite with FlinkSpec with Matc
       OneRecord(key, 1, 123)
     )
 
-    val collectingListener = ResultsCollectingListenerHolder.registerListener
-    val (id, stoppableEnv) = runProcess(process, input1, input2, collectingListener)
-
-    input1.add(OneRecord(key, 0, -1))
-    // We can't be sure that main records will be consumed after matching joined records so we need to wait for them.
-    eventually {
-      SingleSideJoinTransformerSpec.elementsAddedToState should have size input2.size
-    }
-    input1.add(OneRecord(key, 2, -1))
-    input1.finish()
+    Using.resource(ResultsCollectingListenerHolder.registerListener) { collectingListener =>
+      withRunningScenario(process, input1, input2, collectingListener) { (jobId, stoppableEnv) =>
+        input1.add(OneRecord(key, 0, -1))
+        // We can't be sure that main records will be consumed after matching joined records so we need to wait for them.
+        eventually {
+          SingleSideJoinTransformerSpec.elementsAddedToState should have size input2.size
+        }
+        input1.add(OneRecord(key, 2, -1))
+        input1.finish()
 
-    stoppableEnv.waitForJobStateWithNotFailingCheck(id.getJobID, process.name.value, ExecutionState.FINISHED)()
+        stoppableEnv.waitForFinished(jobId)
 
-    val outValues = collectingListener.results
-      .nodeResults(EndNodeId)
-      .filter(_.variableTyped(KeyVariableName).contains(key))
-      .map(_.variableTyped[java.util.Map[String, AnyRef]](OutVariableName).get.asScala)
+        val outValues = collectingListener.results
+          .nodeResults(EndNodeId)
+          .filter(_.variableTyped(KeyVariableName).contains(key))
+          .map(_.variableTyped[java.util.Map[String, AnyRef]](OutVariableName).get.asScala)
 
-    outValues shouldEqual List(
-      Map("approxCardinality" -> 0, "last" -> null, "list" -> emptyList(), "sum" -> 0),
-      Map("approxCardinality" -> 1, "last" -> 123, "list" -> singletonList(123), "sum" -> 123)
-    )
+        outValues shouldEqual List(
+          Map("approxCardinality" -> 0, "last" -> null, "list" -> emptyList(), "sum" -> 0),
+          Map("approxCardinality" -> 1, "last" -> 123, "list" -> singletonList(123), "sum" -> 123)
+        )
+      }
+    }
   }
 
-  private def runProcess(
+  private def withRunningScenario(
       testProcess: CanonicalProcess,
       input1: BlockingQueueSource[OneRecord],
       input2: List[OneRecord],
       collectingListener: ResultsCollectingListener[Any]
-  ): (JobExecutionResult, MiniClusterExecutionEnvironment) = {
-    val model = modelData(input1, input2, collectingListener)
-    val stoppableEnv = flinkMiniCluster.createExecutionEnvironment()
-    UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(testProcess)
-    val id = stoppableEnv.executeAndWaitForStart(testProcess.name.value)
-    (id, stoppableEnv)
+  )(action: (JobID, MiniClusterExecutionEnvironment) => Unit): Unit = {
+    val model = modelData(input1, input2, collectingListener)
+    flinkMiniCluster.withExecutionEnvironment { stoppableEnv =>
+      val result = new FlinkScenarioUnitTestJob(model).run(testProcess, stoppableEnv.env)
+      stoppableEnv.withJobRunning(result.getJobID)(action(result.getJobID, stoppableEnv))
+    }
  }
 
   private def modelData(
diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala
index e714c4101f0..b2d6655125a 100644
--- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala
+++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/TransformersTest.scala
@@ -31,7 +31,7 @@ import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentCl
 import pl.touk.nussknacker.engine.graph.node.{CustomNode, FragmentInputDefinition, FragmentOutputDefinition}
 import pl.touk.nussknacker.engine.graph.variable.Field
 import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.testing.LocalModelData
 import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder, TestProcess}
@@ -736,9 +736,10 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
       model: LocalModelData,
       testProcess: CanonicalProcess
   ): Unit = {
-    val stoppableEnv = flinkMiniCluster.createExecutionEnvironment()
-    UnitTestsFlinkRunner.registerInEnvironmentWithModel(stoppableEnv, model)(testProcess)
-    stoppableEnv.executeAndWaitForFinished(testProcess.name.value)()
+    flinkMiniCluster.withExecutionEnvironment { stoppableEnv =>
+      val executionResult = new FlinkScenarioUnitTestJob(model).run(testProcess, stoppableEnv.env)
+      stoppableEnv.waitForFinished(executionResult.getJobID)
+    }
   }
 
   private def variablesForKey(
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/UnitTestsFlinkRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioUnitTestJob.scala
similarity index 71%
rename from engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/UnitTestsFlinkRunner.scala
rename to engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioUnitTestJob.scala
index 716588a4cab..a34788e0b9f 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/UnitTestsFlinkRunner.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkScenarioUnitTestJob.scala
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.runner
 
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import pl.touk.nussknacker.engine.ModelData
 import pl.touk.nussknacker.engine.api.ProcessVersion
@@ -12,13 +13,16 @@ import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
 
 // This is a temporary solution for unit tests purpose. It is in production code, because we don't have flink-executor-test-utils
 // module with dependency to flink-executor. We have flink-test-utils but there is a dependency from flink-executor to flink-test-utils.
 // At the end we should rewrite all tests to TestScenarioRunner.flinkBased
-object UnitTestsFlinkRunner {
+class FlinkScenarioUnitTestJob(modelData: ModelData) {
 
-  def registerInEnvironmentWithModel(env: StreamExecutionEnvironment, modelData: ModelData)(
-      scenario: CanonicalProcess,
-      deploymentData: DeploymentData = DeploymentData.empty,
-      version: ProcessVersion = ProcessVersion.empty
-  ): Unit = {
+  def run(scenario: CanonicalProcess, env: StreamExecutionEnvironment): JobExecutionResult = {
+    registerInEnvironmentWithModel(scenario, env)
+    env.execute(scenario.name.value)
+  }
+
+  def registerInEnvironmentWithModel(scenario: CanonicalProcess, env: StreamExecutionEnvironment): Unit = {
+    val deploymentData = DeploymentData.empty
+    val version = ProcessVersion.empty
     val registrar = FlinkProcessRegistrar(
       new FlinkProcessCompilerDataFactory(modelData),
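Note the split of responsibilities in the renamed class: run submits the job and returns Flink's JobExecutionResult, while registerInEnvironmentWithModel only wires the scenario into the given StreamExecutionEnvironment. Registration without execution is enough when a test only inspects the compiled graph, as FlinkStreamGraphSpec does further down; a sketch with modelData and scenario assumed to be in scope:

    flinkMiniCluster.withExecutionEnvironment { env =>
      new FlinkScenarioUnitTestJob(modelData).registerInEnvironmentWithModel(scenario, env.env)
      env.env.getStreamGraph // nothing is submitted to the cluster here
    }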
diff --git a/engine/flink/executor/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionConsumerProvider b/engine/flink/executor/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionConsumerProvider
deleted file mode 100644
index b6632bec090..00000000000
--- a/engine/flink/executor/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionConsumerProvider
+++ /dev/null
@@ -1 +0,0 @@
-pl.touk.nussknacker.engine.flink.test.RecordingExceptionConsumerProvider
diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/api/EvictableStateTest.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/api/EvictableStateTest.scala
index 0aab1e80fce..de18761de02 100644
--- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/api/EvictableStateTest.scala
+++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/api/EvictableStateTest.scala
@@ -2,15 +2,14 @@ package pl.touk.nussknacker.engine.process.api
 
 import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.util.Collector
-import org.scalatest.BeforeAndAfter
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import pl.touk.nussknacker.engine.flink.api.state.EvictableStateFunction
-import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
+import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
 import pl.touk.nussknacker.engine.flink.util.source.StaticSource
 import pl.touk.nussknacker.engine.flink.util.source.StaticSource.{Data, Watermark}
 import pl.touk.nussknacker.engine.util.ThreadUtils
@@ -21,14 +20,26 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 
 @silent("deprecated")
-class EvictableStateTest extends AnyFlatSpec with Matchers with BeforeAndAfter with VeryPatientScalaFutures {
+class EvictableStateTest
+    extends AnyFlatSpec
+    with Matchers
+    with BeforeAndAfter
+    with BeforeAndAfterAll
+    with VeryPatientScalaFutures {
 
   var futureResult: Future[_] = _
 
+  private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    flinkMiniClusterWithServices.close()
+  }
+
   before {
     StaticSource.running = true
-    val env = StreamExecutionEnvironment.createLocalEnvironment(1, FlinkTestConfiguration.configuration())
+    val env = flinkMiniClusterWithServices.createStreamExecutionEnvironment(attached = true)
     env.enableCheckpointing(500)
     env
diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala
index 03d87974311..018f490f268 100644
--- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala
+++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala
@@ -9,7 +9,6 @@ import org.scalatest.matchers.should.Matchers
 import org.scalatest.{BeforeAndAfterEach, Outcome}
 import pl.touk.nussknacker.engine.api.process.ProcessName
 import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
-import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
 import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName
 import pl.touk.nussknacker.engine.graph.node.{Case, DeadEndingData, EndingNodeData}
 import pl.touk.nussknacker.engine.process.helpers.ProcessTestHelpers
@@ -259,7 +258,7 @@ class MetricsSpec
   }
 
   override protected def prepareFlinkConfiguration(): Configuration = {
-    TestReporterUtil.configWithTestMetrics(reporterName, FlinkTestConfiguration.configuration())
+    TestReporterUtil.configWithTestMetrics(reporterName, new Configuration())
   }
 
 }
diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala
index 5bd2dd1435d..8ddb53c6d01 100644
--- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala
+++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala
@@ -1,7 +1,6 @@
 package pl.touk.nussknacker.engine.process.helpers
 
 import com.typesafe.config.Config
-import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.scalatest.Suite
 import pl.touk.nussknacker.engine.api._
@@ -12,21 +11,14 @@ import pl.touk.nussknacker.engine.api.exception.NonTransientException
 import pl.touk.nussknacker.engine.api.process._
 import pl.touk.nussknacker.engine.api.typed.TypedMap
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
-import pl.touk.nussknacker.engine.flink.api.process.{
-  BasicFlinkSink,
-  FlinkCustomNodeContext,
-  FlinkLazyParameterFunctionHelper,
-  FlinkSink
-}
+import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
 import pl.touk.nussknacker.engine.flink.test.FlinkSpec
 import pl.touk.nussknacker.engine.flink.util.sink.EmptySink
 import pl.touk.nussknacker.engine.process.SimpleJavaEnum
 import pl.touk.nussknacker.engine.process.helpers.SampleNodes._
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.testing.LocalModelData
 
-import java.util.concurrent.atomic.AtomicInteger
-
 trait ProcessTestHelpers extends FlinkSpec { self: Suite =>
 
   object processInvoker {
@@ -37,21 +29,21 @@ trait ProcessTestHelpers extends FlinkSpec { self: Suite =>
         config: Config = config
     ): Unit = {
       val defaultComponents = ProcessTestHelpers.prepareComponents(data)
-      val env = flinkMiniCluster.createExecutionEnvironment()
-      val modelData = LocalModelData(
-        config,
-        defaultComponents,
-        configCreator = ProcessTestHelpersConfigCreator
-      )
-      UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
-
       ProcessTestHelpers.logServiceResultsHolder.clear()
       ProcessTestHelpers.sinkForStringsResultsHolder.clear()
       ProcessTestHelpers.sinkForIntsResultsHolder.clear()
       ProcessTestHelpers.eagerOptionalParameterSinkResultsHolder.clear()
       ProcessTestHelpers.genericParameterSinkResultsHolder.clear()
       ProcessTestHelpers.optionalEndingCustomResultsHolder.clear()
-      env.executeAndWaitForFinished(process.name.value)()
+      flinkMiniCluster.withExecutionEnvironment { env =>
+        val modelData = LocalModelData(
+          config,
+          defaultComponents,
+          configCreator = ProcessTestHelpersConfigCreator
+        )
+        val executionResult = new FlinkScenarioUnitTestJob(modelData).run(process, env.env)
+        env.waitForFinished(executionResult.getJobID)
+      }
     }
 
   }
diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/registrar/FlinkStreamGraphSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/registrar/FlinkStreamGraphSpec.scala
index 5bdee09f4d5..6abd31a5b89 100644
--- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/registrar/FlinkStreamGraphSpec.scala
+++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/registrar/FlinkStreamGraphSpec.scala
@@ -7,7 +7,7 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.process.helpers.{ProcessTestHelpers, ProcessTestHelpersConfigCreator}
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.testing.LocalModelData
 import pl.touk.nussknacker.test.PatientScalaFutures
@@ -23,10 +23,11 @@ trait FlinkStreamGraphSpec
 
   protected def streamGraph(process: CanonicalProcess, config: Config = ConfigFactory.load()): StreamGraph = {
     val components = ProcessTestHelpers.prepareComponents(List.empty)
-    val env = flinkMiniCluster.createExecutionEnvironment()
-    val modelData = LocalModelData(config, components, configCreator = ProcessTestHelpersConfigCreator)
-    UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
-    env.getStreamGraph
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      val modelData = LocalModelData(config, components, configCreator = ProcessTestHelpersConfigCreator)
+      new FlinkScenarioUnitTestJob(modelData).registerInEnvironmentWithModel(process, env.env)
+      env.env.getStreamGraph
+    }
   }
 
   implicit class EnhancedStreamGraph(graph: StreamGraph) {
diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala
index 8689d06c41e..b2b7be231e9 100644
--- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala
+++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala
@@ -9,16 +9,20 @@ import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.api._
 import pl.touk.nussknacker.engine.build.ScenarioBuilder
 import pl.touk.nussknacker.engine.deployment.DeploymentData
-import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
+import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
 
 class FlinkStreamingProcessMainSpec extends AnyFlatSpec with Matchers with Inside {
 
   import pl.touk.nussknacker.engine.spel.SpelExtension._
 
+  private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()
+
+  private lazy val streamExecutionEnvironment = flinkMiniClusterWithServices.createStreamExecutionEnvironment(true)
+
   object TestFlinkStreamingProcessMain extends BaseFlinkStreamingProcessMain {
 
     override protected def getExecutionEnvironment: StreamExecutionEnvironment = {
-      StreamExecutionEnvironment.getExecutionEnvironment(FlinkTestConfiguration.configuration())
+      streamExecutionEnvironment
     }
 
   }
diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala
index 7ebaa242c62..502a2f05a7c 100644
--- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala
+++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/exception/KafkaExceptionConsumerSpec.scala
@@ -12,7 +12,7 @@ import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.kafka.KafkaSpec
 import pl.touk.nussknacker.engine.process.helpers.SampleNodes
 import pl.touk.nussknacker.engine.process.helpers.SampleNodes.SimpleRecord
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.testing.LocalModelData
@@ -85,24 +85,25 @@ class KafkaExceptionConsumerSpec
       .filter("shouldFail", "1/{0, 1}[0] != 10".spel)
       .emptySink("end", "sink")
 
-    val env = flinkMiniCluster.createExecutionEnvironment()
-    UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
-    val message = env.withJobRunning(process.name.value) {
-      val consumed = kafkaClient.createConsumer().consumeWithJson[KafkaExceptionInfo](topicName).take(1).head
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      val executionResult = new FlinkScenarioUnitTestJob(modelData).run(process, env.env)
+      val message = env.withJobRunning(executionResult.getJobID) {
+        val consumed = kafkaClient.createConsumer().consumeWithJson[KafkaExceptionInfo](topicName).take(1).head
 
-      consumed.key() shouldBe s"$scenarioName-shouldFail"
+        consumed.key() shouldBe s"$scenarioName-shouldFail"
 
-      consumed.message()
-    }
+        consumed.message()
+      }
 
-    message.processName.value shouldBe scenarioName
-    message.nodeId shouldBe Some("shouldFail")
-    message.message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero")
-    message.exceptionInput shouldBe Some("1/{0, 1}[0] != 10")
-    message.stackTrace.value should include("evaluation failed, message:")
-    message.additionalData shouldBe Map("configurableKey" -> "sampleValue")
+      message.processName.value shouldBe scenarioName
+      message.nodeId shouldBe Some("shouldFail")
+      message.message shouldBe Some("Expression [1/{0, 1}[0] != 10] evaluation failed, message: / by zero")
+      message.exceptionInput shouldBe Some("1/{0, 1}[0] != 10")
+      message.stackTrace.value should include("evaluation failed, message:")
+      message.additionalData shouldBe Map("configurableKey" -> "sampleValue")
 
-    message
+      message
+    }
   }
 
   def extractInputEventMap(message: KafkaExceptionInfo): Map[String, String] =
diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/KafkaSourceFactoryProcessMixin.scala b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/KafkaSourceFactoryProcessMixin.scala
index 0e07038b721..fd9631bfd4d 100644
--- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/KafkaSourceFactoryProcessMixin.scala
+++ b/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/KafkaSourceFactoryProcessMixin.scala
@@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.kafka.KafkaFactory.{SinkValueParamName, TopicP
 import pl.touk.nussknacker.engine.kafka.source.InputMeta
 import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryMixin.ObjToSerialize
 import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.testing.LocalModelData
 import pl.touk.nussknacker.test.NuScalaTestAssertions
@@ -43,9 +43,10 @@ trait KafkaSourceFactoryProcessMixin
   }
 
   protected def run(process: CanonicalProcess)(action: => Unit): Unit = {
-    val env = flinkMiniCluster.createExecutionEnvironment()
-    UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
-    env.withJobRunning(process.name.value)(action)
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      val executionResult = new FlinkScenarioUnitTestJob(modelData).run(process, env.env)
+      env.withJobRunning(executionResult.getJobID)(action)
+    }
   }
 
   protected def runAndVerifyResult(
diff --git a/engine/flink/management/dev-model/src/test/scala/pl/touk/nussknacker/engine/process/SampleComponentProviderTest.scala b/engine/flink/management/dev-model/src/test/scala/pl/touk/nussknacker/engine/process/SampleComponentProviderTest.scala
index 4ffc6862c7c..b2c2a16243c 100644
--- a/engine/flink/management/dev-model/src/test/scala/pl/touk/nussknacker/engine/process/SampleComponentProviderTest.scala
+++ b/engine/flink/management/dev-model/src/test/scala/pl/touk/nussknacker/engine/process/SampleComponentProviderTest.scala
@@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
 import pl.touk.nussknacker.engine.flink.test.FlinkSpec
 import pl.touk.nussknacker.engine.management.sample.DevProcessConfigCreator
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 import pl.touk.nussknacker.engine.{ClassLoaderModelData, ConfigWithUnresolvedVersion}
@@ -50,9 +50,10 @@ class SampleComponentProviderTest extends AnyFunSuite with FlinkSpec with Matche
   )
 
   private def run(process: CanonicalProcess)(action: => Unit): Unit = {
-    val env = flinkMiniCluster.createExecutionEnvironment()
-    UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
-    env.withJobRunning(process.name.value)(action)
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      val executionResult = new FlinkScenarioUnitTestJob(modelData).run(process, env.env)
+      env.withJobRunning(executionResult.getJobID)(action)
+    }
   }
 
 }
diff --git a/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/MiniClusterJobStatusCheckingOps.scala b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/MiniClusterJobStatusCheckingOps.scala
new file mode 100644
index 00000000000..7353ee6ed27
--- /dev/null
+++ b/engine/flink/minicluster/src/main/scala/pl/touk/nussknacker/engine/flink/minicluster/MiniClusterJobStatusCheckingOps.scala
@@ -0,0 +1,144 @@
+package pl.touk.nussknacker.engine.flink.minicluster
+
+import cats.data.EitherT
+import org.apache.flink.api.common.{JobID, JobStatus}
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, AccessExecutionVertex}
+import org.apache.flink.runtime.minicluster.MiniCluster
+
+import scala.compat.java8.FutureConverters.CompletionStageOps
+import scala.concurrent.{ExecutionContext, Future}
+import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter
+
+object MiniClusterJobStatusCheckingOps {
+
+  implicit class Ops(miniCluster: MiniCluster)(implicit ec: ExecutionContext) {
+
+    def waitForFinished(
+        jobID: JobID
+    )(retryPolicy: retry.Policy): Future[Either[JobVerticesStatesCheckError, Unit]] = {
+      waitForJobVerticesStates(jobID, checkIfNotFailing = true, Set(ExecutionState.FINISHED))(retryPolicy)
+    }
+
+    def withJobRunning[T](jobID: JobID, retryPolicy: retry.Policy)(
+        actionToInvokeWithJobRunning: => Future[T]
+    ): Future[Either[JobVerticesStatesCheckError, T]] = {
+      val resultFuture = (for {
+        _      <- EitherT(waitForRunningOrFinished(jobID)(retryPolicy))
+        result <- EitherT.right(actionToInvokeWithJobRunning)
+        _      <- EitherT(assertJobNotFailing[JobVerticesStatesCheckError](jobID))
+      } yield result).value
+      // We transform it to ensure that the job will be cancelled
+      resultFuture.transformWith { resultTry =>
+        for {
+          _      <- miniCluster.cancelJob(jobID).toScala
+          _      <- waitForAnyTerminalState(jobID)(retryPolicy)
+          result <- Future.fromTry(resultTry)
+        } yield result
+      }
+    }
+
+    private def waitForRunningOrFinished(
+        jobID: JobID
+    )(retryPolicy: retry.Policy): Future[Either[JobVerticesStatesCheckError, Unit]] = {
+      waitForJobVerticesStates(
+        jobID,
+        checkIfNotFailing = true,
+        Set(ExecutionState.RUNNING, ExecutionState.FINISHED)
+      )(retryPolicy)
+    }
+
+    private def waitForAnyTerminalState(
+        jobID: JobID
+    )(retryPolicy: retry.Policy): Future[Either[JobVerticesStatesCheckError, Unit]] = {
+      waitForJobVerticesStates(
+        jobID,
+        checkIfNotFailing = false,
+        Set(ExecutionState.CANCELED, ExecutionState.FINISHED, ExecutionState.FAILED)
+      )(retryPolicy)
+    }
+
+    private def waitForJobVerticesStates(
+        jobID: JobID,
+        checkIfNotFailing: Boolean,
+        // We check vertices states instead of job status for two reasons:
+        // 1. Flink reports a job as RUNNING even if some of its vertices are only scheduled (not running yet)
+        // 2. We want to return more precise info about vertices that are not in the expected states
+        expectedVerticesStates: Set[ExecutionState]
+    )(
+        retryPolicy: retry.Policy
+    ): Future[Either[JobVerticesStatesCheckError, Unit]] =
+      retryPolicy {
+        (for {
+          // we have to verify if the job is initialized, because otherwise not all vertices are available yet, so the vertices states check would be misleading
+          executionGraph <- EitherT.right(miniCluster.getExecutionGraph(jobID).toScala): EitherT[
+            Future,
+            JobVerticesStatesCheckError,
+            AccessExecutionGraph
+          ]
+          _ <- EitherT.cond[Future](
+            executionGraph.getState != JobStatus.INITIALIZING,
+            (),
+            JobIsNotInitializedError(jobID, executionGraph.getJobName)
+          )
+          executionVertices = executionGraph.getAllExecutionVertices.asScala
+          verticesNotInExpectedState = executionVertices.filterNot(v =>
+            expectedVerticesStates.contains(v.getExecutionState)
+          )
+          _ <- EitherT.cond[Future][JobVerticesStatesCheckError, Unit](
+            verticesNotInExpectedState.isEmpty,
+            (),
+            JobVerticesNotInExpectedStateError(
+              executionGraph.getJobID,
+              executionGraph.getJobName,
+              verticesNotInExpectedState,
+              expectedVerticesStates
+            )
+          )
+        } yield ()).value
+      }
+
+    def assertJobNotFailing[E >: JobIsFailingError](jobID: JobID): Future[Either[E, Unit]] =
+      (for {
+        executionGraph <- EitherT.right(miniCluster.getExecutionGraph(jobID).toScala)
+        _              <- assertJobNotFailing[E](executionGraph)
+      } yield ()).value
+
+    private def assertJobNotFailing[E >: JobIsFailingError](
+        executionGraph: AccessExecutionGraph
+    ): EitherT[Future, E, Unit] = {
+      EitherT.cond[Future][E, Unit](
+        !Set(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING).contains(executionGraph.getState),
+        (),
+        JobIsFailingError(executionGraph)
+      )
+    }
+
+  }
+
+  sealed abstract class JobVerticesStatesCheckError(msg: String) extends Exception(msg)
+
+  case class JobIsNotInitializedError(jobID: JobID, jobName: String)
+      extends JobVerticesStatesCheckError(s"Job [id=$jobID, name=$jobName] is not initialized")
+
+  case class JobIsFailingError(executionGraph: AccessExecutionGraph)
+      extends JobVerticesStatesCheckError(
+        s"Job [id=${executionGraph.getJobID}, name=${executionGraph.getJobName}] is in a failing state. Failure info: ${Option(executionGraph.getFailureInfo).map(_.getExceptionAsString).orNull}"
+      )
+
+  case class JobVerticesNotInExpectedStateError(
+      jobID: JobID,
+      jobName: String,
+      verticesNotInExpectedState: Iterable[AccessExecutionVertex],
+      expectedVerticesStates: Set[ExecutionState]
+  ) extends JobVerticesStatesCheckError(
+        verticesNotInExpectedState
+          .map(rs => s"${rs.getTaskNameWithSubtaskIndex} - ${rs.getExecutionState}")
+          .mkString(
+            s"Some vertices of job [id=$jobID, name=$jobName] are not in expected states (${expectedVerticesStates.mkString(", ")}): ",
+            ", ",
+            ""
+          )
+      )
+
+}
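All of the checks above are reachable through the implicit Ops class, so call sites work against a plain MiniCluster and get non-blocking results. A sketch of the intended usage, assuming a miniCluster, a jobID and a retry.Policy named retryPolicy in scope (plus an ExecutionContext):

    import pl.touk.nussknacker.engine.flink.minicluster.MiniClusterJobStatusCheckingOps._

    // resolves to Right(()) once every vertex is FINISHED, or Left(error) describing the mismatch
    val finished: Future[Either[JobVerticesStatesCheckError, Unit]] =
      miniCluster.waitForFinished(jobID)(retryPolicy)

    // runs the action once the job is running or finished; the job is cancelled afterwards either way
    val checked: Future[Either[JobVerticesStatesCheckError, String]] =
      miniCluster.withJobRunning(jobID, retryPolicy)(Future.successful("assertions go here"))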
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala
index bc7335aeb0f..b78342235a2 100644
--- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.serialization.SerializerConfigImpl
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
@@ -12,7 +13,6 @@ import org.scalatest.Assertion
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.prop.TableDrivenPropertyChecks
-import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
 import pl.touk.nussknacker.test.ProcessUtils.convertToAnyShouldWrapper
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
@@ -30,8 +30,7 @@ class ConsumerRecordSerializerSpec extends AnyFunSuite with Matchers with TableD
 
   private val serializerConfig = {
     val executionConfig = new ExecutionConfig()
-    val configuration = FlinkTestConfiguration.configuration()
-    new SerializerConfigImpl(configuration, executionConfig)
+    new SerializerConfigImpl(new Configuration, executionConfig)
   }
 
   test("should serialize and deserialize simple record") {
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala
index 31bf3049c0d..74c94774627 100644
--- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala
@@ -31,7 +31,7 @@ import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer.{
   ProcessSettingsPreparer,
   UnoptimizedSerializationPreparer
 }
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer
 import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer._
 import pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar
@@ -212,9 +212,10 @@ trait KafkaAvroSpecMixin
   }
 
   protected def run(process: CanonicalProcess)(action: => Unit): Unit = {
-    val env = flinkMiniCluster.createExecutionEnvironment()
-    UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(process)
-    env.withJobRunning(process.name.value)(action)
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      val executionResult = new FlinkScenarioUnitTestJob(modelData).run(process, env.env)
+      env.withJobRunning(executionResult.getJobID)(action)
+    }
   }
 
   sealed trait SourceAvroParam {
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/KafkaUniversalSinkExceptionHandlingSpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/KafkaUniversalSinkExceptionHandlingSpec.scala
index 3f35e05ca7c..ab882d78a13 100644
--- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/KafkaUniversalSinkExceptionHandlingSpec.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/KafkaUniversalSinkExceptionHandlingSpec.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.schemedkafka.sink.flink
 
 import cats.data.NonEmptyList
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
+import org.apache.flink.api.common.JobExecutionResult
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.ModelData
@@ -12,7 +13,7 @@ import pl.touk.nussknacker.engine.build.GraphBuilder
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.flink.test.{CorrectExceptionHandlingSpec, FlinkSpec, MiniClusterExecutionEnvironment}
 import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName.ToUnspecializedTopicName
-import pl.touk.nussknacker.engine.process.runner.UnitTestsFlinkRunner
+import pl.touk.nussknacker.engine.process.runner.FlinkScenarioUnitTestJob
 import pl.touk.nussknacker.engine.schemedkafka.KafkaAvroIntegrationMockSchemaRegistry.schemaRegistryMockClient
 import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer._
 import pl.touk.nussknacker.engine.schemedkafka.helpers.SchemaRegistryMixin
@@ -36,11 +37,11 @@ class KafkaUniversalSinkExceptionHandlingSpec
 
   override protected def schemaRegistryClient: SchemaRegistryClient = schemaRegistryMockClient
 
-  override protected def registerInEnvironment(
+  override protected def runScenario(
       env: MiniClusterExecutionEnvironment,
       modelData: ModelData,
       scenario: CanonicalProcess
-  ): Unit = UnitTestsFlinkRunner.registerInEnvironmentWithModel(env, modelData)(scenario)
+  ): JobExecutionResult = new FlinkScenarioUnitTestJob(modelData).run(scenario, env.env)
 
   test("should handle exceptions in kafka sinks") {
     registerSchema(topic.toUnspecialized, FullNameV1.schema, isKey = false)
diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/CorrectExceptionHandlingSpec.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/CorrectExceptionHandlingSpec.scala
index f6f7c6dedce..7523bb3ad18 100644
--- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/CorrectExceptionHandlingSpec.scala
+++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/CorrectExceptionHandlingSpec.scala
@@ -1,6 +1,7 @@
 package pl.touk.nussknacker.engine.flink.test
 
 import cats.data.NonEmptyList
+import org.apache.flink.api.common.JobExecutionResult
 import org.scalatest.Suite
 import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.ModelData
@@ -31,25 +32,25 @@ trait CorrectExceptionHandlingSpec extends FlinkSpec with Matchers {
     val scenario = ScenarioBuilder.streaming("test").sources(start, rest: _*)
 
     val sourceComponentDefinition = ComponentDefinition("source", SamplesComponent.create(generator.count))
-    val env = flinkMiniCluster.createExecutionEnvironment()
-    registerInEnvironment(
-      env,
-      LocalModelData(config, sourceComponentDefinition :: components),
-      scenario
-    )
-
-    env.executeAndWaitForFinished("test")()
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      val executionResult = runScenario(
+        env,
+        LocalModelData(config, sourceComponentDefinition :: components),
+        scenario
+      )
+      env.waitForFinished(executionResult.getJobID)
+    }
 
     RecordingExceptionConsumer.exceptionsFor(runId) should have length generator.count
   }
 
   /**
    * TestFlinkRunner should be invoked, it's not accessible in this module
    */
-  protected def registerInEnvironment(
+  protected def runScenario(
       env: MiniClusterExecutionEnvironment,
       modelData: ModelData,
       scenario: CanonicalProcess
-  ): Unit
+  ): JobExecutionResult
 
   class ExceptionGenerator {
diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala
index d9aa3b7bd0f..e6dd9e8f9da 100644
--- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala
+++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala
@@ -1,120 +1,47 @@
 package pl.touk.nussknacker.engine.flink.test
 
-import com.github.ghik.silencer.silent
-
-import java.util.concurrent.CompletableFuture
-import org.apache.flink.api.common.{JobID, JobStatus}
-import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration._
-import org.apache.flink.runtime.client.JobStatusMessage
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph
-import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
-import org.apache.flink.test.util.MiniClusterWithClientResource
-import org.scalatest.concurrent.Eventually.{scaled, _}
-import org.scalatest.time.{Millis, Seconds, Span}
-import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder._
-
-import scala.jdk.CollectionConverters._
-
-/**
- * This interface provides compatibility for another Flink's version.
- * Instance of mini cluster holder should be created only once for many jobs.
- */
-trait FlinkMiniClusterHolder {
-
-  protected def userFlinkClusterConfig: Configuration
-
-  protected def envConfig: AdditionalEnvironmentConfig
-
-  def start(): Unit
+import org.apache.flink.runtime.minicluster.MiniCluster
+import pl.touk.nussknacker.engine.flink.minicluster.{FlinkMiniClusterFactory, FlinkMiniClusterWithServices}
+import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 
-  def stop(): Unit
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.Using
 
-  def cancelJob(jobID: JobID): Unit
+class FlinkMiniClusterHolder(
+    miniClusterWithServices: FlinkMiniClusterWithServices,
+) extends AutoCloseable {
 
-  def submitJob(jobGraph: JobGraph): JobID
+  def miniCluster: MiniCluster = miniClusterWithServices.miniCluster
 
-  def runningJobs(): Iterable[JobID]
-
-  def listJobs(): Iterable[JobStatusMessage]
-
-  def createExecutionEnvironment(): MiniClusterExecutionEnvironment = {
-    new MiniClusterExecutionEnvironment(this, userFlinkClusterConfig, envConfig)
+  override def close(): Unit = {
+    miniClusterWithServices.close()
   }
 
-  // We access miniCluster because ClusterClient doesn't expose getExecutionGraph and getJobStatus doesn't satisfy us
-  // It returns RUNNING even when some vertices are not started yet
-  def getExecutionGraph(jobId: JobID): CompletableFuture[_ <: AccessExecutionGraph]
-
-}
-
-class FlinkMiniClusterHolderImpl(
-    flinkMiniCluster: MiniClusterWithClientResource,
-    protected val userFlinkClusterConfig: Configuration,
-    protected val envConfig: AdditionalEnvironmentConfig
-) extends FlinkMiniClusterHolder {
-
-  override def start(): Unit = {
-    flinkMiniCluster.before()
+  def withExecutionEnvironment[T](action: MiniClusterExecutionEnvironment => T): T = {
+    Using.resource(createExecutionEnvironment)(action)
   }
 
-  override def stop(): Unit = {
-    flinkMiniCluster.after()
+  private def createExecutionEnvironment: MiniClusterExecutionEnvironment = {
+    new MiniClusterExecutionEnvironment(
+      miniCluster = miniClusterWithServices.miniCluster,
+      env = miniClusterWithServices.createStreamExecutionEnvironment(attached = false)
+    )
   }
 
-  override def cancelJob(jobID: JobID): Unit =
-    flinkMiniCluster.getClusterClient.cancel(jobID)
-
-  override def submitJob(jobGraph: JobGraph): JobID =
-    flinkMiniCluster.getClusterClient.submitJob(jobGraph).get()
-
-  override def listJobs(): List[JobStatusMessage] =
-    flinkMiniCluster.getClusterClient.listJobs().get().asScala.toList
-
-  override def runningJobs(): List[JobID] =
-    listJobs().filter(_.getJobState == JobStatus.RUNNING).map(_.getJobId)
-
-  def getClusterClient: ClusterClient[_] = flinkMiniCluster.getClusterClient
-
-  override def getExecutionGraph(jobId: JobID): CompletableFuture[_ <: AccessExecutionGraph] =
-    flinkMiniCluster.getMiniCluster.getExecutionGraph(jobId)
-
 }
 
 object FlinkMiniClusterHolder {
 
   def apply(
-      userFlinkClusterConfig: Configuration,
-      envConfig: AdditionalEnvironmentConfig = AdditionalEnvironmentConfig()
+      userFlinkClusterConfig: Configuration
   ): FlinkMiniClusterHolder = {
-    userFlinkClusterConfig.set[java.lang.Boolean](CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true)
-    val resource = prepareMiniClusterResource(userFlinkClusterConfig)
-    new FlinkMiniClusterHolderImpl(resource, userFlinkClusterConfig, envConfig)
+    val miniClusterWithServices = FlinkMiniClusterFactory.createMiniClusterWithServices(
+      ModelClassLoader.flinkWorkAroundEmptyClassloader,
+      userFlinkClusterConfig,
+      new Configuration()
+    )
+    new FlinkMiniClusterHolder(miniClusterWithServices)
   }
 
-  @silent("deprecated")
-  def prepareMiniClusterResource(userFlinkClusterConfig: Configuration): MiniClusterWithClientResource = {
-    val taskManagerNumber = ConfigOptions
-      .key(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER)
-      .intType()
-      .defaultValue(ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER)
-    val clusterConfig: MiniClusterResourceConfiguration = new MiniClusterResourceConfiguration.Builder()
-      .setNumberTaskManagers(userFlinkClusterConfig.get(taskManagerNumber))
-      .setNumberSlotsPerTaskManager(
-        userFlinkClusterConfig
-          .getInteger(TaskManagerOptions.NUM_TASK_SLOTS, TaskManagerOptions.NUM_TASK_SLOTS.defaultValue())
-      )
-      .setConfiguration(userFlinkClusterConfig)
-      .build
-    new MiniClusterWithClientResource(clusterConfig)
-  }
-
-  case class AdditionalEnvironmentConfig(
-      detachedClient: Boolean = true,
-      // On the CI, 10 seconds is sometimes too low
-      defaultWaitForStatePatience: PatienceConfig =
-        PatienceConfig(timeout = scaled(Span(20, Seconds)), interval = scaled(Span(10, Millis)))
-  )
-
 }
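FlinkMiniClusterHolder thus shrinks to an AutoCloseable wrapper over FlinkMiniClusterWithServices plus a loan pattern for environments. A sketch of the suite lifecycle it supports (FlinkSpec below follows exactly this shape; the Configuration value is illustrative):

    val holder = FlinkMiniClusterHolder(new Configuration()) // once per suite, in beforeAll
    holder.withExecutionEnvironment { env =>
      // submit jobs via env.env; synchronize via env.waitForFinished or env.withJobRunning
    } // the borrowed environment is closed when the block exits
    holder.close() // once per suite, in afterAll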
Flink 1.9) - */ - protected def createFlinkMiniClusterHolder(): FlinkMiniClusterHolder = { - FlinkMiniClusterHolder(prepareFlinkConfiguration(), prepareEnvConfig()) + new Configuration } override protected def afterAll(): Unit = { try { - flinkMiniCluster.stop() + flinkMiniCluster.close() } finally { super.afterAll() } diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala deleted file mode 100644 index edb7017c770..00000000000 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala +++ /dev/null @@ -1,42 +0,0 @@ -package pl.touk.nussknacker.engine.flink.test - -import com.github.ghik.silencer.silent -import org.apache.flink.configuration._ - -import java.net.URL - -object FlinkTestConfiguration { - - // better to create each time because is mutable - @silent("deprecated") def configuration(taskManagersCount: Int = 2, taskSlotsCount: Int = 8): Configuration = { - import scala.collection.JavaConverters._ - - val config = new Configuration - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagersCount) - config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskSlotsCount) - - config.set(PipelineOptions.CLASSPATHS, classpathWorkaround.asJava) - - setupMemory(config) - } - - // FIXME: better describe which classpath is used in this case - // This is a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 - // Flink overwrite user classloader by the AppClassLoader if classpaths parameter is empty - // (implementation in org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager) - // which holds all needed jars/classes in case of running from Scala plugin in IDE. - // but in case of running from sbt it contains only sbt-launcher.jar - def classpathWorkaround: List[String] = List("http://dummy-classpath.invalid") - - def setupMemory(config: Configuration): Configuration = { - // to prevent OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool on low memory env (like Travis) - config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m")) - config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m")) - // This is to prevent memory problem in tests with mutliple Table API based aggregations. An IllegalArgExceptionon - // is thrown with message "The minBucketMemorySize is not valid!" in - // org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.java:121 where memorySize is set - // inside code-generated operator (like LocalHashAggregateWithKeys). 
- config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("100m")) - } - -} diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala index 4eb932f4426..c9000927ac0 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala @@ -1,172 +1,63 @@ package pl.touk.nussknacker.engine.flink.test import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.api.common.{JobExecutionResult, JobID, JobStatus} -import org.apache.flink.client.deployment.executors.PipelineExecutorUtils -import org.apache.flink.configuration._ -import org.apache.flink.runtime.execution.ExecutionState -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings +import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.minicluster.MiniCluster import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.streaming.api.graph.StreamGraph -import org.apache.flink.util.OptionalFailure -import org.scalactic.source.Position -import org.scalatest.Assertion -import org.scalatest.concurrent.Eventually -import org.scalatest.enablers.Retrying -import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder.AdditionalEnvironmentConfig +import org.scalatest.concurrent.ScalaFutures.{PatienceConfig, convertScalaFuture, scaled} +import org.scalatest.time.{Millis, Seconds, Span} +import pl.touk.nussknacker.engine.flink.minicluster.MiniClusterJobStatusCheckingOps._ +import pl.touk.nussknacker.engine.flink.test.MiniClusterExecutionEnvironment._ -import scala.jdk.CollectionConverters._ +import scala.concurrent.{ExecutionContext, Future, blocking} class MiniClusterExecutionEnvironment( - flinkMiniClusterHolder: FlinkMiniClusterHolder, - userFlinkClusterConfig: Configuration, - envConfig: AdditionalEnvironmentConfig -) extends StreamExecutionEnvironment(userFlinkClusterConfig) - with LazyLogging - with Matchers { - - // Warning: this method assume that will be one job for all checks inside action. We highly recommend to execute - // job once per test class and then do many concurrent scenarios basing on own unique keys in input. - // Running multiple parallel instances of job in one test class can cause stealing of data from sources between those instances. 
- def withJobRunning[T](jobName: String)(actionToInvokeWithJobRunning: => T): T = - withJobRunning(jobName, _ => actionToInvokeWithJobRunning) - - def withJobRunning[T](jobName: String, actionToInvokeWithJobRunning: JobExecutionResult => T): T = { - val executionResult: JobExecutionResult = executeAndWaitForStart(jobName) - try { - val res = actionToInvokeWithJobRunning(executionResult) - val jobID = executionResult.getJobID - assertJobNotFailing(jobID) - res - } finally { - stopJob(jobName, executionResult) - } - } - - def stopJob(jobName: String, executionResult: JobExecutionResult): Unit = { - stopJob(jobName, executionResult.getJobID) - } - - def stopJob(jobName: String, jobID: JobID): Unit = { - flinkMiniClusterHolder.cancelJob(jobID) - waitForJobState(jobID, jobName, ExecutionState.CANCELED, ExecutionState.FINISHED, ExecutionState.FAILED)() - cleanupGraph() - } - - def executeAndWaitForStart(jobName: String): JobExecutionResult = { - val res = execute(jobName) - waitForStart(res.getJobID, jobName)() - res - } - - def executeAndWaitForFinished( - jobName: String - )(patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience): JobExecutionResult = { - val res = execute(jobName) - waitForJobStatusWithAdditionalCheck(res.getJobID, jobName, assertJobNotFailing(res.getJobID), JobStatus.FINISHED)( - patience - ) - res - } - - def waitForStart(jobID: JobID, name: String)( - patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience + miniCluster: MiniCluster, + val env: StreamExecutionEnvironment +)(implicit ec: ExecutionContext) + extends AutoCloseable + with LazyLogging { + + def waitForFinished( + jobID: JobID ): Unit = { - waitForJobStateWithNotFailingCheck(jobID, name, ExecutionState.RUNNING, ExecutionState.FINISHED)(patience) + miniCluster.waitForFinished(jobID)(toRetryPolicy(WaitForJobStatusPatience)).futureValue.toTry.get } - def waitForJobStateWithNotFailingCheck(jobID: JobID, name: String, expectedState: ExecutionState*)( - patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience - ): Unit = { - waitForJobStateWithAdditionalCheck(jobID, name, assertJobNotFailing(jobID), expectedState: _*)(patience) - } - - def waitForJobState(jobID: JobID, name: String, expectedState: ExecutionState*)( - patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience - ): Unit = { - waitForJobStateWithAdditionalCheck(jobID, name, {}, expectedState: _*)(patience) - } - - def waitForJobStatusWithAdditionalCheck( - jobID: JobID, - name: String, - additionalChecks: => Unit, - expectedJobStatus: JobStatus - )( - patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience - ): Unit = { - Eventually.eventually { - val executionGraph = flinkMiniClusterHolder.getExecutionGraph(jobID).get() - additionalChecks - assert( - executionGraph.getState.equals(expectedJobStatus), - s"Job $name does not have expected status: $expectedJobStatus" - ) - }(patience, implicitly[Retrying[Assertion]], implicitly[Position]) - } - - def waitForJobStateWithAdditionalCheck( - jobID: JobID, - name: String, - additionalChecks: => Unit, - expectedState: ExecutionState* - )( - patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience - ): Unit = { - Eventually.eventually { - val executionGraph = flinkMiniClusterHolder.getExecutionGraph(jobID).get() - // we have to verify if job is initialized, because otherwise, not all vertices are available so vertices status check - // would be misleading - assertJobInitialized(executionGraph) - 
additionalChecks
-      val executionVertices = executionGraph.getAllExecutionVertices.asScala
-      val notInExpectedState = executionVertices.filterNot(v => expectedState.contains(v.getExecutionState))
-      assert(
-        notInExpectedState.isEmpty,
-        notInExpectedState
-          .map(rs => s"${rs.getTaskNameWithSubtaskIndex} - ${rs.getExecutionState}")
-          .mkString(s"Some vertices of $name are not in expected (${expectedState.mkString(", ")}) state): ", ", ", "")
-      )
-    }(patience, implicitly[Retrying[Assertion]], implicitly[Position])
-  }
-
-  // Protected, to be overridden in Flink < 1.13 compatibility layer
-  protected def assertJobInitialized(executionGraph: AccessExecutionGraph): Assertion = {
-    assert(executionGraph.getState != JobStatus.INITIALIZING)
+  def withJobRunning[T](jobID: JobID)(actionToInvokeWithJobRunning: => T): T = {
+    // The action is wrapped in Future(blocking(...)) so that a possibly blocking test body does not
+    // starve the implicit ExecutionContext (pools that honour the `blocking` hint can compensate).
+    miniCluster
+      .withJobRunning(jobID, toRetryPolicy(WaitForJobStatusPatience)) {
+        Future {
+          blocking {
+            actionToInvokeWithJobRunning
+          }
+        }
+      }
+      .futureValue
+      .toTry
+      .get
   }
 
   def assertJobNotFailing(jobID: JobID): Unit = {
-    val executionGraph = flinkMiniClusterHolder.getExecutionGraph(jobID).get()
-    assert(
-      !Set(JobStatus.FAILING, JobStatus.FAILED, JobStatus.RESTARTING).contains(executionGraph.getState),
-      s"Job: $jobID has failing state. Failure info: ${Option(executionGraph.getFailureInfo).map(_.getExceptionAsString).orNull}"
-    )
+    miniCluster.assertJobNotFailing(jobID).futureValue.toTry.get
   }
 
-  override def execute(streamGraph: StreamGraph): JobExecutionResult = {
-    val jobGraph = PipelineExecutorUtils.getJobGraph(streamGraph, userFlinkClusterConfig, getUserClassloader)
-    if (jobGraph.getSavepointRestoreSettings == SavepointRestoreSettings.none) {
-      // similar behaviour to MiniClusterExecutor.execute - PipelineExecutorUtils.getJobGraph overrides a few settings done directly on StreamGraph by settings from configuration
-      jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings)
-    }
-
-    logger.debug("Running job on local embedded Flink flinkMiniCluster cluster")
-
-    jobGraph.getJobConfiguration.addAll(userFlinkClusterConfig)
+  override def close(): Unit = {
+    env.close()
+  }
 
-    val jobId = flinkMiniClusterHolder.submitJob(jobGraph)
+}
 
-    new JobExecutionResult(jobId, 0, new java.util.HashMap[String, OptionalFailure[AnyRef]]())
-  }
+object MiniClusterExecutionEnvironment {
 
-  def cancel(jobId: JobID): Unit =
-    flinkMiniClusterHolder.cancelJob(jobId)
+  private implicit val WaitForJobStatusPatience: PatienceConfig =
+    PatienceConfig(timeout = scaled(Span(20, Seconds)), interval = scaled(Span(10, Millis)))
 
-  // this *has* to be done between tests, otherwise next .execute() will execute also current operators
-  def cleanupGraph(): Unit = {
-    transformations.clear()
+  // Derives a bounded retry policy from a ScalaTest PatienceConfig: roughly timeout/interval attempts,
+  // with the pause shortened by a small delta so that the total wait stays within the timeout.
+  private def toRetryPolicy(patience: PatienceConfig) = {
+    val maxAttempts = Math.max(Math.round(patience.timeout / patience.interval).toInt, 1)
+    val delta = Span(50, Millis)
+    val interval = (patience.timeout - delta) / maxAttempts
+    retry.Pause(maxAttempts, interval)
   }
 
 }
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala
index 36f865bb16b..8cdea32f351 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala
@@ -161,9 +161,11 @@ abstract class FlinkWithKafkaSuite
   )
 
   protected def run(process: 
CanonicalProcess)(action: => Unit): Unit = { - val env = flinkMiniCluster.createExecutionEnvironment() - registrar.register(env, process, ProcessVersion.empty, DeploymentData.empty) - env.withJobRunning(process.name.value)(action) + flinkMiniCluster.withExecutionEnvironment { env => + registrar.register(env.env, process, ProcessVersion.empty, DeploymentData.empty) + val executionResult = env.env.execute() + env.withJobRunning(executionResult.getJobID)(action) + } } protected def sendAvro( diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/SerializationTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/SerializationTest.scala index 5f81073328e..f1cdf7d5b76 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/SerializationTest.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/SerializationTest.scala @@ -2,25 +2,37 @@ package pl.touk.nussknacker.defaultmodel import com.typesafe.config.ConfigFactory import com.typesafe.scalalogging.LazyLogging +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.defaultmodel.SerializationTest.DataStructureWithOptionals import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.util.test.{RunResult, TestScenarioRunner} import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage._ -class SerializationTest extends FlinkWithKafkaSuite with LazyLogging { +class SerializationTest extends AnyFunSuite with Matchers with LazyLogging with BeforeAndAfterAll { import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.SpelExtension._ + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + + private lazy val testScenarioRunner = TestScenarioRunner + .flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices) + .build() + + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() + } + test("some serialization test") { val scenario = ScenarioBuilder .streaming("serialization-test") .parallelism(1) .source("start", TestScenarioRunner.testDataSource) .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#input.field2".spel) - val testScenarioRunner = TestScenarioRunner - .flinkBased(ConfigFactory.empty(), flinkMiniCluster) - .build() val result = testScenarioRunner.runWithData(scenario, List(DataStructureWithOptionals("firstField", Option("optionalField")))) result.validValue shouldBe RunResult.success("optionalField") diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/StateCompatibilityTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/StateCompatibilityTest.scala index 956d1175494..cb31940896a 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/StateCompatibilityTest.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/StateCompatibilityTest.scala @@ -14,7 +14,6 @@ import pl.touk.nussknacker.engine.api.validation.ValidationMode import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData -import 
pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolderImpl
 import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ExistingSchemaVersion
 import pl.touk.nussknacker.engine.util.config.ScalaMajorVersionConfig
@@ -132,7 +131,6 @@ class StateCompatibilityTest extends FlinkWithKafkaSuite with PatientScalaFuture
     val inputTopicConfig  = createAndRegisterAvroTopicConfig(inTopic, RecordSchemaV1)
     val outputTopicConfig = createAndRegisterTopicConfig(outTopic, JsonSchemaV1)
 
-    val clusterClient = flinkMiniCluster.asInstanceOf[FlinkMiniClusterHolderImpl].getClusterClient
     sendAvro(givenMatchingAvroObj, inputTopicConfig.input)
 
     run(
@@ -141,8 +139,8 @@ class StateCompatibilityTest extends FlinkWithKafkaSuite with PatientScalaFuture
         verifyOutputEvent(outputTopicConfig.output, input = event1, previousInput = event1)
 
         val savepointLocation = eventually {
-          clusterClient
-            .triggerSavepoint(jobExecutionResult.getJobID, savepointDir.toString, SavepointFormatType.DEFAULT)
+          flinkMiniCluster.miniCluster
+            .triggerSavepoint(jobExecutionResult.getJobID, savepointDir.toString, /* cancelJob = */ false, SavepointFormatType.DEFAULT)
             .get()
         }
 
@@ -156,24 +154,25 @@ class StateCompatibilityTest extends FlinkWithKafkaSuite with PatientScalaFuture
     val outputTopicConfig = createAndRegisterTopicConfig(outTopic, JsonSchemaV1)
 
     val existingSavepointLocation = Files.list(savepointDir).iterator().asScala.toList.head
-    val env                       = flinkMiniCluster.createExecutionEnvironment()
 
     val process1 = stateCompatibilityProcess(inputTopicConfig.input, outputTopicConfig.output)
-    registrar.register(env, process1, ProcessVersion.empty, DeploymentData.empty)
-    val streamGraph           = env.getStreamGraph
-    val allowNonRestoredState = false
-    streamGraph.setSavepointRestoreSettings(
-      SavepointRestoreSettings.forPath(existingSavepointLocation.toString, allowNonRestoredState)
-    )
-
-    // Send one artificial message to mimic offsets saved in savepoint from the above test because kafka commit cannot be performed.
-    sendAvro(givenMatchingAvroObj, inputTopicConfig.input).futureValue
-
-    val jobExecutionResult = env.execute(streamGraph)
-    env.waitForStart(jobExecutionResult.getJobID, process1.name.value)()
-    sendAvro(givenNotMatchingAvroObj, inputTopicConfig.input).futureValue
-
-    env.assertJobNotFailing(jobExecutionResult.getJobID)
-    verifyOutputEvent(outputTopicConfig.output, input = event2, previousInput = event1)
-    env.stopJob(process1.name.value, jobExecutionResult)
+    flinkMiniCluster.withExecutionEnvironment { env =>
+      registrar.register(env.env, process1, ProcessVersion.empty, DeploymentData.empty)
+      val streamGraph           = env.env.getStreamGraph
+      val allowNonRestoredState = false
+      streamGraph.setSavepointRestoreSettings(
+        SavepointRestoreSettings.forPath(existingSavepointLocation.toString, allowNonRestoredState)
+      )
+
+      // Send one artificial message to mimic the offsets saved in the savepoint by the test above, because a Kafka offset commit cannot be performed.
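+      // Inferred from the test flow: the restored job should skip this artificial record, since its
+      // offset is already captured in the savepoint, and process only the events sent after the restore.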
+ sendAvro(givenMatchingAvroObj, inputTopicConfig.input).futureValue + + val jobExecutionResult = env.env.execute(streamGraph) + env.withJobRunning(jobExecutionResult.getJobID) { + sendAvro(givenNotMatchingAvroObj, inputTopicConfig.input).futureValue + + env.assertJobNotFailing(jobExecutionResult.getJobID) + verifyOutputEvent(outputTopicConfig.output, input = event2, previousInput = event1) + } + } } private def verifyOutputEvent(outTopic: TopicName.ForSink, input: InputEvent, previousInput: InputEvent): Unit = { @@ -191,9 +190,11 @@ class StateCompatibilityTest extends FlinkWithKafkaSuite with PatientScalaFuture } private def run(process: CanonicalProcess, action: JobExecutionResult => Unit): Unit = { - val env = flinkMiniCluster.createExecutionEnvironment() - registrar.register(env, process, ProcessVersion.empty, DeploymentData.empty) - env.withJobRunning(process.name.value, action) + flinkMiniCluster.withExecutionEnvironment { env => + registrar.register(env.env, process, ProcessVersion.empty, DeploymentData.empty) + val executionResult = env.env.execute(process.name.value) + env.withJobRunning(executionResult.getJobID)(action(executionResult)) + } } } diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala index 58194bbddf4..cb2cae6b3f6 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala @@ -2,8 +2,11 @@ package pl.touk.nussknacker.engine.flink.util.test import com.typesafe.config.{Config, ConfigValueFactory} import org.apache.flink.api.connector.source.Boundedness +import org.scalatest.concurrent.Futures.PatienceConfig +import org.scalatest.concurrent.ScalaFutures.scaled +import org.scalatest.time.{Millis, Seconds, Span} import pl.touk.nussknacker.defaultmodel.DefaultConfigCreator -import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} +import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData} import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, SourceFactory} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown} @@ -11,7 +14,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.flink.FlinkBaseUnboundedComponentProvider import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler -import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterWithServices import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import pl.touk.nussknacker.engine.flink.util.test.TestResultSinkFactory.Output import pl.touk.nussknacker.engine.flink.util.test.testComponents._ @@ -25,9 +28,13 @@ import pl.touk.nussknacker.engine.testmode.TestRunId import pl.touk.nussknacker.engine.util.test.TestScenarioCollectorHandler.TestScenarioCollectorHandler import pl.touk.nussknacker.engine.util.test.TestScenarioRunner.{RunnerListResult, RunnerResultUnit} import pl.touk.nussknacker.engine.util.test._ +import 
pl.touk.nussknacker.engine.flink.minicluster.MiniClusterJobStatusCheckingOps._
+import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner.{WaitForJobStatusPatience, toRetryPolicy}
 
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 import scala.util.Using
+import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
 
 private object testComponents {
 
@@ -69,7 +76,7 @@ class FlinkTestScenarioRunner(
     val components: List[ComponentDefinition],
     val globalVariables: Map[String, AnyRef],
     val config: Config,
-    flinkMiniCluster: FlinkMiniClusterHolder,
+    flinkMiniClusterWithServices: FlinkMiniClusterWithServices,
     componentUseCase: ComponentUseCase,
 ) extends ClassBasedTestScenarioRunner {
 
@@ -180,10 +187,10 @@ class FlinkTestScenarioRunner(
       configCreator = new DefaultConfigCreator
     )
 
-    // TODO: get flink mini cluster through composition
-    val env = flinkMiniCluster.createExecutionEnvironment()
-
-    Using.resource(TestScenarioCollectorHandler.createHandler(componentUseCase)) { testScenarioCollectorHandler =>
+    Using.resources(
+      flinkMiniClusterWithServices.createStreamExecutionEnvironment(attached = true),
+      TestScenarioCollectorHandler.createHandler(componentUseCase)
+    ) { (env, testScenarioCollectorHandler) =>
       val compilerFactory =
         FlinkProcessCompilerDataFactoryWithTestComponents(
           testExtensionsHolder,
@@ -216,7 +223,12 @@ class FlinkTestScenarioRunner(
         testScenarioCollectorHandler.resultCollector
       )
 
-      env.executeAndWaitForFinished(scenario.name.value)()
+      val jobExecutionResult = env.execute(scenario.name.value)
+      // Wait until the bounded test job reaches FINISHED, propagating any job failure.
+      flinkMiniClusterWithServices.miniCluster
+        .waitForFinished(jobExecutionResult.getJobID)(toRetryPolicy(WaitForJobStatusPatience))
+        .futureValue
+        .toTry
+        .get
 
       val successes = TestResultSinkFactory.extractOutputFor(testExtensionsHolder.runId) match {
         case Output.NotAvailable =>
@@ -253,19 +265,38 @@ object FlinkTestScenarioRunner {
 
   implicit class FlinkTestScenarioRunnerExt(testScenarioRunner: TestScenarioRunner.type) {
 
-    def flinkBased(config: Config, flinkMiniCluster: FlinkMiniClusterHolder): FlinkTestScenarioRunnerBuilder = {
-      FlinkTestScenarioRunnerBuilder(List.empty, Map.empty, config, flinkMiniCluster, testRuntimeMode = false)
+    def flinkBased(
+        config: Config,
+        flinkMiniClusterWithServices: FlinkMiniClusterWithServices
+    ): FlinkTestScenarioRunnerBuilder = {
+      FlinkTestScenarioRunnerBuilder(
+        List.empty,
+        Map.empty,
+        config,
+        flinkMiniClusterWithServices,
+        testRuntimeMode = false
+      )
     }
 
   }
 
+  private implicit val WaitForJobStatusPatience: PatienceConfig =
+    PatienceConfig(timeout = scaled(Span(20, Seconds)), interval = scaled(Span(10, Millis)))
+
+  // Mirrors the retry-policy conversion in MiniClusterExecutionEnvironment; keep the two in sync.
+  private def toRetryPolicy(patience: PatienceConfig) = {
+    val maxAttempts = Math.max(Math.round(patience.timeout / patience.interval).toInt, 1)
+    val delta = Span(50, Millis)
+    val interval = (patience.timeout - delta) / maxAttempts
+    retry.Pause(maxAttempts, interval)
+  }
+
 }
 
 case class FlinkTestScenarioRunnerBuilder(
     components: List[ComponentDefinition],
     globalVariables: Map[String, AnyRef],
     config: Config,
-    flinkMiniCluster: FlinkMiniClusterHolder,
+    flinkMiniClusterWithServices: FlinkMiniClusterWithServices,
     testRuntimeMode: Boolean
 ) extends TestScenarioRunnerBuilder[FlinkTestScenarioRunner, FlinkTestScenarioRunnerBuilder] {
 
@@ -291,7 +322,7 @@ case class FlinkTestScenarioRunnerBuilder(
       components,
       globalVariables,
       config,
-      flinkMiniCluster,
+      flinkMiniClusterWithServices,
       componentUseCase(testRuntimeMode)
     )
 
diff --git 
a/utils/flink-components-testkit/src/test/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunnerSpec.scala b/utils/flink-components-testkit/src/test/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunnerSpec.scala index 1128caeac6f..728a7492ab0 100644 --- a/utils/flink-components-testkit/src/test/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunnerSpec.scala +++ b/utils/flink-components-testkit/src/test/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunnerSpec.scala @@ -1,5 +1,7 @@ package pl.touk.nussknacker.engine.flink.util.test +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api._ @@ -8,7 +10,7 @@ import pl.touk.nussknacker.engine.api.process.ComponentUseCase import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.spel.SpelExpressionEvaluationException import pl.touk.nussknacker.engine.util.test.TestScenarioRunner import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage @@ -16,11 +18,24 @@ import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters._ -class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSpec with ValidatedValuesDetailedMessage { +class FlinkTestScenarioRunnerSpec + extends AnyFunSuite + with Matchers + with ValidatedValuesDetailedMessage + with BeforeAndAfterAll { import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.SpelExtension._ + private val config = ConfigFactory.empty() + + private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices() + + override protected def afterAll(): Unit = { + super.afterAll() + flinkMiniClusterWithServices.close() + } + test("should return service invoke value") { val input = "input" @@ -33,7 +48,7 @@ class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSp val runResults = TestScenarioRunner - .flinkBased(config, flinkMiniCluster) + .flinkBased(config, flinkMiniClusterWithServices) .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) .build() .runWithData[String, String](scenario, List(input)) @@ -53,7 +68,7 @@ class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSp val runResults = TestScenarioRunner - .flinkBased(config, flinkMiniCluster) + .flinkBased(config, flinkMiniClusterWithServices) .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) .inTestRuntimeMode .build() @@ -71,7 +86,7 @@ class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSp val runResults = TestScenarioRunner - .flinkBased(config, flinkMiniCluster) + .flinkBased(config, flinkMiniClusterWithServices) .withExtraGlobalVariables(Map("SAMPLE" -> SampleHelper)) .build() .runWithData[String, String](scenario, List("lcl")) @@ -88,7 +103,7 @@ class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSp val runResults = TestScenarioRunner - 
.flinkBased(config, flinkMiniCluster) + .flinkBased(config, flinkMiniClusterWithServices) .build() .runWithData[String, String]( scenario, @@ -111,7 +126,7 @@ class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSp val runResults = TestScenarioRunner - .flinkBased(config, flinkMiniCluster) + .flinkBased(config, flinkMiniClusterWithServices) .build() .runWithData[Int, Int](scenario, List(123)) @@ -128,7 +143,7 @@ class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSp val runResults = TestScenarioRunner - .flinkBased(config, flinkMiniCluster) + .flinkBased(config, flinkMiniClusterWithServices) .inTestRuntimeMode .build() .runWithData[Int, Int](scenario, List(10))