Skip to content

Commit

Permalink
[NU-1979] MiniCluster refactor: MiniClusterJobStatusCheckingOps + usi…
Browse files Browse the repository at this point in the history
…ng flinkMiniCluster module in scenario unit tests
  • Loading branch information
arkadius committed Jan 31, 2025
1 parent 6e75620 commit a2c8db7
Show file tree
Hide file tree
Showing 46 changed files with 708 additions and 657 deletions.
28 changes: 10 additions & 18 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,9 @@ lazy val flinkTests = (project in flink("tests"))
name := "nussknacker-flink-tests",
libraryDependencies ++= {
Seq(
"org.apache.flink" % "flink-connector-base" % flinkV % Test,
"org.apache.flink" % "flink-streaming-java" % flinkV % Test,
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % Test,
"org.apache.flink" % "flink-connector-kafka" % flinkConnectorKafkaV % Test,
"org.apache.flink" % "flink-json" % flinkV % Test
"org.apache.flink" % "flink-connector-base" % flinkV % Test,
"org.apache.flink" % "flink-connector-kafka" % flinkConnectorKafkaV % Test,
"org.apache.flink" % "flink-json" % flinkV % Test
)
}
)
Expand Down Expand Up @@ -785,7 +783,7 @@ lazy val flinkExecutor = (project in flink("executor"))
// Different versions of netty which is on the bottom of this stack causes NoClassDefFoundError.
// To overcome this problem and reduce size of model jar bundle, we add http utils as a compile time dependency.
httpUtils,
flinkTestUtils % Test
flinkTestUtils % Test,
)

lazy val scenarioCompiler = (project in file("scenario-compiler"))
Expand Down Expand Up @@ -1033,14 +1031,14 @@ lazy val flinkComponentsTestkit = (project in utils("flink-components-testkit"))
name := "nussknacker-flink-components-testkit",
libraryDependencies ++= {
Seq(
"org.apache.flink" % "flink-streaming-java" % flinkV exclude ("com.esotericsoftware", "kryo-shaded"),
"org.apache.flink" % "flink-metrics-dropwizard" % flinkV
)
}
)
.dependsOn(
componentsTestkit,
flinkExecutor,
flinkTestUtils,
flinkMiniCluster,
flinkBaseComponents,
flinkBaseUnboundedComponents,
defaultModel
Expand Down Expand Up @@ -1208,6 +1206,7 @@ lazy val flinkMiniCluster = (project in flink("minicluster"))
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV % Provided,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV % Provided,
"com.softwaremill.retry" %% "retry" % retryV,
) ++ flinkLibScalaDeps(scalaVersion.value)
}
)
Expand All @@ -1222,21 +1221,13 @@ lazy val flinkTestUtils = (project in flink("test-utils"))
name := "nussknacker-flink-test-utils",
libraryDependencies ++= {
Seq(
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
// intellij has some problems with provided...
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV,
"org.apache.flink" % "flink-test-utils" % flinkV excludeAll (
// we use logback in NK
ExclusionRule("org.apache.logging.log4j", "log4j-slf4j-impl")
),
"org.apache.flink" % "flink-runtime" % flinkV % Compile classifier "tests",
"org.apache.flink" % "flink-metrics-dropwizard" % flinkV,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV,
"com.dimafeng" %% "testcontainers-scala-kafka" % testContainersScalaV,
) ++ flinkLibScalaDeps(scalaVersion.value)
)
}
)
.dependsOn(testUtils, flinkComponentsUtils, flinkExtensionsApi, componentsUtils, scenarioCompiler)
.dependsOn(testUtils, flinkComponentsUtils, flinkExtensionsApi, scenarioCompiler, flinkMiniCluster)

lazy val requestResponseComponentsUtils = (project in lite("request-response/components-utils"))
.settings(commonSettings)
Expand Down Expand Up @@ -1825,6 +1816,7 @@ lazy val flinkBaseComponentsTests = (project in flink("components/base-tests"))
)
.dependsOn(
flinkComponentsTestkit % Test,
flinkTestUtils % Test,
flinkTableApiComponents % Test
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.spel
import pl.touk.nussknacker.engine.util.test.{ClassBasedTestScenarioRunner, RunResult, TestScenarioRunner}
import pl.touk.nussknacker.openapi.enrichers.SwaggerEnricher
import pl.touk.nussknacker.openapi.parser.SwaggerParser
Expand All @@ -30,7 +29,6 @@ class OpenApiScenarioIntegrationTest
extends AnyFlatSpec
with BeforeAndAfterAll
with Matchers
with FlinkSpec
with LazyLogging
with VeryPatientScalaFutures
with ValidatedValuesDetailedMessage {
Expand Down Expand Up @@ -59,6 +57,13 @@ class OpenApiScenarioIntegrationTest
test(prepareScenarioRunner(port, sttpBackend, _.copy(allowedMethods = List("POST"))))
}

private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()

override protected def afterAll(): Unit = {
super.afterAll()
flinkMiniClusterWithServices.close()
}

val stubbedBackend: SttpBackendStub[Future, Any] = SttpBackendStub.asynchronousFuture.whenRequestMatchesPartial {
case request =>
request.headers match {
Expand Down Expand Up @@ -150,7 +155,7 @@ class OpenApiScenarioIntegrationTest
val stubComponent = prepareStubbedComponent(sttpBackend, openAPIsConfig, url)
// TODO: switch to liteBased after adding ability to override components there (currently there is only option to append not conflicting once) and rename class to *FunctionalTest
TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices)
.withExtraComponents(List(stubComponent))
.build()
}
Expand Down
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
* For `aggregate-session` - default `endSessionCondition` is now false
* Improved scenario visualization loading time
* [#7516](https://github.com/TouK/nussknacker/pull/7516) Scenario testing endpoints no longer perform full scenario compilation and validation
* [#7511](https://github.com/TouK/nussknacker/pull/7511) `flink-components-testkit` rework: easier ScenarioTestRunner creation - see [Migration guide](MigrationGuide.md) for details

## 1.18

Expand Down
14 changes: 12 additions & 2 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ To see the biggest differences please consult the [changelog](Changelog.md).

## In version 1.19.0 (Not released yet)


### Configuration changes

* [#7181](https://github.com/TouK/nussknacker/pull/7181) Added designer configuration: stickyNotesSettings
* maxContentLength - max length of a sticky notes content (characters)
* maxNotesCount - max count of sticky notes inside one scenario/fragment
* enabled - if set to false stickyNotes feature is disabled, stickyNotes cant be created, they are also not loaded to graph


### Other changes

* [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation
Expand Down Expand Up @@ -84,6 +82,18 @@ To see the biggest differences please consult the [changelog](Changelog.md).
### Code API changes
* [#7368](https://github.com/TouK/nussknacker/pull/7368) [#7502](https://github.com/TouK/nussknacker/pull/7502) Renamed `PeriodicSourceFactory` to `EventGeneratorSourceFactory`
* [#7364](https://github.com/TouK/nussknacker/pull/7364) The DeploymentManager must implement `def schedulingSupport: SchedulingSupport`. If support not added, then `NoSchedulingSupport` should be used.
* [#7511](https://github.com/TouK/nussknacker/pull/7511) Changes around flink-based scenario testing
* `TestScenarioRunner.flinkBased` now takes `FlinkMiniClusterWithServices` instead of `FlinkMiniClusterHolder`.
`flink-tests` module doesn't depend on `flink-test-utils` module. To create `FlinkMiniClusterWithServices` follow steps:
* Remove `FlinkSpec` inheritance from test class
* Extend `BeforeAndAfterAll` by your test class
* Create `FlinkMiniClusterWithServices` using `val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()`
* Close `FlinkMiniClusterWithServices` in `afterAll` block
* Changes in `flink-test-utils` module
* `FlinkMiniClusterHolder` doesn't provide `MiniCluster` methods. If you want to use them, use `FlinkMiniClusterHolder.miniCluster`
* `FlinkMiniClusterHolder.createExecutionEnvironment` method is not available. Use `FlinkMiniClusterHolder.withExecutionEnvironment`
which properly closes created environment
* `MiniClusterExecutionEnvironment` created by `FlinkMiniClusterHolder` is not a `StreamExecutionEnvironment`, to access it, use `.env` nested method
## In version 1.18.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package pl.touk.nussknacker.engine.common.components

import cats.data.{NonEmptyList, Validated, ValidatedNel}
import org.scalatest.Inside
import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfterAll, Inside}
import org.scalatest.concurrent.Eventually
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -15,7 +16,7 @@ import pl.touk.nussknacker.engine.api.generics.ExpressionParseError.{
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner
import pl.touk.nussknacker.engine.lite.util.test.LiteTestScenarioRunner._
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
Expand Down Expand Up @@ -272,11 +273,20 @@ trait DecisionTableSpec

private final case class TestMessage(id: String, minAge: Int)

class FlinkEngineRunDecisionTableSpec extends DecisionTableSpec with FlinkSpec {
class FlinkEngineRunDecisionTableSpec extends DecisionTableSpec with BeforeAndAfterAll {

private val config = ConfigFactory.empty()

private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()

override protected def afterAll(): Unit = {
super.afterAll()
flinkMiniClusterWithServices.close()
}

override protected lazy val testScenarioRunner: FlinkTestScenarioRunner =
TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.flinkBased(config, flinkMiniClusterWithServices)
.build()

override protected def execute[DATA: ClassTag, RESULT](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, RenderedSubExpression}
Expand All @@ -30,7 +31,7 @@ import pl.touk.nussknacker.engine.flink.api.process.{
FlinkCustomNodeContext,
FlinkCustomStreamTransformation
}
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode
Expand All @@ -40,18 +41,25 @@ import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage

class SpelTemplateLazyParameterTest
extends AnyFunSuite
with FlinkSpec
with Matchers
with ValidatedValuesDetailedMessage {
with ValidatedValuesDetailedMessage
with BeforeAndAfterAll {

private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()

private lazy val runner = TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices)
.withExecutionMode(ExecutionMode.Batch)
.withExtraComponents(
List(ComponentDefinition("spelTemplatePartsCustomTransformer", SpelTemplatePartsCustomTransformer))
)
.build()

override protected def afterAll(): Unit = {
super.afterAll()
flinkMiniClusterWithServices.close()
}

test("flink custom transformer using spel template rendered parts") {
val scenario = ScenarioBuilder
.streaming("test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ package pl.touk.nussknacker.engine.flink.table.aggregate

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.connector.source.Boundedness
import org.scalatest.Inside
import org.scalatest.LoneElement._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.{BeforeAndAfterAll, Inside}
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider
import pl.touk.nussknacker.engine.flink.table.SpelValues._
import pl.touk.nussknacker.engine.flink.table.aggregate.TableAggregationTest.{
AggregationParameters,
TestRecord,
buildMultipleAggregationsScenario
}
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode
Expand All @@ -32,17 +32,29 @@ import java.time.{LocalDate, OffsetDateTime, ZonedDateTime}
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

class TableAggregationTest extends AnyFunSuite with TableDrivenPropertyChecks with FlinkSpec with Matchers with Inside {
class TableAggregationTest
extends AnyFunSuite
with TableDrivenPropertyChecks
with Matchers
with Inside
with BeforeAndAfterAll {

private lazy val additionalComponents: List[ComponentDefinition] =
FlinkTableComponentProvider.configIndependentComponents

private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()

private lazy val runner = TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices)
.withExecutionMode(ExecutionMode.Batch)
.withExtraComponents(additionalComponents)
.build()

override protected def afterAll(): Unit = {
super.afterAll()
flinkMiniClusterWithServices.close()
}

test("should be able to aggregate by number types, string and boolean declared in spel") {
val aggregationParameters =
(spelBoolean :: spelStr :: spelBigDecimal :: numberPrimitiveLiteralExpressions).map { expr =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.types.Row
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.{Inside, LoneElement}
import org.scalatest.{BeforeAndAfterAll, Inside, LoneElement}
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.flink.table.{FlinkTableComponentProvider, SpelValues}
import pl.touk.nussknacker.engine.flink.table.join.TableJoinTest.OrderOrProduct
import pl.touk.nussknacker.engine.flink.table.join.TableJoinTest.OrderOrProduct._
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.transformer.join.BranchType
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode
Expand All @@ -23,11 +23,11 @@ import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage

class TableJoinTest
extends AnyFunSuite
with FlinkSpec
with Matchers
with Inside
with ValidatedValuesDetailedMessage
with LoneElement {
with LoneElement
with BeforeAndAfterAll {

import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.spel.SpelExtension._
Expand All @@ -37,12 +37,19 @@ class TableJoinTest
private lazy val additionalComponents: List[ComponentDefinition] =
FlinkTableComponentProvider.configIndependentComponents ::: Nil

private lazy val flinkMiniClusterWithServices = FlinkMiniClusterFactory.createUnitTestsMiniClusterWithServices()

private lazy val runner = TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.flinkBased(ConfigFactory.empty(), flinkMiniClusterWithServices)
.withExecutionMode(ExecutionMode.Batch)
.withExtraComponents(additionalComponents)
.build()

override protected def afterAll(): Unit = {
super.afterAll()
flinkMiniClusterWithServices.close()
}

private val mainBranchId = "main"
private val joinedBranchId = "joined"

Expand Down
Loading

0 comments on commit a2c8db7

Please sign in to comment.