diff --git a/build.sbt b/build.sbt index bcd09bc56a6..b19adb77a37 100644 --- a/build.sbt +++ b/build.sbt @@ -631,7 +631,9 @@ lazy val flinkDeploymentManager = (project in flink("management")) componentsApi % Provided, httpUtils % Provided, flinkScalaUtils % Provided, - flinkTestUtils % IntegrationTest, + // test->test dependency is needed to load SimpleProcessConfigCreator + flinkExecutor % "test,test->test", + flinkTestUtils % "it,test", kafkaTestUtils % "it,test" ) @@ -710,18 +712,22 @@ lazy val flinkTests = (project in flink("tests")) } ) .dependsOn( - defaultModel % Test, - flinkExecutor % Test, - flinkKafkaComponents % Test, - flinkBaseComponents % Test, - flinkBaseUnboundedComponents % Test, - flinkTableApiComponents % Test, - flinkTestUtils % Test, - kafkaTestUtils % Test, - flinkComponentsTestkit % Test, + defaultModel % Test, + flinkExecutor % Test, + flinkKafkaComponents % Test, + flinkBaseComponents % Test, + flinkBaseUnboundedComponents % Test, + flinkTableApiComponents % Test, + flinkTestUtils % Test, + kafkaTestUtils % Test, + flinkComponentsTestkit % Test, + flinkDeploymentManager % Test, + // test->test dependencies are needed to load components from these modules + flinkKafkaComponentsUtils % "test->test", + flinkSchemedKafkaComponentsUtils % "test->test", // for local development - designer % Test, - deploymentManagerApi % Test + designer % Test, + deploymentManagerApi % Test ) lazy val defaultModel = (project in (file("defaultModel"))) @@ -965,7 +971,7 @@ lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-com componentsUtils % Provided, kafkaTestUtils % Test, flinkTestUtils % Test, - flinkExecutor % Test + flinkExecutor % Test, ) lazy val flinkKafkaComponentsUtils = (project in flink("kafka-components-utils")) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala index b04ef1f1712..51536d7edc8 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala @@ -1,34 +1,33 @@ package pl.touk.nussknacker.ui.api.description +import io.circe.Encoder import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.graph.{ProcessProperties, ScenarioGraph} import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.api.typed.typing.Typed +import pl.touk.nussknacker.engine.api.typed.typing._ import pl.touk.nussknacker.engine.definition.test.TestingCapabilities import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions.SecuredEndpoint -import pl.touk.nussknacker.restmodel.definition.{UIParameter, UISourceParameters} +import pl.touk.nussknacker.restmodel.definition.UISourceParameters import pl.touk.nussknacker.restmodel.validation.ValidationResults.{NodeValidationError, NodeValidationErrorType} -import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._ import pl.touk.nussknacker.security.AuthCredentials import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioGraphCodec._ +import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._ import 
pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioTestingCodecs._ -import pl.touk.nussknacker.ui.definition.DefinitionsService -import sttp.model.StatusCode.Ok -import sttp.tapir.EndpointIO.Example -import sttp.tapir._ -import sttp.tapir.json.circe.jsonBody -import io.circe.Encoder -import pl.touk.nussknacker.engine.api.typed.typing._ import pl.touk.nussknacker.ui.api.TestingApiHttpService.Examples.{ malformedTypingResultExample, noScenarioExample, testDataGenerationErrorExample } import pl.touk.nussknacker.ui.api.TestingApiHttpService.TestingError +import pl.touk.nussknacker.ui.definition.DefinitionsService +import sttp.model.StatusCode.Ok +import sttp.tapir.EndpointIO.Example +import sttp.tapir._ +import sttp.tapir.json.circe.jsonBody class TestingApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions { import NodesApiEndpoints.Dtos._ diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessDeploymentsTableFactory.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessDeploymentsTableFactory.scala index 7518b5cda0a..c46626ac88c 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessDeploymentsTableFactory.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/legacy/db/LegacyPeriodicProcessDeploymentsTableFactory.scala @@ -1,7 +1,11 @@ package pl.touk.nussknacker.ui.process.periodic.legacy.db import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus -import pl.touk.nussknacker.ui.process.periodic.model.{PeriodicProcessDeploymentId, PeriodicProcessDeploymentStatus, PeriodicProcessId} +import pl.touk.nussknacker.ui.process.periodic.model.{ + PeriodicProcessDeploymentId, + PeriodicProcessDeploymentStatus, + PeriodicProcessId +} import slick.jdbc.{JdbcProfile, JdbcType} import slick.lifted.ProvenShape import slick.sql.SqlProfile.ColumnOption.NotNull diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala index 7f2a7b58edf..61fe7fa79c9 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala @@ -13,7 +13,11 @@ import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ -import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider} +import pl.touk.nussknacker.engine.management.{ + FlinkDeploymentManager, + FlinkStreamingDeploymentManagerProvider, + ScenarioTestingConfig +} import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import pl.touk.nussknacker.test.config.ConfigWithScalaVersion import pl.touk.nussknacker.test.utils.domain.TestFactory @@ -55,7 +59,8 @@ class MockDeploymentManager( SttpBackendStub.asynchronousFuture ), shouldVerifyBeforeDeploy = false, - mainClassName = "UNUSED" + mainClassName = "UNUSED", + scenarioTestingConfig = ScenarioTestingConfig() ) { import MockDeploymentManager._ diff --git a/docs/Changelog.md b/docs/Changelog.md index c008dee443c..41953483bee 100644 --- a/docs/Changelog.md 
+++ b/docs/Changelog.md
@@ -60,6 +60,7 @@
     * implementation provided for Flink DM
 * [#7443](https://github.com/TouK/nussknacker/pull/7443) Indexing on record is more similar to indexing on map. The change lets us access record values dynamically. For example now spel expression "{a: 5, b: 10}[#input.field]" compiles and has type "Integer" inferred from types of values of the record. This lets us access record value based on user input, for instance if user passes "{"field": "b"}" to scenario we will get value "10", whereas input {"field": "c"} would result in "null". Expression "{a: 5}["b"]" still does not compile because it is known at compile time that record does not have property "b".
 * [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params
+* [#7458](https://github.com/TouK/nussknacker/pull/7458) Flink scenario testing mechanism and scenario state verification mechanism: the mini cluster is created once and reused each time

 ## 1.18

diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md
index 43052d7b8f0..586455cb579 100644
--- a/docs/MigrationGuide.md
+++ b/docs/MigrationGuide.md
@@ -73,6 +73,9 @@ To see the biggest differences please consult the [changelog](Changelog.md).
       shouldVerifyBeforeDeploy: true
     }
     ```
+* [#7458](https://github.com/TouK/nussknacker/pull/7458) Flink scenario testing mechanism and scenario state verification mechanism: by default, the mini cluster is created once and reused each time.
+  To revert to the previous behaviour (creating a mini cluster each time), set `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioTesting` and/or
+  `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioStateVerification` to `false`
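+  For example, to revert both mechanisms (an illustrative snippet; the keys are the ones documented in Scenario Deployment configuration):
+  ```hocon
+  deploymentConfig {
+    scenarioTesting {
+      reuseMiniClusterForScenarioTesting: false
+      reuseMiniClusterForScenarioStateVerification: false
+    }
+  }
+  ```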

 ### Code API changes
 * [#7368](https://github.com/TouK/nussknacker/pull/7368) Renamed `PeriodicSourceFactory` to `SampleGeneratorSourceFactory`

diff --git a/docs/configuration/ScenarioDeploymentConfiguration.md b/docs/configuration/ScenarioDeploymentConfiguration.md
index 9e172b9bf85..3ada853ac4f 100644
--- a/docs/configuration/ScenarioDeploymentConfiguration.md
+++ b/docs/configuration/ScenarioDeploymentConfiguration.md
@@ -5,10 +5,15 @@ sidebar_position: 2

 # Scenario Deployment configuration

-Deployment of a scenario onto the [Engine](../about/engines/Engines.md) is managed by the Designer's extension called [Deployment Manager](../about/GLOSSARY.md#deployment-manager).
-To enable a given [Deployment Manager](../about/GLOSSARY.md#deployment-manager) its jar package has to be placed in the Designer's classpath. Nussknacker is distributed with three default [Deployment Managers](../about/GLOSSARY.md#deployment-manager) (`flinkStreaming`, `lite-k8s`, `lite-embedded`). Their jars are located in the `managers` directory.
+Deployment of a scenario onto the [Engine](../about/engines/Engines.md) is managed by the Designer's extension
+called [Deployment Manager](../about/GLOSSARY.md#deployment-manager).
+To enable a given [Deployment Manager](../about/GLOSSARY.md#deployment-manager), its jar package has to be placed in the
+Designer's classpath. Nussknacker is distributed with three
+default [Deployment Managers](../about/GLOSSARY.md#deployment-manager) (`flinkStreaming`, `lite-k8s`, `lite-embedded`).
+Their jars are located in the `managers` directory.

-Deployment specific configuration is provided in the `deploymentConfig` section of the configuration file - check [configuration areas](./index.mdx#configuration-areas) to understand the structure of the configuration file.
+Deployment specific configuration is provided in the `deploymentConfig` section of the configuration file -
+check [configuration areas](./index.mdx#configuration-areas) to understand the structure of the configuration file.

 Below is a snippet of scenario deployment configuration.

@@ -23,22 +28,32 @@ deploymentConfig {
 ```

 Parameters:
-- `type` parameter determines the type of the [Deployment Manager](../about/GLOSSARY.md#deployment-manager). Possible options are: `flinkStreaming`, `lite-k8s`, `lite-embedded`
-- `engineSetupName` parameter is optional. It specifies how the engine will be displayed in the GUI. If not specified, default name will be used instead (e.g. `Flink` for `flinkStreaming` Deployment Manager).
+
+- `type` parameter determines the type of the [Deployment Manager](../about/GLOSSARY.md#deployment-manager). Possible
+  options are: `flinkStreaming`, `lite-k8s`, `lite-embedded`
+- `engineSetupName` parameter is optional. It specifies how the engine will be displayed in the GUI. If not specified,
+  the default name will be used instead (e.g. `Flink` for the `flinkStreaming` Deployment Manager).

 ## Kubernetes native Lite engine configuration

-Please check high level [Lite engine description](../about/engines/LiteArchitecture.md#scenario-deployment) before proceeding to configuration details.
+Please check the high-level [Lite engine description](../about/engines/LiteArchitecture.md#scenario-deployment) before
+proceeding to configuration details.

-Please note, that K8s Deployment Manager has to be run with properly configured K8s access. If you install the Designer in K8s cluster (e.g. via Helm chart) this comes out of the box. If you want to run the Designer outside the cluster, you have to configure `.kube/config` properly.
+Please note that the K8s Deployment Manager has to be run with properly configured K8s access. If you install the Designer
+in a K8s cluster (e.g. via Helm chart), this comes out of the box. If you want to run the Designer outside the cluster, you
+have to configure `.kube/config` properly.

 Except the `servicePort` configuration option, all remaining configuration options apply to both `streaming` and
 `request-response` processing modes.

-The table below contains configuration options for the Lite engine. If you install Designer with Helm, you can use Helm values override mechanism to supply your own values for these [options](https://artifacthub.io/packages/helm/touk/nussknacker#configuration-in-values-yaml). As the the result of the Helm template rendering "classic" Nussknacker configuration file will be generated.
+The table below contains configuration options for the Lite engine. If you install Designer with Helm, you can use the Helm
+values override mechanism to supply your own values for
+these [options](https://artifacthub.io/packages/helm/touk/nussknacker#configuration-in-values-yaml). As a result
+of the Helm template rendering, a "classic" Nussknacker configuration file will be generated.

 &nbsp;
-If you install Designer outside the K8s cluster then the required changes should be applied under the `deploymentConfig` key as any other Nussknacker non K8s configuration.
+If you install Designer outside the K8s cluster, then the required changes should be applied under the `deploymentConfig`
+key, as with any other non-K8s Nussknacker configuration.
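+
+For example, a minimal sketch of such a configuration (the values are illustrative; engine-specific options from the table below go alongside these common keys):
+
+```hocon
+deploymentConfig {
+  type: "lite-k8s"
+  engineSetupName: "Lite K8s"
+}
+```
+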
| Parameter | Type | Default value | Description | |------------------------------------------------------|---------------------------|-----------------------------------|------------------------------------------------------------------------------------------| @@ -204,7 +219,9 @@ fixedReplicasCount configuration key; its default value is 2: `{ fixedReplicasCount: x }`. -​In the **Streaming** processing mode the scenario parallelism is set in the scenario properties; it determines the minimal number of tasks used to process events. The count of replicas, scenario parallelism and number of tasks per replica are connected with a simple formula: +​In the **Streaming** processing mode the scenario parallelism is set in the scenario properties; it determines the +minimal number of tasks used to process events. The count of replicas, scenario parallelism and number of tasks per +replica are connected with a simple formula: *scenarioParallelism = replicasCount \* tasksPerReplica* @@ -258,6 +275,7 @@ It can be configured with following options. #### Configuring custom ingress class By default, ingress resource will be created without any ingress class. If you want to use different class, you can set + ```hocon ingress { enabled: true, @@ -285,7 +303,7 @@ Deployment Manager of type `lite-embedded` has the following configuration optio | Parameter | Type | Default value | Description | |-----------------------------------------------------------|--------|-----------------|----------------------------------------------------------------------------------------------------------------------------------------------------------| -| mode | string | | Processing mode: either streaming-lite or request-response | +| mode | string | | Processing mode: either streaming-lite or request-response | | http.interface | string | 0.0.0.0 | (Request-Response only) Interface on which REST API of scenarios will be exposed | | http.port | int | 8181 | (Request-Response only) Port on which REST API of scenarios will be exposed | | request-response.definitionMetadata.servers | string | [{"url": "./"}] | (Request-Response only) Configuration of exposed servers in scenario's OpenAPI definition. When not configured, will be used server with ./ relative url | @@ -298,16 +316,20 @@ Deployment Manager of type `lite-embedded` has the following configuration optio Deployment Manager of type `flinkStreaming` has the following configuration options: -| Parameter | Type | Default value | Description | -|-------------------------------------|----------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| restUrl | string | | The only required parameter, REST API endpoint of the Flink cluster | -| jobManagerTimeout | duration | 1 minute | Timeout for communication with FLink cluster. Consider extending if e.g. you have long savepoint times etc. | -| shouldVerifyBeforeDeploy | boolean | true | By default, before redeployment of scenario with state from savepoint, verification of savepoint compatibility is performed. There are some cases when it can be too time consuming or not possible. Use this flag to disable it. | -| shouldCheckAvailableSlots | boolean | true | When set to true, Nussknacker checks if there are free slots to run new job. 
This check should be disabled on Flink Kubernetes Native deployments, where Taskmanager is started on demand. |
-| waitForDuringDeployFinish.enabled | boolean | true | When set to true, after Flink job execution, we check if tasks were successfully started on TaskMangers, before marking version as deployed. Otherwise version is marked as deployed immediately after successful response from JobManager. |
-| waitForDuringDeployFinish.maxChecks | boolean | 180 | It works when `waitForDuringDeployFinish.enabled` option is set to `true`. This parameter describe how many times we should check if tasks were successfully started on TaskMangers before notifying about deployment failure. |
-| waitForDuringDeployFinish.delay | boolean | 1 second | It works when `waitForDuringDeployFinish.enabled` option is set to `true`. This parameter describe how long should be delay between checks. |
-| scenarioStateCaching.enabled | boolean | true | Enables scenario state caching in scenario list view |
-| scenarioStateCaching.cacheTTL | duration | 10 seconds | TimeToLeave for scenario state cache entries |
-| scenarioStateRequestTimeout | duration | 3 seconds | Request timeout for fetching scenario state from Flink |
-| jobConfigsCacheSize | int | 1000 | Maximum number of cached job configuration elements. |
+| Parameter | Type | Default value | Description |
+|-----------|------|---------------|-------------|
+| restUrl | string | | The only required parameter, REST API endpoint of the Flink cluster |
+| jobManagerTimeout | duration | 1 minute | Timeout for communication with the Flink cluster. Consider extending it if, e.g., you have long savepoint times. |
+| shouldVerifyBeforeDeploy | boolean | true | By default, before redeployment of a scenario with state from a savepoint, verification of savepoint compatibility is performed. There are some cases when it can be too time-consuming or not possible. Use this flag to disable it. |
+| shouldCheckAvailableSlots | boolean | true | When set to true, Nussknacker checks if there are free slots to run a new job. This check should be disabled on Flink Kubernetes Native deployments, where TaskManagers are started on demand. |
+| waitForDuringDeployFinish.enabled | boolean | true | When set to true, after Flink job execution we check if tasks were successfully started on TaskManagers before marking the version as deployed. Otherwise, the version is marked as deployed immediately after a successful response from the JobManager. |
+| waitForDuringDeployFinish.maxChecks | int | 180 | It works when the `waitForDuringDeployFinish.enabled` option is set to `true`. This parameter describes how many times we should check if tasks were successfully started on TaskManagers before notifying about deployment failure. |
+| waitForDuringDeployFinish.delay | duration | 1 second | It works when the `waitForDuringDeployFinish.enabled` option is set to `true`. This parameter describes how long the delay between checks should be. |
+| scenarioStateCaching.enabled | boolean | true | Enables scenario state caching in the scenario list view |
+| scenarioStateCaching.cacheTTL | duration | 10 seconds | TimeToLive for scenario state cache entries |
+| scenarioStateRequestTimeout | duration | 3 seconds | Request timeout for fetching scenario state from Flink |
+| jobConfigsCacheSize | int | 1000 | Maximum number of cached job configuration elements. |
+| scenarioTesting.reuseMiniClusterForScenarioTesting | boolean | true | Creates the mini cluster once and reuses it for each scenario testing attempt |
+| scenarioTesting.reuseMiniClusterForScenarioStateVerification | boolean | true | Creates the mini cluster once and reuses it for each scenario state verification |
+| scenarioTesting.parallelism | int | 1 | Parallelism that will be used for the scenario testing and scenario state verification mechanisms when mini cluster reuse is enabled; when mini cluster reuse is disabled, parallelism is taken from the scenario properties |
+| scenarioTesting.streamExecutionConfig | map of strings | [:] | Configuration that will be passed to `StreamExecutionEnvironment` for the scenario testing and scenario state verification mechanisms when mini cluster reuse is enabled; when mini cluster reuse is disabled, an empty configuration will be used |
diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala
index 912431416a1..3b38799e3f4 100644
--- a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala
+++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala
@@ -27,8 +27,9 @@ class BatchDataGenerationSpec

   private val designerServiceUrl = "http://localhost:8080"

-  private val liveDataGenScenarioName   = "SumTransactions-LiveData"
-  private val randomDataGenScenarioName = "SumTransactions-RandomData"
+  private val timeBasedRandomSuffix     = System.currentTimeMillis()
+  private val liveDataGenScenarioName   = s"SumTransactions-LiveData-$timeBasedRandomSuffix"
+  private val randomDataGenScenarioName = s"SumTransactions-RandomData-$timeBasedRandomSuffix"

   override def beforeAll(): Unit = {
     createEmptyBatchScenario(liveDataGenScenarioName, "Default")
@@ -106,7 +107,7 @@ class BatchDataGenerationSpec
         |    "nodeResults": {
         |      "sourceId": [
         |        {
-        |          "id": "SumTransactions-LiveData-sourceId-0-0",
+        |          "id": "$liveDataGenScenarioName-sourceId-0-0",
         |          "variables": {
         |            "input": {
         |              "pretty": {
@@ -122,7 +123,7 @@ class BatchDataGenerationSpec
         |      ],
         |      "end": [
         |        {
-        |          "id": "SumTransactions-LiveData-sourceId-0-0",
+        |          "id": "$liveDataGenScenarioName-sourceId-0-0",
         |          "variables": {
         |            "input": {
         |              "pretty": {
@@ -183,7 +184,7 @@ class BatchDataGenerationSpec
         |    "nodeResults": {
         |      "sourceId": [
         |        {
-        |          "id": "SumTransactions-LiveData-sourceId-0-0",
+        |          "id": "$liveDataGenScenarioName-sourceId-0-0",
         |          "variables": {
         |            "input": {
         |              "pretty": {
@@ -199,7 +200,7 @@ class BatchDataGenerationSpec
         |      ],
         |      "end": [
         |        {
-        |          "id": "SumTransactions-LiveData-sourceId-0-0",
+        |          "id": "$liveDataGenScenarioName-sourceId-0-0",
         |          "variables": {
         |            "input": {
         |              "pretty": {
diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala
index 8d9ac57b817..ac89554a763 100644
--- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala
+++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala
@@ -4,6 +4,7 @@ import akka.actor.ActorSystem
 import cats.data.{Validated, ValidatedNel}
 import com.typesafe.config.Config
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.flink.configuration.Configuration
 import pl.touk.nussknacker.development.manager.DevelopmentStateStatus._
 import pl.touk.nussknacker.engine._
 import pl.touk.nussknacker.engine.api.ProcessVersion
@@ -19,7 +20,11 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin
 import pl.touk.nussknacker.engine.api.process.ProcessName
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment._
-import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig}
+import pl.touk.nussknacker.engine.management.{FlinkStreamingPropertiesConfig, ScenarioTestingConfig}
+import pl.touk.nussknacker.engine.management.scenariotesting.{
+  FlinkProcessTestRunner,
+  ScenarioTestingMiniClusterWrapperFactory
+}
 import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

 import java.net.URI
 import java.util.UUID
@@ -48,7 +53,14 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
   private val memory: TrieMap[ProcessName, StatusDetails] = TrieMap[ProcessName, StatusDetails]()
   private val random = new scala.util.Random()

-  private lazy val flinkTestRunner = new FlinkProcessTestRunner(modelData.asInvokableModelData)
+  private lazy val scenarioTestingMiniClusterWrapperOpt =
+    ScenarioTestingMiniClusterWrapperFactory.createIfConfigured(
+      modelData.asInvokableModelData.modelClassLoader,
+      ScenarioTestingConfig()
+    )
+
+  private lazy val flinkTestRunner =
+    new FlinkProcessTestRunner(modelData.asInvokableModelData, scenarioTestingMiniClusterWrapperOpt)

   implicit private class ProcessStateExpandable(processState: StatusDetails) {

@@ -83,7 +95,10 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
     case command: DMRunOffScheduleCommand => runOffSchedule(command)
     case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult(""))
     case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) =>
-      flinkTestRunner.test(canonicalProcess, scenarioTestData) // it's just for streaming e2e tests from file purposes
+      flinkTestRunner.runTestsAsync(
+        canonicalProcess,
+        scenarioTestData
+      ) // needed only for the file-based streaming e2e tests

   private def description(canonicalProcess: CanonicalProcess) = {
@@ -148,7 +163,9 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
     notImplemented
   }

-  override def close(): Unit = {}
+  override def close(): Unit = {
+    scenarioTestingMiniClusterWrapperOpt.foreach(_.close())
+  }

   private def changeState(name: ProcessName, stateStatus: StateStatus): Unit =
     memory.get(name).foreach { processState =>
diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala
index 8ae2fa10945..410323ba0be 100644
--- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala
+++ 
b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -8,12 +8,15 @@ import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig -import pl.touk.nussknacker.engine.api.definition.{NotBlankParameterValidator, StringParameterEditor} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId -import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig} +import pl.touk.nussknacker.engine.management.scenariotesting.{ + FlinkProcessTestRunner, + ScenarioTestingMiniClusterWrapperFactory +} +import pl.touk.nussknacker.engine.management.{FlinkStreamingPropertiesConfig, ScenarioTestingConfig} import pl.touk.nussknacker.engine.newdeployment.DeploymentId import pl.touk.nussknacker.engine.testing.StubbingCommands import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults @@ -58,8 +61,17 @@ object MockableDeploymentManagerProvider { with ManagerSpecificScenarioActivitiesStoredByManager with StubbingCommands { + private lazy val scenarioTestingMiniClusterWrapperOpt = modelDataOpt.flatMap { modelData => + ScenarioTestingMiniClusterWrapperFactory.createIfConfigured( + modelData.asInvokableModelData.modelClassLoader, + ScenarioTestingConfig() + ) + } + private lazy val testRunnerOpt = - modelDataOpt.map(modelData => new FlinkProcessTestRunner(modelData.asInvokableModelData)) + modelDataOpt.map { modelData => + new FlinkProcessTestRunner(modelData.asInvokableModelData, scenarioTestingMiniClusterWrapperOpt) + } override def resolve( idWithName: ProcessIdWithName, @@ -102,7 +114,7 @@ object MockableDeploymentManagerProvider { .get() .get(processVersion.processName.value) .map(Future.successful) - .orElse(testRunnerOpt.map(_.test(scenario, testData))) + .orElse(testRunnerOpt.map(_.runTestsAsync(scenario, testData))) .getOrElse( throw new IllegalArgumentException( s"Tests results not mocked for scenario [${processVersion.processName.value}] and no model data provided" @@ -125,7 +137,10 @@ object MockableDeploymentManagerProvider { ): Future[List[ScenarioActivity]] = Future.successful(MockableDeploymentManager.managerSpecificScenarioActivities.get()) - override def close(): Unit = {} + override def close(): Unit = { + scenarioTestingMiniClusterWrapperOpt.foreach(_.close()) + } + } // note: At the moment this manager cannot be used in tests which are executed in parallel. 
It can be obviously diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala index da869ea2d97..66889fe2521 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkProcessMain.scala @@ -1,18 +1,21 @@ package pl.touk.nussknacker.engine.process.runner -import java.io.File import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig -import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} import pl.touk.nussknacker.engine.api.{CirceUtil, ProcessVersion} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.marshall.ScenarioParser import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer +import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} +import java.io.File +import java.nio.charset.StandardCharsets +import scala.util.Using import scala.util.control.NonFatal -trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { +trait FlinkProcessMain[Env] extends LazyLogging { def main(argsWithHack: Array[String]): Unit = { try { @@ -61,6 +64,15 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging { prepareExecutionConfig: ExecutionConfigPreparer ): Unit + protected def readProcessFromArg(arg: String): CanonicalProcess = { + val canonicalJson = if (arg.startsWith("@")) { + Using.resource(scala.io.Source.fromFile(arg.substring(1), StandardCharsets.UTF_8.name()))(_.mkString) + } else { + arg + } + ScenarioParser.parseUnsafe(canonicalJson) + } + private def parseProcessVersion(json: String): ProcessVersion = CirceUtil.decodeJsonUnsafe[ProcessVersion](json, "invalid scenario version") diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkRunner.scala deleted file mode 100644 index 9089c1d0807..00000000000 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkRunner.scala +++ /dev/null @@ -1,20 +0,0 @@ -package pl.touk.nussknacker.engine.process.runner - -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.marshall.ScenarioParser - -import java.nio.charset.StandardCharsets -import scala.util.Using - -trait FlinkRunner { - - protected def readProcessFromArg(arg: String): CanonicalProcess = { - val canonicalJson = if (arg.startsWith("@")) { - Using.resource(scala.io.Source.fromFile(arg.substring(1), StandardCharsets.UTF_8.name()))(_.mkString) - } else { - arg - } - ScenarioParser.parseUnsafe(canonicalJson) - } - -} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala deleted file mode 100644 index a51db93cf39..00000000000 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ /dev/null @@ -1,96 +0,0 @@ -package pl.touk.nussknacker.engine.process.runner - -import org.apache.flink.configuration.{ - ConfigUtils, - 
Configuration, - CoreOptions, - PipelineOptions, - RestOptions, - TaskManagerOptions -} -import org.apache.flink.core.fs.FileSystem -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings -import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.StreamMetaData -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.util.MetaDataExtractor - -import java.net.{MalformedURLException, URL} -import scala.jdk.CollectionConverters._ -import scala.util.Using - -trait FlinkStubbedRunner { - - protected def modelData: ModelData - - protected def process: CanonicalProcess - - protected def configuration: Configuration - - protected def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment( - MetaDataExtractor - .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData()) - .parallelism - .getOrElse(1), - configuration - ) - - // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath... - protected def execute[T]( - env: StreamExecutionEnvironment, - savepointRestoreSettings: SavepointRestoreSettings - ): Unit = { - // Checkpoints are disabled to prevent waiting for checkpoint to happen - // before finishing execution. - env.getCheckpointConfig.disableCheckpointing() - - val streamGraph = env.getStreamGraph - streamGraph.setJobName(process.name.value) - - val jobGraph = streamGraph.getJobGraph() - jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration) - jobGraph.setSavepointRestoreSettings(savepointRestoreSettings) - - val configuration: Configuration = new Configuration - configuration.addAll(jobGraph.getJobConfiguration) - configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, env.getParallelism) - configuration.set[Integer](RestOptions.PORT, 0) - - // FIXME: reversing flink default order - configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") - - // it is required for proper working of HadoopFileSystem - FileSystem.initialize(configuration, null) - - Using.resource( - new MiniCluster( - new MiniClusterConfiguration.Builder() - .setNumSlotsPerTaskManager(env.getParallelism) - .setConfiguration(configuration) - .build() - ) - ) { exec => - exec.start() - val id = exec.submitJob(jobGraph).get().getJobID - exec.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader) - } - } - - private def classpathsFromModelWithFallbackToConfiguration = { - // The class is also used in some scala tests - // and this fallback is to work with a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 - // see details in pl.touk.nussknacker.engine.flink.test.MiniClusterExecutionEnvironment#execute - modelData.modelClassLoaderUrls match { - case Nil => - ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException]( - configuration, - PipelineOptions.CLASSPATHS, - new URL(_) - ) - case list => list.asJava - } - } - -} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala deleted file mode 100644 index 3463bed6546..00000000000 --- 
a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ /dev/null @@ -1,56 +0,0 @@ -package pl.touk.nussknacker.engine.process.runner - -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings -import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory -import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar -import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig} -import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestRunId, TestServiceInvocationCollector} - -object FlinkVerificationMain extends FlinkRunner { - - def run( - modelData: ModelData, - process: CanonicalProcess, - processVersion: ProcessVersion, - deploymentData: DeploymentData, - savepointPath: String, - configuration: Configuration - ): Unit = - new FlinkVerificationMain(modelData, process, processVersion, deploymentData, savepointPath, configuration) - .runTest() - -} - -class FlinkVerificationMain( - val modelData: ModelData, - val process: CanonicalProcess, - processVersion: ProcessVersion, - deploymentData: DeploymentData, - savepointPath: String, - val configuration: Configuration -) extends FlinkStubbedRunner { - - def runTest(): Unit = { - val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener - val resultCollector = new TestServiceInvocationCollector(collectingListener) - val registrar = prepareRegistrar() - val env = createEnv - - registrar.register(env, process, processVersion, deploymentData, resultCollector) - execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) - } - - protected def prepareRegistrar(): FlinkProcessRegistrar = { - FlinkProcessRegistrar( - VerificationFlinkProcessCompilerDataFactory(process, modelData), - FlinkJobConfig.parse(modelData.modelConfig), - ExecutionConfigPreparer.defaultChain(modelData) - ) - } - -} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala new file mode 100644 index 00000000000..dfc9d4dfacd --- /dev/null +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/AdHocMiniClusterFallbackHandler.scala @@ -0,0 +1,44 @@ +package pl.touk.nussknacker.engine.process.scenariotesting + +import com.typesafe.scalalogging.LazyLogging +import org.apache.flink.configuration.Configuration +import pl.touk.nussknacker.engine.api.StreamMetaData +import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.engine.util.MetaDataExtractor + +// This class handles a legacy ad-hoc way to create minicluster. 
+// After we fully switch to the single mini cluster approach, it should be removed
+object AdHocMiniClusterFallbackHandler extends LazyLogging {
+
+  def handleAdHocMiniClusterFallback[R](
+      reusableMiniClusterWrapperOpt: Option[ScenarioTestingMiniClusterWrapper],
+      scenario: CanonicalProcess,
+      useCaseForDebug: String
+  )(f: ScenarioTestingMiniClusterWrapper => R): R = {
+    val miniClusterWrapper = reusableMiniClusterWrapperOpt
+      .map { reusableMiniClusterWrapper =>
+        logger.debug(s"reusableMiniClusterWrapper passed - using it for $useCaseForDebug")
+        reusableMiniClusterWrapper
+      }
+      .getOrElse {
+        logger.debug(s"reusableMiniClusterWrapper not passed - creating a new MiniCluster for $useCaseForDebug")
+        createAdHocMiniClusterWrapper(scenario)
+      }
+    try {
+      f(miniClusterWrapper)
+    } finally {
+      if (reusableMiniClusterWrapperOpt.isEmpty) {
+        miniClusterWrapper.close()
+      }
+    }
+  }
+
+  private def createAdHocMiniClusterWrapper(process: CanonicalProcess) = {
+    val scenarioParallelism = MetaDataExtractor
+      .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
+      .parallelism
+      .getOrElse(1)
+    ScenarioTestingMiniClusterWrapper.create(scenarioParallelism, new Configuration())
+  }
+
+}
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala
similarity index 50%
rename from engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala
rename to engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala
index 62e2efce685..bf6f02931c4 100644
--- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkTestMain.scala
@@ -1,12 +1,10 @@
-package pl.touk.nussknacker.engine.process.runner
+package pl.touk.nussknacker.engine.process.scenariotesting

 import io.circe.Json
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
 import pl.touk.nussknacker.engine.ModelData
-import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
-import pl.touk.nussknacker.engine.api.process.ProcessName
 import pl.touk.nussknacker.engine.api.test.ScenarioTestData
+import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData}
 import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory
@@ -19,52 +17,45 @@ import pl.touk.nussknacker.engine.testmode.{
   TestServiceInvocationCollector
 }

-import scala.util.Using
-
-object FlinkTestMain extends FlinkRunner {
+object FlinkTestMain {

   def run(
+      miniClusterWrapperOpt: Option[ScenarioTestingMiniClusterWrapper],
       modelData: ModelData,
-      process: CanonicalProcess,
-      scenarioTestData: ScenarioTestData,
-      configuration: Configuration,
+      scenario: CanonicalProcess,
+      scenarioTestData: ScenarioTestData
   ): TestResults[Json] = {
-    val processVersion = ProcessVersion.empty.copy(processName =
-      ProcessName("snapshot version")
-    ) // testing process may be unreleased, so it has no version
-    new FlinkTestMain(
-      modelData,
-      process,
-      scenarioTestData,
-      processVersion,
-      DeploymentData.empty.copy(additionalModelConfigs =
-        AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
-      ),
-      configuration
-    ).runTest
+    new FlinkTestMain(miniClusterWrapperOpt, modelData).testScenario(scenario, scenarioTestData)
   }

 }

-class FlinkTestMain(
-    val modelData: ModelData,
-    val process: CanonicalProcess,
-    scenarioTestData: ScenarioTestData,
-    processVersion: ProcessVersion,
-    deploymentData: DeploymentData,
-    val configuration: Configuration
-) extends FlinkStubbedRunner {
+class FlinkTestMain(miniClusterWrapperOpt: Option[ScenarioTestingMiniClusterWrapper], modelData: ModelData) {

-  def runTest: TestResults[Json] = {
+  def testScenario(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
     try {
-      val resultCollector = new TestServiceInvocationCollector(collectingListener)
-      val registrar = prepareRegistrar(collectingListener, scenarioTestData)
-      val env = createEnv
-
-      registrar.register(env, process, processVersion, deploymentData, resultCollector)
-      execute(env, SavepointRestoreSettings.none())
-      collectingListener.results
+      AdHocMiniClusterFallbackHandler.handleAdHocMiniClusterFallback(
+        miniClusterWrapperOpt,
+        scenario,
+        "scenario testing"
+      ) { miniClusterWrapper =>
+        val alignedScenario = miniClusterWrapper.alignParallelism(scenario)
+        val resultCollector = new TestServiceInvocationCollector(collectingListener)
+        // ProcessVersion can't be passed from the DM because the testing mechanism can be used with a scenario that has not been saved yet
+        val processVersion = ProcessVersion.empty.copy(processName = alignedScenario.name)
+        val deploymentData = DeploymentData.empty.copy(additionalModelConfigs =
+          AdditionalModelConfigs(modelData.additionalConfigsFromProvider)
+        )
+        val registrar = prepareRegistrar(collectingListener, alignedScenario, scenarioTestData, processVersion)
+        registrar.register(miniClusterWrapper.env, alignedScenario, processVersion, deploymentData, resultCollector)
+        miniClusterWrapper.submitJobAndCleanEnv(
+          alignedScenario.name,
+          SavepointRestoreSettings.none(),
+          modelData.modelClassLoader
+        )
+        collectingListener.results
+      }
     } finally {
       collectingListener.clean()
     }
@@ -72,7 +63,9 @@ class FlinkTestMain(

   protected def prepareRegistrar(
       collectingListener: ResultsCollectingListener[Json],
-      scenarioTestData: ScenarioTestData
+      process: CanonicalProcess,
+      scenarioTestData: ScenarioTestData,
+      processVersion: ProcessVersion,
   ): FlinkProcessRegistrar = {
     FlinkProcessRegistrar(
       TestFlinkProcessCompilerDataFactory(
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala
new file mode 100644
index 00000000000..5d43d07f9d6
--- /dev/null
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/FlinkVerificationMain.scala
@@ -0,0 +1,64 @@
+package pl.touk.nussknacker.engine.process.scenariotesting
+
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
+import pl.touk.nussknacker.engine.ModelData
+import pl.touk.nussknacker.engine.api.ProcessVersion
+import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
+import pl.touk.nussknacker.engine.deployment.DeploymentData
+import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory
+import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
+import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
+import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector}
+
+object FlinkVerificationMain {
+
+  def run(
+      miniClusterWrapperOpt: Option[ScenarioTestingMiniClusterWrapper],
+      modelData: ModelData,
+      scenario: CanonicalProcess,
+      processVersion: ProcessVersion,
+      savepointPath: String
+  ): Unit =
+    new FlinkVerificationMain(miniClusterWrapperOpt, modelData).runTest(scenario, processVersion, savepointPath)
+
+}
+
+class FlinkVerificationMain(
+    miniClusterWrapperOpt: Option[ScenarioTestingMiniClusterWrapper],
+    modelData: ModelData,
+) {
+
+  def runTest(scenario: CanonicalProcess, processVersion: ProcessVersion, savepointPath: String): Unit = {
+    val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
+    try {
+      AdHocMiniClusterFallbackHandler.handleAdHocMiniClusterFallback(
+        miniClusterWrapperOpt,
+        scenario,
+        "scenario state verification"
+      ) { miniClusterWrapper =>
+        val alignedScenario = miniClusterWrapper.alignParallelism(scenario)
+        val resultCollector = new TestServiceInvocationCollector(collectingListener)
+        val registrar = prepareRegistrar(alignedScenario)
+        val deploymentData = DeploymentData.empty
+
+        registrar.register(miniClusterWrapper.env, alignedScenario, processVersion, deploymentData, resultCollector)
+        miniClusterWrapper.submitJobAndCleanEnv(
+          alignedScenario.name,
+          SavepointRestoreSettings.forPath(savepointPath, true),
+          modelData.modelClassLoader
+        )
+      }
+    } finally {
+      collectingListener.clean()
+    }
+  }
+
+  protected def prepareRegistrar(scenario: CanonicalProcess): FlinkProcessRegistrar = {
+    FlinkProcessRegistrar(
+      VerificationFlinkProcessCompilerDataFactory(scenario, modelData),
+      FlinkJobConfig.parse(modelData.modelConfig),
+      ExecutionConfigPreparer.defaultChain(modelData)
+    )
+  }
+
+}
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
new file mode 100644
index 00000000000..fbdeee9fbfd
--- /dev/null
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterFactory.scala
@@ -0,0 +1,39 @@
+package pl.touk.nussknacker.engine.process.scenariotesting
+
+import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
+
+object ScenarioTestingMiniClusterFactory {
+
+  def createConfiguredMiniCluster(numTaskSlots: Int): MiniCluster = {
+    val miniClusterConfiguration = prepareMiniClusterConfiguration(numTaskSlots = numTaskSlots)
+
+    // it is required for proper working of HadoopFileSystem
+    FileSystem.initialize(miniClusterConfiguration, null)
+
+    createMiniCluster(miniClusterConfiguration, numSlotsPerTaskManager = numTaskSlots)
+  }
+
+  private def prepareMiniClusterConfiguration(numTaskSlots: Int) = {
+    val configuration: Configuration = new Configuration
+    configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, numTaskSlots)
+    configuration.set[Integer](RestOptions.PORT, 0)
+
+    // FIXME: reversing flink default order
+    configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
+    configuration
+  }
+
+  private def createMiniCluster(configuration: Configuration, numSlotsPerTaskManager: Int) = {
+    val miniCluster = new MiniCluster(
+      new MiniClusterConfiguration.Builder()
+        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
+        .setConfiguration(configuration)
+        .build()
+    )
+    miniCluster.start()
+    miniCluster
+  }
+
+}
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
new file mode 100644
index 00000000000..96baca04f7b
--- /dev/null
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingMiniClusterWrapper.scala
@@ -0,0 +1,86 @@
+package pl.touk.nussknacker.engine.process.scenariotesting
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointRestoreSettings}
+import org.apache.flink.runtime.minicluster.MiniCluster
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import pl.touk.nussknacker.engine.api.StreamMetaData
+import pl.touk.nussknacker.engine.api.process.ProcessName
+import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
+import pl.touk.nussknacker.engine.util.MetaDataExtractor
+import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
+
+import scala.jdk.CollectionConverters._
+
+// We use MiniCluster directly, instead of LocalExecutionEnvironment, to be able to pass own classpath...
+final class ScenarioTestingMiniClusterWrapper(
+    miniCluster: MiniCluster,
+    val env: StreamExecutionEnvironment,
+    parallelism: Int
+) extends AutoCloseable {
+
+  // If the scenario declares a parallelism greater than the number of task slots in the shared mini cluster,
+  // it is lowered to the mini cluster's parallelism so that the job can be scheduled at all.
+  def alignParallelism(canonicalProcess: CanonicalProcess): CanonicalProcess = {
+    val scenarioParallelism = MetaDataExtractor
+      .extractTypeSpecificDataOrDefault[StreamMetaData](canonicalProcess.metaData, StreamMetaData())
+      .parallelism
+    if (scenarioParallelism.exists(_ > parallelism)) {
+      canonicalProcess.copy(metaData =
+        canonicalProcess.metaData.copy(additionalFields =
+          canonicalProcess.metaData.additionalFields.copy(properties =
+            canonicalProcess.metaData.additionalFields.properties + (StreamMetaData.parallelismName -> parallelism.toString)
+          )
+        )
+      )
+    } else {
+      canonicalProcess
+    }
+  }
+
+  def submitJobAndCleanEnv(
+      scenarioName: ProcessName,
+      savepointRestoreSettings: SavepointRestoreSettings,
+      modelClassLoader: ModelClassLoader
+  ): Unit = {
+    // This step cleans the env transformations, which allows the same StreamExecutionEnvironment to be reused many times
+    val streamGraph = env.getStreamGraph
+    streamGraph.setJobName(scenarioName.value)
+    val jobGraph = streamGraph.getJobGraph
+    setupJobGraph(jobGraph, savepointRestoreSettings, modelClassLoader)
+
+    val id = miniCluster.submitJob(jobGraph).get().getJobID
+    miniCluster.requestJobResult(id).get().toJobExecutionResult(getClass.getClassLoader)
+  }
+
+  private def setupJobGraph(
+      jobGraph: JobGraph,
+      savepointRestoreSettings: SavepointRestoreSettings,
+      modelClassLoader: ModelClassLoader
+  ): Unit = {
+    jobGraph.setClasspaths(modelClassLoader.urls.asJava)
+    jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)
+  }
+
+  def close(): Unit = {
+    env.close()
+    miniCluster.close()
+  }
+
+}
+
+object ScenarioTestingMiniClusterWrapper extends LazyLogging {
+
+  def create(parallelism: Int, streamExecutionConfig: Configuration): ScenarioTestingMiniClusterWrapper = {
+    logger.debug(s"Creating MiniCluster with numTaskSlots = $parallelism")
+    val miniCluster = ScenarioTestingMiniClusterFactory.createConfiguredMiniCluster(parallelism)
+    logger.debug(
+      s"Creating local StreamExecutionEnvironment with parallelism = $parallelism and configuration = $streamExecutionConfig"
+    )
+    val env = ScenarioTestingStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
+      parallelism,
+      streamExecutionConfig
+    )
+    new ScenarioTestingMiniClusterWrapper(miniCluster, env, parallelism)
+  }
+
+}
diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingStreamExecutionEnvironmentFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingStreamExecutionEnvironmentFactory.scala
new file mode 100644
index 00000000000..485e99e2080
--- /dev/null
+++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/scenariotesting/ScenarioTestingStreamExecutionEnvironmentFactory.scala
@@ -0,0 +1,19 @@
+package pl.touk.nussknacker.engine.process.scenariotesting
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+object ScenarioTestingStreamExecutionEnvironmentFactory {
+
+  def createStreamExecutionEnvironment(parallelism: Int, configuration: Configuration): StreamExecutionEnvironment = {
+    val env = StreamExecutionEnvironment.createLocalEnvironment(
+      parallelism,
+      configuration
+    )
+    // Checkpoints are disabled to prevent waiting for checkpoint to happen
+    // before finishing execution. 
+ env.getCheckpointConfig.disableCheckpointing() + env + } + +} diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala index cdbdb6b652e..8689d06c41e 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkStreamingProcessMainSpec.scala @@ -7,18 +7,9 @@ import org.scalatest.Inside import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api._ -import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration -import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ -import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder -import pl.touk.nussknacker.engine.process.runner.SimpleProcessConfigCreator.{ - sinkForIntsResultsHolder, - valueMonitorResultsHolder -} - -import java.net.ConnectException class FlinkStreamingProcessMainSpec extends AnyFlatSpec with Matchers with Inside { @@ -51,50 +42,3 @@ class FlinkStreamingProcessMainSpec extends AnyFlatSpec with Matchers with Insid } } - -class SimpleProcessConfigCreator extends EmptyProcessConfigCreator { - - override def services(modelDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = - Map( - "logService" -> WithCategories(LogService, "c1"), - "throwingService" -> WithCategories(new ThrowingService(new RuntimeException("Thrown as expected")), "c1"), - "throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"), - "returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"), - "collectingEager" -> WithCategories(CollectingEagerService, "c1"), - "returningComponentUseCaseService" -> WithCategories(ReturningComponentUseCaseService, "c1") - ) - - override def sinkFactories( - modelDependencies: ProcessObjectDependencies - ): Map[String, WithCategories[SinkFactory]] = Map( - "monitor" -> WithCategories(SinkFactory.noParam(MonitorEmptySink), "c2"), - "valueMonitor" -> WithCategories(SinkForAny(valueMonitorResultsHolder), "c2"), - "sinkForInts" -> WithCategories.anyCategory(SinkForInts(sinkForIntsResultsHolder)) - ) - - override def customStreamTransformers( - modelDependencies: ProcessObjectDependencies - ): Map[String, WithCategories[CustomStreamTransformer]] = Map( - "stateCustom" -> WithCategories.anyCategory(StateCustomNode), - "transformWithTime" -> WithCategories.anyCategory(TransformerWithTime), - "joinBranchExpression" -> WithCategories.anyCategory(CustomJoinUsingBranchExpressions), - "transformerAddingComponentUseCase" -> WithCategories.anyCategory(TransformerAddingComponentUseCase) - ) - - override def sourceFactories( - modelDependencies: ProcessObjectDependencies - ): Map[String, WithCategories[SourceFactory]] = Map( - "input" -> WithCategories(simpleRecordSource(Nil), "cat2"), - "jsonInput" -> WithCategories(jsonSource, "cat2"), - "typedJsonInput" -> WithCategories(TypedJsonSource, "cat2"), - "genericSourceWithCustomVariables" -> WithCategories.anyCategory(GenericSourceWithCustomVariables) - ) - -} - -object SimpleProcessConfigCreator extends Serializable { - 
- val valueMonitorResultsHolder = new TestResultsHolder[AnyRef] - val sinkForIntsResultsHolder = new TestResultsHolder[java.lang.Integer] - -} diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala new file mode 100644 index 00000000000..dc4206f898c --- /dev/null +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala @@ -0,0 +1,59 @@ +package pl.touk.nussknacker.engine.process.runner + +import pl.touk.nussknacker.engine.api.process._ +import pl.touk.nussknacker.engine.api.{CustomStreamTransformer, Service} +import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ +import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder +import pl.touk.nussknacker.engine.process.runner.SimpleProcessConfigCreator.{ + sinkForIntsResultsHolder, + valueMonitorResultsHolder +} + +import java.net.ConnectException + +class SimpleProcessConfigCreator extends EmptyProcessConfigCreator { + + override def services(modelDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = + Map( + "logService" -> WithCategories(LogService, "c1"), + "throwingService" -> WithCategories(new ThrowingService(new RuntimeException("Thrown as expected")), "c1"), + "throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"), + "returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"), + "collectingEager" -> WithCategories(CollectingEagerService, "c1"), + "returningComponentUseCaseService" -> WithCategories(ReturningComponentUseCaseService, "c1") + ) + + override def sinkFactories( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[SinkFactory]] = Map( + "monitor" -> WithCategories(SinkFactory.noParam(MonitorEmptySink), "c2"), + "valueMonitor" -> WithCategories(SinkForAny(valueMonitorResultsHolder), "c2"), + "sinkForInts" -> WithCategories.anyCategory(SinkForInts(sinkForIntsResultsHolder)) + ) + + override def customStreamTransformers( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[CustomStreamTransformer]] = Map( + "stateCustom" -> WithCategories.anyCategory(StateCustomNode), + "transformWithTime" -> WithCategories.anyCategory(TransformerWithTime), + "joinBranchExpression" -> WithCategories.anyCategory(CustomJoinUsingBranchExpressions), + "transformerAddingComponentUseCase" -> WithCategories.anyCategory(TransformerAddingComponentUseCase) + ) + + override def sourceFactories( + modelDependencies: ProcessObjectDependencies + ): Map[String, WithCategories[SourceFactory]] = Map( + "input" -> WithCategories(simpleRecordSource(Nil), "cat2"), + "jsonInput" -> WithCategories(jsonSource, "cat2"), + "typedJsonInput" -> WithCategories(TypedJsonSource, "cat2"), + "genericSourceWithCustomVariables" -> WithCategories.anyCategory(GenericSourceWithCustomVariables) + ) + +} + +object SimpleProcessConfigCreator extends Serializable { + + val valueMonitorResultsHolder = new TestResultsHolder[AnyRef] + val sinkForIntsResultsHolder = new TestResultsHolder[java.lang.Integer] + +} diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerSpec.scala index 
0916e833329..0de3599cc72 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerSpec.scala @@ -16,6 +16,7 @@ import pl.touk.nussknacker.engine.api.deployment.{ } import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} +import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.util.loader.ModelClassLoader @@ -102,23 +103,35 @@ class FlinkStreamingDeploymentManagerSpec extends AnyFunSuite with Matchers with } test("save state when redeploying") { - val processName = ProcessName("redeploy") - val outTopic = s"output-$processName" - val processEmittingOneElementAfterStart = StatefulSampleProcess.prepareProcess(processName) + val processEmittingOneElementAfterStart = StatefulSampleProcess.prepareProcess(ProcessName("redeploy")) + testRedeployWithStatefulSampleProcess(processEmittingOneElementAfterStart) + } + + test("redeploy scenario with greater parallelism than configured in mini cluster") { + val processEmittingOneElementAfterStart = + StatefulSampleProcess.prepareProcess(ProcessName("redeploy-parallelism-2"), parallelism = 2) + testRedeployWithStatefulSampleProcess(processEmittingOneElementAfterStart) + } + + private def testRedeployWithStatefulSampleProcess(processEmittingOneElementAfterStart: CanonicalProcess) = { + val outTopic = s"output-${processEmittingOneElementAfterStart.name}" kafkaClient.createTopic(outTopic, 1) - deployProcessAndWaitIfRunning(processEmittingOneElementAfterStart, empty(processName)) + deployProcessAndWaitIfRunning(processEmittingOneElementAfterStart, empty(processEmittingOneElementAfterStart.name)) try { // we wait for first element to appear in kafka to be sure it's processed, before we proceed to checkpoint messagesFromTopic(outTopic, 1) shouldBe List("[One element]") - deployProcessAndWaitIfRunning(processEmittingOneElementAfterStart, empty(processName)) + deployProcessAndWaitIfRunning( + processEmittingOneElementAfterStart, + empty(processEmittingOneElementAfterStart.name) + ) val messages = messagesFromTopic(outTopic, 2) messages shouldBe List("[One element]", "[One element, One element]") } finally { - cancelProcess(processName) + cancelProcess(processEmittingOneElementAfterStart.name) } } @@ -261,8 +274,6 @@ class FlinkStreamingDeploymentManagerSpec extends AnyFunSuite with Matchers with } } - def empty(processName: ProcessName): ProcessVersion = ProcessVersion.empty.copy(processName = processName) - test("extract scenario definition") { val modelData = ModelData( processingTypeConfig = processingTypeConfig, @@ -279,6 +290,8 @@ class FlinkStreamingDeploymentManagerSpec extends AnyFunSuite with Matchers with definition.components.components.map(_.id) should contain(ComponentId(ComponentType.Service, "accountService")) } + def empty(processName: ProcessName): ProcessVersion = ProcessVersion.empty.copy(processName = processName) + private def messagesFromTopic(outTopic: String, count: Int): List[String] = kafkaClient .createConsumer() diff --git 
a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala index 560206a719f..adf823079f6 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala @@ -17,6 +17,7 @@ import pl.touk.nussknacker.test.{KafkaConfigProperties, VeryPatientScalaFutures, import java.util.UUID import scala.concurrent.Await import scala.jdk.CollectionConverters._ +import scala.util.Using class FlinkStreamingProcessTestRunnerSpec extends AnyFlatSpec @@ -45,20 +46,26 @@ class FlinkStreamingProcessTestRunnerSpec ) it should "run scenario in test mode" in { - val deploymentManager = + Using.resource( FlinkStreamingDeploymentManagerProviderHelper.createDeploymentManager(ConfigWithUnresolvedVersion(config)) - - val processName = ProcessName(UUID.randomUUID().toString) - val processVersion = ProcessVersion.empty.copy(processName = processName) - - val process = SampleProcess.prepareProcess(processName) - - whenReady(deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData))) { r => - r.nodeResults shouldBe Map( - "startProcess" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), - "nightFilter" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), - "endSend" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))) - ) + ) { deploymentManager => + val processName = ProcessName(UUID.randomUUID().toString) + val processVersion = ProcessVersion.empty.copy(processName = processName) + + val process = SampleProcess.prepareProcess(processName) + + whenReady(deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData))) { + r => + r.nodeResults shouldBe Map( + "startProcess" -> List( + ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere"))) + ), + "nightFilter" -> List( + ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere"))) + ), + "endSend" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))) + ) + } } } @@ -71,16 +78,17 @@ class FlinkStreamingProcessTestRunnerSpec .source("startProcess", "kafka-transaction") .emptySink("endSend", "sendSmsNotExist") - val deploymentManager = + Using.resource( FlinkStreamingDeploymentManagerProviderHelper.createDeploymentManager(ConfigWithUnresolvedVersion(config)) - - val caught = intercept[IllegalArgumentException] { - Await.result( - deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData)), - patienceConfig.timeout - ) + ) { deploymentManager => + val caught = intercept[IllegalArgumentException] { + Await.result( + deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData)), + patienceConfig.timeout + ) + } + caught.getMessage shouldBe "Compilation errors: MissingSinkFactory(sendSmsNotExist,endSend)" } - caught.getMessage shouldBe "Compilation errors: MissingSinkFactory(sendSmsNotExist,endSend)" } private def variable(value: String): Json = diff --git 
a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StatefulSampleProcess.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StatefulSampleProcess.scala index 53101dd1ff2..68b2692b195 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StatefulSampleProcess.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StatefulSampleProcess.scala @@ -8,9 +8,10 @@ object StatefulSampleProcess { import pl.touk.nussknacker.engine.spel.SpelExtension._ - def prepareProcess(name: ProcessName): CanonicalProcess = { + def prepareProcess(name: ProcessName, parallelism: Int = 1): CanonicalProcess = { ScenarioBuilder .streaming(name.value) + .parallelism(parallelism) .source("state", "oneSource") .customNode("stateful", "stateVar", "stateful", "groupBy" -> "#input".spel) .emptySink("end", "kafka-string", "Topic" -> s"'output-$name'".spel, "Value" -> "#stateVar".spel) diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala index 176a27d84da..7418573086f 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala @@ -30,6 +30,7 @@ trait StreamingDockerTest extends DockerTest with BeforeAndAfterAll with Matcher override def afterAll(): Unit = { kafkaClient.shutdown() logger.info("Kafka client closed") + deploymentManager.close() super.afterAll() } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala index 0a5021c6bb5..b8ef97670d7 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala @@ -1,8 +1,11 @@ package pl.touk.nussknacker.engine.management -import pl.touk.nussknacker.engine.api.deployment.cache.ScenarioStateCachingConfig +import net.ceedubs.ficus.Ficus +import net.ceedubs.ficus.readers.ValueReader +import org.apache.flink.configuration.Configuration import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.jdk.CollectionConverters._ /** * FlinkConfig deployment configuration. 
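For orientation, the `scenarioTesting` section that the hunks below add to `FlinkConfig` (backed by the new `ScenarioTestingConfig` case class defined further down) can be parsed with the Ficus readers these imports bring in. A minimal sketch under that assumption; the `pipeline.object-reuse` key and the concrete values are illustrative only, not defaults from this change:

import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import pl.touk.nussknacker.engine.management.ScenarioTestingConfig
import ScenarioTestingConfig._ // brings the ValueReader[Configuration] into scope

object ScenarioTestingConfigParsingSketch extends App {
  // fields left unset fall back to the case class defaults (e.g. both reuse flags default to true)
  val parsed = ConfigFactory
    .parseString(
      """scenarioTesting {
        |  parallelism: 2
        |  streamExecutionConfig { "pipeline.object-reuse": "true" }
        |}""".stripMargin
    )
    .as[ScenarioTestingConfig]("scenarioTesting")

  println(parsed.parallelism) // 2
}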
@@ -15,12 +18,14 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration} final case class FlinkConfig( restUrl: Option[String], jobManagerTimeout: FiniteDuration = 1 minute, + // TODO: move to scenarioTesting shouldVerifyBeforeDeploy: Boolean = true, shouldCheckAvailableSlots: Boolean = true, waitForDuringDeployFinish: FlinkWaitForDuringDeployFinishedConfig = FlinkWaitForDuringDeployFinishedConfig(enabled = true, Some(180), Some(1 second)), scenarioStateRequestTimeout: FiniteDuration = 3 seconds, jobConfigsCacheSize: Int = 1000, + scenarioTesting: ScenarioTestingConfig = ScenarioTestingConfig() ) object FlinkConfig { @@ -51,3 +56,19 @@ final case class FlinkWaitForDuringDeployFinishedConfig( } final case class EnabledFlinkWaitForDuringDeployFinishedConfig(maxChecks: Int, delay: FiniteDuration) + +final case class ScenarioTestingConfig( + reuseMiniClusterForScenarioTesting: Boolean = true, + reuseMiniClusterForScenarioStateVerification: Boolean = true, + parallelism: Int = 1, + streamExecutionConfig: Configuration = new Configuration +) + +object ScenarioTestingConfig { + + import Ficus._ + + implicit val flinkConfigurationValueReader: ValueReader[Configuration] = + Ficus.mapValueReader[String].map(map => Configuration.fromMap(map.asJava)) + +} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index b6f768ee7c1..180263c7b7c 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -13,6 +13,11 @@ import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, V import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.prepareProgramArgs +import pl.touk.nussknacker.engine.management.scenariotesting.{ + FlinkProcessTestRunner, + FlinkProcessVerifier, + ScenarioTestingMiniClusterWrapperFactory +} import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} import scala.concurrent.Future @@ -21,15 +26,28 @@ abstract class FlinkDeploymentManager( modelData: BaseModelData, dependencies: DeploymentManagerDependencies, shouldVerifyBeforeDeploy: Boolean, - mainClassName: String + mainClassName: String, + scenarioTestingConfig: ScenarioTestingConfig ) extends DeploymentManager with LazyLogging { import dependencies._ - private lazy val testRunner = new FlinkProcessTestRunner(modelData.asInvokableModelData) + private lazy val scenarioTestingMiniClusterWrapperOpt = + ScenarioTestingMiniClusterWrapperFactory.createIfConfigured( + modelData.asInvokableModelData.modelClassLoader, + scenarioTestingConfig + ) + + private lazy val testRunner = new FlinkProcessTestRunner( + modelData.asInvokableModelData, + scenarioTestingMiniClusterWrapperOpt.filter(_ => scenarioTestingConfig.reuseMiniClusterForScenarioTesting) + ) - private lazy val verification = new FlinkProcessVerifier(modelData.asInvokableModelData) + private lazy val verification = new FlinkProcessVerifier( + modelData.asInvokableModelData, + scenarioTestingMiniClusterWrapperOpt.filter(_ => scenarioTestingConfig.reuseMiniClusterForScenarioStateVerification) + ) /** * Gets status from 
engine, handles finished state, resolves possible inconsistency with lastAction and formats status using `ProcessStateDefinitionManager` @@ -115,7 +133,7 @@ abstract class FlinkDeploymentManager( makeSavepoint(_, savepointDir) } case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) => - testRunner.test(canonicalProcess, scenarioTestData) + testRunner.runTestsAsync(canonicalProcess, scenarioTestData) case _: DMRunOffScheduleCommand => notImplemented } @@ -262,6 +280,11 @@ abstract class FlinkDeploymentManager( override def processStateDefinitionManager: ProcessStateDefinitionManager = FlinkProcessStateDefinitionManager + override def close(): Unit = { + logger.info("Closing Flink Deployment Manager") + scenarioTestingMiniClusterWrapperOpt.foreach(_.close()) + } + } object FlinkDeploymentManager { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala deleted file mode 100644 index 853187e57d4..00000000000 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessTestRunner.scala +++ /dev/null @@ -1,29 +0,0 @@ -package pl.touk.nussknacker.engine.management - -import io.circe.Json -import org.apache.flink.configuration.Configuration -import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.test.ScenarioTestData -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults -import pl.touk.nussknacker.engine.util.StaticMethodRunner - -import scala.concurrent.{ExecutionContext, Future} - -class FlinkProcessTestRunner(modelData: ModelData) - extends StaticMethodRunner( - modelData.modelClassLoader.classLoader, - "pl.touk.nussknacker.engine.process.runner.FlinkTestMain", - "run" - ) { - - // NU-1455: We encode variable on the engine, because of classLoader's problems - def test(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)( - implicit ec: ExecutionContext - ): Future[TestResults[Json]] = - Future { - tryToInvoke(modelData, canonicalProcess, scenarioTestData, new Configuration()) - .asInstanceOf[TestResults[Json]] - } - -} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 3f22e5a8d17..efa2e32762b 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -5,14 +5,7 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.{JobID, JobStatus} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.scheduler._ -import pl.touk.nussknacker.engine.api.deployment.scheduler.services.{ - AdditionalDeploymentDataProvider, - ProcessConfigEnricherFactory, - SchedulePropertyExtractorFactory, - ScheduledExecutionPerformer, - ScheduledProcessListenerFactory -} +import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import 
pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -30,8 +23,14 @@ class FlinkRestManager( config: FlinkConfig, modelData: BaseModelData, dependencies: DeploymentManagerDependencies, - mainClassName: String -) extends FlinkDeploymentManager(modelData, dependencies, config.shouldVerifyBeforeDeploy, mainClassName) + mainClassName: String, +) extends FlinkDeploymentManager( + modelData, + dependencies, + config.shouldVerifyBeforeDeploy, + mainClassName, + config.scenarioTesting + ) with LazyLogging { import dependencies._ @@ -318,10 +317,6 @@ class FlinkRestManager( Future.successful(()) } - override def close(): Unit = { - logger.info("Closing Flink REST manager") - } - } object FlinkRestManager { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala index c312c7834db..480620ea3d2 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkScheduledExecutionPerformer.scala @@ -29,6 +29,7 @@ object FlinkScheduledExecutionPerformer { import dependencies._ import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ + import ScenarioTestingConfig._ val flinkConfig = config.rootAs[FlinkConfig] new FlinkScheduledExecutionPerformer( flinkClient = HttpFlinkClient.createUnsafe(flinkConfig), diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala index 7081194a2cd..b6ae35d6be3 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala @@ -20,6 +20,7 @@ class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ import pl.touk.nussknacker.engine.util.config.ConfigEnrichments._ + import ScenarioTestingConfig._ override def createDeploymentManager( modelData: BaseModelData, diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala new file mode 100644 index 00000000000..3e69d0ab93d --- /dev/null +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunner.scala @@ -0,0 +1,39 @@ +package pl.touk.nussknacker.engine.management.scenariotesting + +import io.circe.Json +import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.api.test.ScenarioTestData +import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker + +import scala.concurrent.{ExecutionContext, Future} + +class FlinkProcessTestRunner(modelData: ModelData, miniClusterWrapperOpt: Option[AutoCloseable]) { + + // We use reflection, because we don't 
want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar + // because it is already in separate assembly for purpose of sending it to Flink during deployment. + // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded + private val mainRunner = new ReflectiveMethodInvoker[TestResults[Json]]( + modelData.modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.scenariotesting.FlinkTestMain", + "run" + ) + + def runTestsAsync(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData)( + implicit ec: ExecutionContext + ): Future[TestResults[Json]] = + Future { + runTests(canonicalProcess, scenarioTestData) + } + + // NU-1455: We encode variable on the engine, because of classLoader's problems + def runTests(canonicalProcess: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = + mainRunner.invokeStaticMethod( + miniClusterWrapperOpt, + modelData, + canonicalProcess, + scenarioTestData + ) + +} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala similarity index 50% rename from engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala rename to engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala index 29b9226343a..8640660effe 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessVerifier.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessVerifier.scala @@ -1,23 +1,24 @@ -package pl.touk.nussknacker.engine.management +package pl.touk.nussknacker.engine.management.scenariotesting import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.util.StaticMethodRunner +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker import scala.concurrent.Future import scala.util.control.NonFatal -class FlinkProcessVerifier(modelData: ModelData) - extends StaticMethodRunner( - modelData.modelClassLoader.classLoader, - "pl.touk.nussknacker.engine.process.runner.FlinkVerificationMain", - "run" - ) - with LazyLogging { +class FlinkProcessVerifier(modelData: ModelData, miniClusterWrapperOpt: Option[Any]) extends LazyLogging { + + // We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar + // because it is already in separate assembly for purpose of sending it to Flink during deployment. 
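+  // Put differently, the reflective call below stands in for what a direct compile-time dependency would express as
+  // (a sketch, assuming FlinkVerificationMain keeps this static signature on the executor side):
+  //   FlinkVerificationMain.run(miniClusterWrapperOpt, modelData, canonicalProcess, processVersion, savepointPath)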
+ // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded + private val mainRunner = new ReflectiveMethodInvoker[Unit]( + modelData.modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.scenariotesting.FlinkVerificationMain", + "run" + ) def verify( processVersion: ProcessVersion, @@ -27,7 +28,13 @@ class FlinkProcessVerifier(modelData: ModelData) val processId = processVersion.processName try { logger.info(s"Starting to verify $processId") - tryToInvoke(modelData, canonicalProcess, processVersion, DeploymentData.empty, savepointPath, new Configuration()) + mainRunner.invokeStaticMethod( + miniClusterWrapperOpt, + modelData, + canonicalProcess, + processVersion, + savepointPath + ) logger.info(s"Verification of $processId successful") Future.successful(()) } catch { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala new file mode 100644 index 00000000000..9a86da446a6 --- /dev/null +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/scenariotesting/ScenarioTestingMiniClusterWrapperFactory.scala @@ -0,0 +1,34 @@ +package pl.touk.nussknacker.engine.management.scenariotesting + +import org.apache.flink.configuration.Configuration +import pl.touk.nussknacker.engine.management.ScenarioTestingConfig +import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker +import pl.touk.nussknacker.engine.util.loader.ModelClassLoader + +object ScenarioTestingMiniClusterWrapperFactory { + + // We use reflection, because we don't want to bundle flinkExecutor.jar inside flinkDeploymentManager assembly jar + // because it is already in separate assembly for purpose of sending it to Flink during deployment. 
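+  // Concretely, the factory below resolves the executor-side creator, roughly equivalent to the direct call
+  // (a sketch, assuming ScenarioTestingMiniClusterWrapper exposes this static `create`):
+  //   val wrapper: AutoCloseable = ScenarioTestingMiniClusterWrapper.create(parallelism, streamExecutionConfig)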
+ // Other option would be to add flinkExecutor.jar to classpath from which Flink DM is loaded + def createIfConfigured(modelClassLoader: ModelClassLoader, config: ScenarioTestingConfig): Option[AutoCloseable] = { + if (config.reuseMiniClusterForScenarioTesting || config.reuseMiniClusterForScenarioStateVerification) { + Some(create(modelClassLoader, config.parallelism, config.streamExecutionConfig)) + } else { + None + } + } + + private[nussknacker] def create( + modelClassLoader: ModelClassLoader, + parallelism: Int, + streamExecutionConfig: Configuration + ): AutoCloseable = { + val methodInvoker = new ReflectiveMethodInvoker[AutoCloseable]( + modelClassLoader.classLoader, + "pl.touk.nussknacker.engine.process.scenariotesting.ScenarioTestingMiniClusterWrapper", + "create" + ) + methodInvoker.invokeStaticMethod(parallelism, streamExecutionConfig) + } + +} diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala similarity index 85% rename from engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala rename to engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala index 13db1d66a46..80200d0c17d 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/scenariotesting/FlinkProcessTestRunnerSpec.scala @@ -1,11 +1,12 @@ -package pl.touk.nussknacker.engine.process.runner +package pl.touk.nussknacker.engine.management.scenariotesting import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import io.circe.Json +import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.client.JobExecutionException import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import org.scalatest.{BeforeAndAfterEach, Inside, OptionValues} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Inside, OptionValues} import pl.touk.nussknacker.engine.api.component.{ ComponentAdditionalConfig, DesignerWideComponentId, @@ -23,6 +24,7 @@ import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode import pl.touk.nussknacker.engine.compile.FragmentResolver +import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs import pl.touk.nussknacker.engine.flink.test.{ FlinkTestConfiguration, RecordingExceptionConsumer, @@ -31,22 +33,25 @@ import pl.touk.nussknacker.engine.flink.test.{ import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition} -import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ -import pl.touk.nussknacker.engine.process.runner.FlinkTestMainSpec.{ +import pl.touk.nussknacker.engine.management.scenariotesting.FlinkProcessTestRunnerSpec.{ fragmentWithValidationName, processWithFragmentParameterValidation } +import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ import pl.touk.nussknacker.engine.testmode.TestProcess._ 
-import pl.touk.nussknacker.engine.util.ThreadUtils +import pl.touk.nussknacker.engine.util.loader.ModelClassLoader import pl.touk.nussknacker.engine.{ModelConfigs, ModelData} -import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs import java.util.{Date, UUID} -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with BeforeAndAfterEach with OptionValues { +class FlinkProcessTestRunnerSpec + extends AnyWordSpec + with Matchers + with Inside + with BeforeAndAfterEach + with BeforeAndAfterAll + with OptionValues { import pl.touk.nussknacker.engine.spel.SpelExtension._ import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap @@ -54,6 +59,13 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor private val scenarioName = "proc1" private val sourceNodeId = "id" private val firstSubtaskIndex = 0 + private val modelClassLoader = ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround) + + private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create( + modelClassLoader, + parallelism = 1, + streamExecutionConfig = new Configuration + ) override def beforeEach(): Unit = { super.beforeEach() @@ -61,6 +73,11 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor LogService.clear() } + override protected def afterAll(): Unit = { + super.afterAll() + scenarioTestingMiniClusterWrapper.close() + } + "A scenario run on Flink engine" when { "IO monad interpreter is used" should { runTests(useIOMonadInInterpreter = true) @@ -85,15 +102,14 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor val input = SimpleRecord("0", 1, "2", new Date(3), Some(4), 5, "6") val input2 = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6") - val results = runFlinkTest( + val results = prepareTestRunner(useIOMonadInInterpreter).runTests( process, ScenarioTestData( List( ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")), ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6")) ) - ), - useIOMonadInInterpreter + ) ) val nodeResults = results.nodeResults @@ -136,6 +152,47 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor LogService.invocationsCount.get() shouldBe 0 } + "be able to run tests multiple time on the same mini cluster" in { + val process = + ScenarioBuilder + .streaming(scenarioName) + .source(sourceNodeId, "input") + .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) + + val input = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6") + + + val testRunner = prepareTestRunner(useIOMonadInInterpreter) + + def runTestAndVerify() = { + val results = testRunner.runTests( + process, + ScenarioTestData( + List( + ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6")) + ) + ) + ) + + val nodeResults = results.nodeResults + + nodeResults(sourceNodeId) shouldBe List(nodeResult(0, "input" -> input)) + nodeResults("out") shouldBe List(nodeResult(0, "input" -> input)) + + val invocationResults = results.invocationResults + + invocationResults("out") shouldBe + List(ExpressionInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "Value", variable(11))) + + results.externalInvocationResults("out") shouldBe List( + 
ExternalInvocationResult(s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-0", "valueMonitor", variable(11)) + ) + } + + runTestAndVerify() + runTestAndVerify() + } + "collect results for split" in { val process = ScenarioBuilder @@ -143,10 +200,9 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .source(sourceNodeId, "input") .split("splitId1", GraphBuilder.emptySink("out1", "monitor"), GraphBuilder.emptySink("out2", "monitor")) - val results = runFlinkTest( + val results = prepareTestRunner(useIOMonadInInterpreter).runTests( process, ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) results.nodeResults("splitId1") shouldBe List( @@ -177,10 +233,9 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor val aggregate = SimpleRecordWithPreviousValue(input, 0, "s") val aggregate2 = SimpleRecordWithPreviousValue(input2, 1, "s") - val results = runFlinkTest( + val results = prepareTestRunner(useIOMonadInInterpreter).runTests( process, ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults @@ -231,16 +286,14 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .emptySink("out", "monitor") val results = - runFlinkTest( + prepareTestRunner(useIOMonadInInterpreter).runTests( process, ScenarioTestData(createTestRecord() :: List.fill(4)(createTestRecord(value1 = 11))), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults nodeResults(sourceNodeId) should have length 5 - } "detect errors" in { @@ -252,7 +305,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .filter("filter", "1 / #input.value1 >= 0".spel) .emptySink("out", "monitor") - val results = runFlinkTest( + val results = prepareTestRunner(useIOMonadInInterpreter).runTests( process, ScenarioTestData( List( @@ -262,7 +315,6 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor createTestRecord(id = "3", value1 = 4) ) ), - useIOMonadInInterpreter ) val nodeResults = results.nodeResults @@ -309,8 +361,11 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .emptySink("out", "monitor") val exceptionConsumerId = UUID.randomUUID().toString - val results = runFlinkTest( - process = process, + val results = prepareTestRunner( + useIOMonadInInterpreter, + enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId) + ).runTests( + process, scenarioTestData = ScenarioTestData( List( createTestRecord(id = "0", value1 = 1), @@ -318,9 +373,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor createTestRecord(id = "2", value1 = 2), createTestRecord(id = "3", value1 = 4) ) - ), - useIOMonadInInterpreter, - enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId) + ) ) val nodeResults = results.nodeResults @@ -340,11 +393,12 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .processor("failing", "throwingTransientService", "throw" -> "#input.value1 == 2".spel) .emptySink("out", "monitor") - val run = Future { - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + intercept[JobExecutionException] { + prepareTestRunner(useIOMonadInInterpreter).runTests( + process, + ScenarioTestData(List(createTestRecord(id = "2", value1 = 
2))) + ) } - - intercept[JobExecutionException](Await.result(run, 10 seconds)) } "handle json input" in { @@ -370,7 +424,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor ) ) - val results = runFlinkTest(process, testData, useIOMonadInInterpreter) + val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData) results.nodeResults(sourceNodeId) should have size 3 results.externalInvocationResults("out") shouldBe @@ -400,7 +454,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .emptySink("out", "valueMonitor", "Value" -> "#additionalOne + '|' + #additionalTwo".spel) val testData = ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("abc")))) - val results = runFlinkTest(process, testData, useIOMonadInInterpreter) + val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, testData) results.nodeResults(sourceNodeId) should have size 1 results.externalInvocationResults("out") shouldBe @@ -421,13 +475,14 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .emptySink("out", "sinkForInts", "Value" -> "15 / {0, 1}[0]".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + prepareTestRunner(useIOMonadInInterpreter).runTests( + process, + ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))) + ) results.exceptions should have length 1 results.exceptions.head.nodeId shouldBe Some("out") results.exceptions.head.throwable.getMessage should include("message: / by zero") - - SimpleProcessConfigCreator.sinkForIntsResultsHolder.results should have length 0 } "be able to test process with time windows" in { @@ -441,7 +496,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor def recordWithSeconds(duration: FiniteDuration) = ScenarioTestJsonRecord(sourceNodeId, Json.fromString(s"0|0|0|${duration.toMillis}|0|0|0")) - val results = runFlinkTest( + val results = prepareTestRunner(useIOMonadInInterpreter).runTests( process, ScenarioTestData( List( @@ -451,8 +506,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor recordWithSeconds(9 second), recordWithSeconds(20 second) ) - ), - useIOMonadInInterpreter + ) ) val nodeResults = results.nodeResults @@ -472,15 +526,14 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor ) .emptySink("out", "valueMonitor", "Value" -> "#input.field1 + #input.field2".spel) - val results = runFlinkTest( + val results = prepareTestRunner(useIOMonadInInterpreter).runTests( process, ScenarioTestData( ScenarioTestJsonRecord( sourceNodeId, Json.obj("field1" -> Json.fromString("abc"), "field2" -> Json.fromString("def")) ) :: Nil - ), - useIOMonadInInterpreter + ) ) results.invocationResults("out").map(_.value) shouldBe List(variable("abcdef")) @@ -504,7 +557,10 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .emptySink("out", "valueMonitor", "Value" -> "#parsed.size + ' ' + #parsed[0].field2".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(value1 = valueToReturn))), useIOMonadInInterpreter) + prepareTestRunner(useIOMonadInInterpreter).runTests( + process, + ScenarioTestData(List(createTestRecord(value1 = valueToReturn))) + ) results.invocationResults("out").map(_.value) shouldBe List(variable(s"$countToPass $valueToReturn")) } @@ -527,7 +583,8 @@ class FlinkTestMainSpec 
extends AnyWordSpec with Matchers with Inside with Befor val recordTrue = createTestRecord(id = "ala") val recordFalse = createTestRecord(id = "bela") - val results = runFlinkTest(process, ScenarioTestData(List(recordTrue, recordFalse)), useIOMonadInInterpreter) + val results = + prepareTestRunner(useIOMonadInInterpreter).runTests(process, ScenarioTestData(List(recordTrue, recordFalse))) val invocationResults = results.invocationResults @@ -565,7 +622,8 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor val recB = createTestRecord(id = "b") val recC = createTestRecord(id = "c") - val results = runFlinkTest(process, ScenarioTestData(List(recA, recB, recC)), useIOMonadInInterpreter) + val results = + prepareTestRunner(useIOMonadInInterpreter).runTests(process, ScenarioTestData(List(recA, recB, recC))) results.invocationResults("proc2").map(_.contextId) should contain only ( s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-1-end1", @@ -574,7 +632,9 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-2-end2" ) - results.externalInvocationResults("proc2").map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List( + results + .externalInvocationResults("proc2") + .map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List( "b", "a", "c", @@ -620,7 +680,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor val recordC = recordA.copy(id = "c") val recordD = recordA.copy(id = "d") - val results = runFlinkTest(process, scenarioTestData, useIOMonadInInterpreter) + val results = prepareTestRunner(useIOMonadInInterpreter).runTests(process, scenarioTestData) val nodeResults = results.nodeResults nodeResults("source1") shouldBe List( @@ -667,7 +727,10 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor .emptySink("out", "valueMonitor", "Value" -> "{#componentUseCaseService, #componentUseCaseCustomNode}".spel) val results = - runFlinkTest(process, ScenarioTestData(List(createTestRecord(sourceId = "start"))), useIOMonadInInterpreter) + prepareTestRunner(useIOMonadInInterpreter).runTests( + process, + ScenarioTestData(List(createTestRecord(sourceId = "start"))) + ) results.invocationResults("out").map(_.value) shouldBe List( variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime)) @@ -687,10 +750,12 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor ) .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) - val run = Future { - runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter) + val dictEditorException = intercept[IllegalStateException] { + prepareTestRunner(useIOMonadInInterpreter).runTests( + process, + ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))) + ) } - val dictEditorException = intercept[IllegalStateException](Await.result(run, 10 seconds)) dictEditorException.getMessage shouldBe "DictKeyWithLabel expression can only be used with DictParameterEditor, got Some(DualParameterEditor(StringParameterEditor,RAW))" } @@ -709,9 +774,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor ) .emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel) - val results = runFlinkTest( - process, - ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), + val results = prepareTestRunner( useIOMonadInInterpreter, 
additionalConfigsFromProvider = Map( DesignerWideComponentId("service-" + modifiedComponentName) -> ComponentAdditionalConfig( @@ -726,7 +789,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor ) ) ) - ) + ).runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) results.exceptions should have length 0 } @@ -739,10 +802,9 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario) - val results = runFlinkTest( + val results = prepareTestRunner(useIOMonadInInterpreter).runTests( resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") }, ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))), - useIOMonadInInterpreter ) results.exceptions.length shouldBe 0 } @@ -755,23 +817,21 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor ): ScenarioTestJsonRecord = ScenarioTestJsonRecord(sourceId, Json.fromString(s"$id|$value1|2|3|4|5|6")) - private def runFlinkTest( - process: CanonicalProcess, - scenarioTestData: ScenarioTestData, + private def prepareTestRunner( useIOMonadInInterpreter: Boolean, enrichDefaultConfig: Config => Config = identity, additionalConfigsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty - ): TestResults[_] = { + ): FlinkProcessTestRunner = { val config = enrichDefaultConfig(ConfigFactory.load("application.conf")) .withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter)) // We need to set context loader to avoid forking in sbt - val modelData = ModelData.duringFlinkExecution( - ModelConfigs(config, AdditionalModelConfigs(additionalConfigsFromProvider)) + val modelData = ModelData.duringExecution( + ModelConfigs(config, AdditionalModelConfigs(additionalConfigsFromProvider)), + modelClassLoader, + resolveConfigs = false ) - ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) { - FlinkTestMain.run(modelData, process, scenarioTestData, FlinkTestConfiguration.configuration()) - } + new FlinkProcessTestRunner(modelData, Some(scenarioTestingMiniClusterWrapper)) } private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] = @@ -806,7 +866,7 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor } -object FlinkTestMainSpec { +object FlinkProcessTestRunnerSpec { private val fragmentWithValidationName = "fragmentWithValidation" private val processWithFragmentParameterValidation: CanonicalProcess = { diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala index 4335d07293e..6c92d6e70fd 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkTestConfiguration.scala @@ -3,6 +3,8 @@ package pl.touk.nussknacker.engine.flink.test import com.github.ghik.silencer.silent import org.apache.flink.configuration._ +import java.net.URL + object FlinkTestConfiguration { // better to create each time because is mutable @@ -12,17 +14,26 @@ object FlinkTestConfiguration { val config = new Configuration config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagersCount) 
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskSlotsCount) + + config.set(PipelineOptions.CLASSPATHS, classpathWorkaround.map(_.toString).asJava) + + setupMemory(config) + } + + // FIXME: better describe which classpath is used in this case + // This is a workaround for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 + // Flink overwrites the user classloader with the AppClassLoader if the classpaths parameter is empty + // (implementation in org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager) + // which holds all needed jars/classes when running from the Scala plugin in an IDE, + // but when running from sbt it contains only sbt-launcher.jar + def classpathWorkaround: List[URL] = { + List(new URL("http://dummy-classpath.invalid")) + } + + def setupMemory(config: Configuration): Configuration = { // to prevent OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool on low memory env (like Travis) config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("16m")) config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("16m")) - - // This is a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265 - // Flink overwrite user classloader by the AppClassLoader if classpaths parameter is empty - // (implementation in org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager) - // which holds all needed jars/classes in case of running from Scala plugin in IDE. - // but in case of running from sbt it contains only sbt-launcher.jar - config.set(PipelineOptions.CLASSPATHS, List("http://dummy-classpath.invalid").asJava) - // This is to prevent memory problems in tests with multiple Table API based aggregations. An IllegalArgumentException
in // org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.java:121 where memorySize is set
diff --git a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
similarity index 75%
rename from engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala
rename to engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
index 136e4601b73..5cbb12a32ee 100644
--- a/engine/flink/kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/kafka/source/flink/TestFromFileSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/kafka/KafkaScenarioTestingSpec.scala
@@ -1,41 +1,45 @@
-package pl.touk.nussknacker.engine.kafka.source.flink
+package pl.touk.nussknacker.scenariotesting.kafka
 
 import com.typesafe.config.ConfigFactory
 import com.typesafe.config.ConfigValueFactory.fromAnyRef
 import com.typesafe.scalalogging.LazyLogging
 import io.circe.Json
 import io.circe.Json.{Null, fromString, obj}
+import org.apache.flink.configuration.Configuration
 import org.apache.kafka.common.record.TimestampType
-import org.scalatest.OptionValues
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, OptionValues}
 import pl.touk.nussknacker.engine.ModelData
 import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
 import pl.touk.nussknacker.engine.build.ScenarioBuilder
-import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
 import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName
 import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
 import pl.touk.nussknacker.engine.kafka.source.InputMeta
+import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator
 import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessConfigCreator.ResultsHolders
-import pl.touk.nussknacker.engine.process.runner.FlinkTestMain
+import pl.touk.nussknacker.engine.management.scenariotesting.{
+  FlinkProcessTestRunner,
+  ScenarioTestingMiniClusterWrapperFactory
+}
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.testing.LocalModelData
-import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
-import pl.touk.nussknacker.engine.util.ThreadUtils
 import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
+import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
 import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties}
 
 import java.util.Collections
 
-class TestFromFileSpec
+class KafkaScenarioTestingSpec
     extends AnyFunSuite
     with Matchers
     with LazyLogging
     with EitherValuesDetailedMessage
-    with OptionValues {
+    with OptionValues
+    with BeforeAndAfterAll {
 
-  private lazy val config = ConfigFactory
+  private val config = ConfigFactory
    .empty()
    .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef("kafka_should_not_be_used:9092"))
    .withValue(
@@ -44,13 +48,28 @@ class TestFromFileSpec
     )
    .withValue("kafka.topicsExistenceValidationConfig.enabled", fromAnyRef(false))
 
-  protected lazy val modelData: ModelData =
+  private val modelData: ModelData =
     LocalModelData(
       inputConfig = config,
       components = List.empty,
-      configCreator = new KafkaSourceFactoryProcessConfigCreator(() => TestFromFileSpec.resultsHolders)
+      configCreator = new KafkaSourceFactoryProcessConfigCreator(() => KafkaScenarioTestingSpec.resultsHolders),
+      modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround)
     )
 
+  private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
+    modelData.modelClassLoader,
+    parallelism = 1,
+    streamExecutionConfig = new Configuration
+  )
+
+  private val testRunner =
+    new FlinkProcessTestRunner(modelData, Some(scenarioTestingMiniClusterWrapper))
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    scenarioTestingMiniClusterWrapper.close()
+  }
+
   test("Should pass correct timestamp from test data") {
     val topic = "simple"
     val expectedTimestamp = System.currentTimeMillis()
@@ -93,7 +112,7 @@ class TestFromFileSpec
         _.add("value", obj("id" -> fromString("fooId"), "field" -> fromString("fooField")))
       )
 
-    val results = run(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
+    val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
 
     val testResultVars = results.nodeResults("end").head.variables
     testResultVars("extractedTimestamp").hcursor.downField("pretty").as[Long].rightValue shouldBe expectedTimestamp
@@ -122,25 +141,14 @@ class TestFromFileSpec
        .add("value", obj("id" -> fromString("1234"), "field" -> fromString("abcd")))
       )
 
-    val results = run(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
+    val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
 
     results.nodeResults shouldBe Symbol("nonEmpty")
   }
 
-  private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      FlinkTestMain.run(
-        modelData,
-        process,
-        scenarioTestData,
-        FlinkTestConfiguration.configuration(),
-      )
-    }
-  }
-
 }
 
-object TestFromFileSpec extends Serializable {
+object KafkaScenarioTestingSpec extends Serializable {
 
   private val resultsHolders = new ResultsHolders
 
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
similarity index 82%
rename from engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala
rename to engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
index 9ca30564187..38874bc8e63 100644
--- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TestWithTestDataSpec.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/scenariotesting/schemedkafka/SchemedKafkaScenarioTestingSpec.scala
@@ -1,4 +1,4 @@
-package pl.touk.nussknacker.engine.schemedkafka.source.flink
+package pl.touk.nussknacker.scenariotesting.schemedkafka
 
 import com.typesafe.config.ConfigFactory
 import com.typesafe.config.ConfigValueFactory.fromAnyRef
@@ -6,56 +6,54 @@ import com.typesafe.scalalogging.LazyLogging
 import io.circe.Json
 import io.circe.Json._
 import org.apache.avro.Schema
+import org.apache.flink.configuration.Configuration
 import org.apache.kafka.common.record.TimestampType
-import org.scalatest.{LoneElement, OptionValues}
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, LoneElement, OptionValues}
 import pl.touk.nussknacker.engine.api.parameter.ParameterName
 import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
 import pl.touk.nussknacker.engine.build.ScenarioBuilder
-import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
 import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName
 import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
 import pl.touk.nussknacker.engine.kafka.source.InputMeta
+import pl.touk.nussknacker.engine.management.scenariotesting.{FlinkProcessTestRunner, ScenarioTestingMiniClusterWrapperFactory}
 import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder
-import pl.touk.nussknacker.engine.process.runner.FlinkTestMain
 import pl.touk.nussknacker.engine.schemedkafka.KafkaAvroIntegrationMockSchemaRegistry.schemaRegistryMockClient
 import pl.touk.nussknacker.engine.schemedkafka.KafkaAvroTestProcessConfigCreator
-import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.{
-  schemaVersionParamName,
-  topicParamName
-}
+import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.{schemaVersionParamName, topicParamName}
 import pl.touk.nussknacker.engine.schemedkafka.schema.{Address, Company}
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClientFactory, SchemaVersionOption}
-import pl.touk.nussknacker.engine.schemedkafka.source.flink.TestWithTestDataSpec.sinkForInputMetaResultsHolder
 import pl.touk.nussknacker.engine.spel.SpelExtension._
 import pl.touk.nussknacker.engine.testing.LocalModelData
 import pl.touk.nussknacker.engine.testmode.TestProcess._
-import pl.touk.nussknacker.engine.util.ThreadUtils
 import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
+import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
+import pl.touk.nussknacker.scenariotesting.schemedkafka.SchemedKafkaScenarioTestingSpec.sinkForInputMetaResultsHolder
 import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties}
 
 import java.util.Collections
 
-class TestWithTestDataSpec
+class SchemedKafkaScenarioTestingSpec
     extends AnyFunSuite
     with Matchers
     with LazyLogging
     with EitherValuesDetailedMessage
     with OptionValues
-    with LoneElement {
+    with LoneElement
+    with BeforeAndAfterAll {
 
-  private lazy val creator: KafkaAvroTestProcessConfigCreator =
+  private val creator: KafkaAvroTestProcessConfigCreator =
     new KafkaAvroTestProcessConfigCreator(sinkForInputMetaResultsHolder) {
       override protected def schemaRegistryClientFactory: SchemaRegistryClientFactory =
         MockSchemaRegistryClientFactory.confluentBased(schemaRegistryMockClient)
     }
 
-  private lazy val config = ConfigFactory
+  private val config = ConfigFactory
    .empty()
    .withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef("kafka_should_not_be_used:9092"))
    .withValue(
@@ -65,6 +63,28 @@ class TestWithTestDataSpec
    .withValue("kafka.topicsExistenceValidationConfig.enabled", fromAnyRef(false))
    .withValue("kafka.avroKryoGenericRecordSchemaIdSerialization", fromAnyRef(false))
 
+  private val modelData =
+    LocalModelData(
+      config,
+      List.empty,
+      configCreator = creator,
+      modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround)
+    )
+
+  private val scenarioTestingMiniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
+    modelData.modelClassLoader,
+    parallelism = 1,
+    streamExecutionConfig = new Configuration
+  )
+
+  private val testRunner =
+    new FlinkProcessTestRunner(modelData, Some(scenarioTestingMiniClusterWrapper))
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    scenarioTestingMiniClusterWrapper.close()
+  }
+
   test("Should pass correct timestamp from test data") {
     val topic = UnspecializedTopicName("address")
     val expectedTimestamp = System.currentTimeMillis()
@@ -111,7 +131,7 @@ class TestWithTestDataSpec
     val testRecordJson = obj("keySchemaId" -> Null, "valueSchemaId" -> fromInt(id), "consumerRecord" -> consumerRecord)
     val scenarioTestData = ScenarioTestData(ScenarioTestJsonRecord("start", testRecordJson) :: Nil)
 
-    val results = run(process, scenarioTestData)
+    val results = testRunner.runTests(process, scenarioTestData)
 
     val testResultVars = results.nodeResults("end").head.variables
     testResultVars("extractedTimestamp").hcursor.downField("pretty").as[Long].rightValue shouldBe expectedTimestamp
@@ -143,7 +163,7 @@ class TestWithTestDataSpec
     )
     val scenarioTestData = ScenarioTestData("start", parameterExpressions)
 
-    val results = run(process, scenarioTestData)
+    val results = testRunner.runTests(process, scenarioTestData)
     results
      .invocationResults("end")
      .head
@@ -165,7 +185,7 @@ class TestWithTestDataSpec
       ParameterName("in") -> Expression.spel("'some-text-id'")
     )
     val scenarioTestData = ScenarioTestData("fragment1", parameterExpressions)
-    val results = run(fragment, scenarioTestData)
+    val results = testRunner.runTests(fragment, scenarioTestData)
 
     results.nodeResults("fragment1").loneElement shouldBe ResultContext(
       "fragment1-fragment1-0-0",
@@ -197,20 +217,9 @@ class TestWithTestDataSpec
     schemaRegistryMockClient.register(subject, parsedSchema)
   }
 
-  private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      FlinkTestMain.run(
-        LocalModelData(config, List.empty, configCreator = creator),
-        process,
-        scenarioTestData,
-        FlinkTestConfiguration.configuration(),
-      )
-    }
-  }
-
 }
 
-object TestWithTestDataSpec {
+object SchemedKafkaScenarioTestingSpec {
 
   private val sinkForInputMetaResultsHolder = new TestResultsHolder[java.util.Map[String @unchecked, _]]
 
diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
index c59145cb0ad..bdf1386044c 100644
--- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
+++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala
@@ -278,7 +278,10 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD
   }
 
   test("should figure out result type when dynamically indexing record") {
-    evaluate[Int]("{a: {g: 5, h: 10}, b: {g: 50, h: 100}}[#input.toString()].h", Context("abc").withVariable("input", "b")) shouldBe 100
+    evaluate[Int](
+      "{a: {g: 5, h: 10}, b: {g: 50, h: 100}}[#input.toString()].h",
+      Context("abc").withVariable("input", "b")
+    ) shouldBe 100
   }
 
   test("parsing first selection on array") {
diff --git a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala
similarity index 73%
rename from utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala
rename to utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala
index 5311746a49d..0e013712222 100644
--- a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/StaticMethodRunner.scala
+++ b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/ReflectiveMethodInvoker.scala
@@ -2,7 +2,7 @@ package pl.touk.nussknacker.engine.util
 
 import java.lang.reflect.InvocationTargetException
 
-abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, methodName: String) {
+final class ReflectiveMethodInvoker[Result](classLoader: ClassLoader, className: String, methodName: String) {
 
   import scala.reflect.runtime.{universe => ru}
 
@@ -17,9 +17,9 @@ abstract class StaticMethodRunner(classLoader: ClassLoader, className: String, m
   }
 
   // we have to use context loader, as in UI we don't have e.g. nussknacker-process or user model on classpath...
-  def tryToInvoke(args: Any*): Any = ThreadUtils.withThisAsContextClassLoader(classLoader) {
+  def invokeStaticMethod(args: Any*): Result = ThreadUtils.withThisAsContextClassLoader(classLoader) {
     try {
-      invoker(args: _*)
+      invoker(args: _*).asInstanceOf[Result]
     } catch {
       case e: InvocationTargetException => throw e.getTargetException
     }
diff --git a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/loader/ModelClassLoader.scala b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/loader/ModelClassLoader.scala
index 470d9190cac..9f8902685db 100644
--- a/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/loader/ModelClassLoader.scala
+++ b/utils/utils-internal/src/main/scala/pl/touk/nussknacker/engine/util/loader/ModelClassLoader.scala
@@ -6,7 +6,7 @@ import java.io.File
 import java.net.{URI, URL, URLClassLoader}
 import java.nio.file.Path
 
-case class ModelClassLoader private (classLoader: ClassLoader, urls: List[URL]) {
+case class ModelClassLoader(classLoader: ClassLoader, urls: List[URL]) {
 
   override def toString: String = s"ModelClassLoader(${toString(classLoader)})"
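
Reviewer note: both migrated specs wire the new scenario-testing machinery the same way, so the pattern is worth spelling out once. Below is a minimal sketch assembled only from the calls visible in the diffs above; the helper name `runOnSharedMiniCluster` and its parameter list are illustrative, not part of this PR.

```scala
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.management.scenariotesting.{
  FlinkProcessTestRunner,
  ScenarioTestingMiniClusterWrapperFactory
}

// Illustrative helper: build the shared MiniCluster wrapper once, run scenarios
// through a single FlinkProcessTestRunner, and close the wrapper when done.
def runOnSharedMiniCluster(modelData: ModelData, scenario: CanonicalProcess, testData: ScenarioTestData) = {
  // The wrapper is created from the model classloader, exactly as in the specs above.
  val miniClusterWrapper = ScenarioTestingMiniClusterWrapperFactory.create(
    modelData.modelClassLoader,
    parallelism = 1,
    streamExecutionConfig = new Configuration
  )
  try {
    // Replaces the old ThreadUtils.withThisAsContextClassLoader + FlinkTestMain.run boilerplate.
    val testRunner = new FlinkProcessTestRunner(modelData, Some(miniClusterWrapper))
    testRunner.runTests(scenario, testData)
  } finally {
    // The specs do this in afterAll(); a per-call helper has to do it itself.
    miniClusterWrapper.close()
  }
}
```

The point of the indirection: one Flink MiniCluster can be shared across a whole suite instead of being spun up inside every `FlinkTestMain.run` call, which is also why both specs gained `BeforeAndAfterAll` to close the wrapper.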
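The `StaticMethodRunner` → `ReflectiveMethodInvoker` rename also tightens the call-site contract: `tryToInvoke(args: Any*): Any` becomes `invokeStaticMethod(args: Any*): Result`, with the result type fixed at construction. A hedged caller-side sketch follows; the target object and its names are hypothetical, and only the `ReflectiveMethodInvoker` API itself comes from this PR.

```scala
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

// Hypothetical target: a "main"-style object assumed to be reachable through the
// given classloader. The object and method names are made up for illustration.
object ExampleScenarioMain {
  def run(scenarioJson: String): String = s"handled: $scenarioJson"
}

val invoker = new ReflectiveMethodInvoker[String](
  classLoader = getClass.getClassLoader,
  className = "ExampleScenarioMain", // fully-qualified name of the target object
  methodName = "run"
)

// Runs with `classLoader` as the context classloader (see the comment in the class)
// and unwraps InvocationTargetException to its cause; the cast to the expected
// result type now happens inside the invoker rather than at every caller.
val result: String = invoker.invokeStaticMethod("{}")
```

Parameterizing the invoker with `[Result]` keeps the unavoidable `asInstanceOf` in one place instead of forcing each caller of the old `tryToInvoke(...): Any` to cast.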