Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1962] Flink test mechanism: caching flink mini cluster #7458

Draft
wants to merge 20 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
98ace54
[NU-1962] Flink test mechanism refactoring: inheritance replaced with…
arkadius Jan 16, 2025
107d3a2
[NU-1962] Flink test mechanism refactoring: less parameters passing
arkadius Jan 16, 2025
6504481
[NU-1962] Flink test mechanism refactoring: execute method split i…
arkadius Jan 16, 2025
e2cb08a
[NU-1962] Flink test mechanism refactoring: passing the same jobgraph…
arkadius Jan 16, 2025
42e2698
[NU-1962] Flink test mechanism refactoring: temporary removed fallbac…
arkadius Jan 16, 2025
dc81696
[NU-1962] Flink test mechanism refactoring: more cleanups
arkadius Jan 16, 2025
9e14ab9
[NU-1962] Flink test mechanism refactoring: temporary removed passing…
arkadius Jan 16, 2025
726c789
[NU-1962] Flink test mechanism refactoring: classpath trick moved fro…
arkadius Jan 16, 2025
7d7836c
tests fix attempt
arkadius Jan 17, 2025
3ac7e6e
reverted classpath fix in test classes
arkadius Jan 17, 2025
a2f1899
wip
arkadius Jan 17, 2025
8118f73
StreamExecutionEnvironment and MiniCluster created once
arkadius Jan 17, 2025
e5d1e00
BatchDataGenerationSpec: added timestamp suffix for easier tests retry
arkadius Jan 20, 2025
22f432f
FlinkDeploymentManager: close test runner + test runner reused in som…
arkadius Jan 20, 2025
68eb84c
Closing FlinkDeploymentManager in more tests
arkadius Jan 20, 2025
b14fbd2
FlinkProcessTestRunnerSpec: reusing test runner
arkadius Jan 20, 2025
e63a246
Merge remote-tracking branch 'origin/staging' into flink-test-mechani…
arkadius Jan 20, 2025
09a5efb
Changed docs next to method invoker
arkadius Jan 21, 2025
6325dc4
removed SimpleProcessConfigCreator to allow run tests from IDE
arkadius Jan 21, 2025
cd419c6
Test for multiple tests run + refactor
arkadius Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,9 @@ lazy val flinkDeploymentManager = (project in flink("management"))
componentsApi % Provided,
httpUtils % Provided,
flinkScalaUtils % Provided,
flinkTestUtils % IntegrationTest,
// test->test dependency is needed to load SimpleProcessConfigCreator
flinkExecutor % "test,test->test",
flinkTestUtils % "it,test",
kafkaTestUtils % "it,test"
)

Expand Down Expand Up @@ -710,18 +712,22 @@ lazy val flinkTests = (project in flink("tests"))
}
)
.dependsOn(
defaultModel % Test,
flinkExecutor % Test,
flinkKafkaComponents % Test,
flinkBaseComponents % Test,
flinkBaseUnboundedComponents % Test,
flinkTableApiComponents % Test,
flinkTestUtils % Test,
kafkaTestUtils % Test,
flinkComponentsTestkit % Test,
defaultModel % Test,
flinkExecutor % Test,
flinkKafkaComponents % Test,
flinkBaseComponents % Test,
flinkBaseUnboundedComponents % Test,
flinkTableApiComponents % Test,
flinkTestUtils % Test,
kafkaTestUtils % Test,
flinkComponentsTestkit % Test,
flinkDeploymentManager % Test,
// test->test dependencies are needed to load components from these modules
flinkKafkaComponentsUtils % "test->test",
flinkSchemedKafkaComponentsUtils % "test->test",
// for local development
designer % Test,
deploymentManagerApi % Test
designer % Test,
deploymentManagerApi % Test
)

lazy val defaultModel = (project in (file("defaultModel")))
Expand Down Expand Up @@ -965,7 +971,7 @@ lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-com
componentsUtils % Provided,
kafkaTestUtils % Test,
flinkTestUtils % Test,
flinkExecutor % Test
flinkExecutor % Test,
)

lazy val flinkKafkaComponentsUtils = (project in flink("kafka-components-utils"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,33 @@
package pl.touk.nussknacker.ui.api.description

import io.circe.Encoder
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.graph.{ProcessProperties, ScenarioGraph}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.engine.definition.test.TestingCapabilities
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions
import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions.SecuredEndpoint
import pl.touk.nussknacker.restmodel.definition.{UIParameter, UISourceParameters}
import pl.touk.nussknacker.restmodel.definition.UISourceParameters
import pl.touk.nussknacker.restmodel.validation.ValidationResults.{NodeValidationError, NodeValidationErrorType}
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
import pl.touk.nussknacker.security.AuthCredentials
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioGraphCodec._
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioTestingCodecs._
import pl.touk.nussknacker.ui.definition.DefinitionsService
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir._
import sttp.tapir.json.circe.jsonBody
import io.circe.Encoder
import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.ui.api.TestingApiHttpService.Examples.{
malformedTypingResultExample,
noScenarioExample,
testDataGenerationErrorExample
}
import pl.touk.nussknacker.ui.api.TestingApiHttpService.TestingError
import pl.touk.nussknacker.ui.definition.DefinitionsService
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir._
import sttp.tapir.json.circe.jsonBody

class TestingApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions {
import NodesApiEndpoints.Dtos._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package pl.touk.nussknacker.ui.process.periodic.legacy.db

import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
import pl.touk.nussknacker.ui.process.periodic.model.{PeriodicProcessDeploymentId, PeriodicProcessDeploymentStatus, PeriodicProcessId}
import pl.touk.nussknacker.ui.process.periodic.model.{
PeriodicProcessDeploymentId,
PeriodicProcessDeploymentStatus,
PeriodicProcessId
}
import slick.jdbc.{JdbcProfile, JdbcType}
import slick.lifted.ProvenShape
import slick.sql.SqlProfile.ColumnOption.NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ class BatchDataGenerationSpec

private val designerServiceUrl = "http://localhost:8080"

private val liveDataGenScenarioName = "SumTransactions-LiveData"
private val randomDataGenScenarioName = "SumTransactions-RandomData"
private val timeBasedRandomSuffix = System.currentTimeMillis()
private val liveDataGenScenarioName = s"SumTransactions-LiveData-$timeBasedRandomSuffix"
private val randomDataGenScenarioName = s"SumTransactions-RandomData-$timeBasedRandomSuffix"

override def beforeAll(): Unit = {
createEmptyBatchScenario(liveDataGenScenarioName, "Default")
Expand Down Expand Up @@ -106,7 +107,7 @@ class BatchDataGenerationSpec
| "nodeResults": {
| "sourceId": [
| {
| "id": "SumTransactions-LiveData-sourceId-0-0",
| "id": "$liveDataGenScenarioName-sourceId-0-0",
| "variables": {
| "input": {
| "pretty": {
Expand All @@ -122,7 +123,7 @@ class BatchDataGenerationSpec
| ],
| "end": [
| {
| "id": "SumTransactions-LiveData-sourceId-0-0",
| "id": "$liveDataGenScenarioName-sourceId-0-0",
| "variables": {
| "input": {
| "pretty": {
Expand Down Expand Up @@ -183,7 +184,7 @@ class BatchDataGenerationSpec
| "nodeResults": {
| "sourceId": [
| {
| "id": "SumTransactions-LiveData-sourceId-0-0",
| "id": "$liveDataGenScenarioName-sourceId-0-0",
| "variables": {
| "input": {
| "pretty": {
Expand All @@ -199,7 +200,7 @@ class BatchDataGenerationSpec
| ],
| "end": [
| {
| "id": "SumTransactions-LiveData-sourceId-0-0",
| "id": "$liveDataGenScenarioName-sourceId-0-0",
| "variables": {
| "input": {
| "pretty": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import cats.data.{Validated, ValidatedNel}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.development.manager.DevelopmentStateStatus._
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.ProcessVersion
Expand All @@ -19,7 +20,8 @@ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefin
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig}
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner

import java.net.URI
import java.util.UUID
Expand Down Expand Up @@ -48,7 +50,8 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
private val memory: TrieMap[ProcessName, StatusDetails] = TrieMap[ProcessName, StatusDetails]()
private val random = new scala.util.Random()

private lazy val flinkTestRunner = new FlinkProcessTestRunner(modelData.asInvokableModelData)
private lazy val flinkTestRunner =
new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration())

implicit private class ProcessStateExpandable(processState: StatusDetails) {

Expand Down Expand Up @@ -83,7 +86,10 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode
case command: DMRunOffScheduleCommand => runOffSchedule(command)
case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult(""))
case DMTestScenarioCommand(_, canonicalProcess, scenarioTestData) =>
flinkTestRunner.test(canonicalProcess, scenarioTestData) // it's just for streaming e2e tests from file purposes
flinkTestRunner.runTestsAsync(
canonicalProcess,
scenarioTestData
) // it's just for streaming e2e tests from file purposes
}

private def description(canonicalProcess: CanonicalProcess) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import cats.data.Validated.valid
import cats.data.ValidatedNel
import com.typesafe.config.Config
import io.circe.Json
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
import pl.touk.nussknacker.engine.ModelData.BaseModelDataExt
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.api.definition.{NotBlankParameterValidator, StringParameterEditor}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.engine.management.{FlinkProcessTestRunner, FlinkStreamingPropertiesConfig}
import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.newdeployment.DeploymentId
import pl.touk.nussknacker.engine.testing.StubbingCommands
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
Expand Down Expand Up @@ -59,7 +60,9 @@ object MockableDeploymentManagerProvider {
with StubbingCommands {

private lazy val testRunnerOpt =
modelDataOpt.map(modelData => new FlinkProcessTestRunner(modelData.asInvokableModelData))
modelDataOpt.map(modelData =>
new FlinkProcessTestRunner(modelData.asInvokableModelData, parallelism = 1, new Configuration())
)

override def resolve(
idWithName: ProcessIdWithName,
Expand Down Expand Up @@ -102,7 +105,7 @@ object MockableDeploymentManagerProvider {
.get()
.get(processVersion.processName.value)
.map(Future.successful)
.orElse(testRunnerOpt.map(_.test(scenario, testData)))
.orElse(testRunnerOpt.map(_.runTestsAsync(scenario, testData)))
.getOrElse(
throw new IllegalArgumentException(
s"Tests results not mocked for scenario [${processVersion.processName.value}] and no model data provided"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package pl.touk.nussknacker.engine.process.runner

import java.io.File
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.ExecutionConfig
import pl.touk.nussknacker.engine.{ModelConfigs, ModelData}
import pl.touk.nussknacker.engine.api.{CirceUtil, ProcessVersion}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.marshall.ScenarioParser
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.{ModelConfigs, ModelData}

import java.io.File
import java.nio.charset.StandardCharsets
import scala.util.Using
import scala.util.control.NonFatal

trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging {
trait FlinkProcessMain[Env] extends LazyLogging {

def main(argsWithHack: Array[String]): Unit = {
try {
Expand Down Expand Up @@ -61,6 +64,15 @@ trait FlinkProcessMain[Env] extends FlinkRunner with LazyLogging {
prepareExecutionConfig: ExecutionConfigPreparer
): Unit

protected def readProcessFromArg(arg: String): CanonicalProcess = {
val canonicalJson = if (arg.startsWith("@")) {
Using.resource(scala.io.Source.fromFile(arg.substring(1), StandardCharsets.UTF_8.name()))(_.mkString)
} else {
arg
}
ScenarioParser.parseUnsafe(canonicalJson)
}

private def parseProcessVersion(json: String): ProcessVersion =
CirceUtil.decodeJsonUnsafe[ProcessVersion](json, "invalid scenario version")

Expand Down

This file was deleted.

This file was deleted.

Loading
Loading