From 68eb84cf0105ec4fb624387a1ca4340d8508c3e0 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Mon, 20 Jan 2025 16:07:01 +0100 Subject: [PATCH] Closing FlinkDeploymentManager in more tests --- .../api/description/TestingApiEndpoints.scala | 19 ++++--- .../periodic/flink/FlinkJarManager.scala | 2 +- ...amingDeploymentManagerProviderHelper.scala | 13 ++--- .../FlinkStreamingProcessTestRunnerSpec.scala | 50 +++++++++++-------- .../streaming/StreamingDockerTest.scala | 1 + 5 files changed, 43 insertions(+), 42 deletions(-) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala index b04ef1f1712..51536d7edc8 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/TestingApiEndpoints.scala @@ -1,34 +1,33 @@ package pl.touk.nussknacker.ui.api.description +import io.circe.Encoder import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.graph.{ProcessProperties, ScenarioGraph} import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.api.typed.typing.Typed +import pl.touk.nussknacker.engine.api.typed.typing._ import pl.touk.nussknacker.engine.definition.test.TestingCapabilities import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions.SecuredEndpoint -import pl.touk.nussknacker.restmodel.definition.{UIParameter, UISourceParameters} +import pl.touk.nussknacker.restmodel.definition.UISourceParameters import pl.touk.nussknacker.restmodel.validation.ValidationResults.{NodeValidationError, NodeValidationErrorType} -import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._ import pl.touk.nussknacker.security.AuthCredentials import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioGraphCodec._ +import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._ import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioTestingCodecs._ -import pl.touk.nussknacker.ui.definition.DefinitionsService -import sttp.model.StatusCode.Ok -import sttp.tapir.EndpointIO.Example -import sttp.tapir._ -import sttp.tapir.json.circe.jsonBody -import io.circe.Encoder -import pl.touk.nussknacker.engine.api.typed.typing._ import pl.touk.nussknacker.ui.api.TestingApiHttpService.Examples.{ malformedTypingResultExample, noScenarioExample, testDataGenerationErrorExample } import pl.touk.nussknacker.ui.api.TestingApiHttpService.TestingError +import pl.touk.nussknacker.ui.definition.DefinitionsService +import sttp.model.StatusCode.Ok +import sttp.tapir.EndpointIO.Example +import sttp.tapir._ +import sttp.tapir.json.circe.jsonBody class TestingApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions { import NodesApiEndpoints.Dtos._ diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala index 47b33e7bc93..1b398dea598 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala @@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.management.periodic.flink import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.JobID -import pl.touk.nussknacker.engine.{BaseModelData, newdeployment} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId} @@ -16,6 +15,7 @@ import pl.touk.nussknacker.engine.management.{ FlinkStreamingRestManager } import pl.touk.nussknacker.engine.modelconfig.InputConfigDuringExecution +import pl.touk.nussknacker.engine.{BaseModelData, newdeployment} import sttp.client3.SttpBackend import java.nio.file.{Files, Path, Paths} diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala index 931b35d5794..81578b4fdd4 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerProviderHelper.scala @@ -1,25 +1,18 @@ package pl.touk.nussknacker.engine.management.streaming +import _root_.sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend import akka.actor.ActorSystem import org.asynchttpclient.DefaultAsyncHttpClientConfig +import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId import pl.touk.nussknacker.engine.api.deployment.{ DeploymentManager, NoOpScenarioActivityManager, ProcessingTypeActionServiceStub, - ProcessingTypeDeployedScenariosProviderStub, - ScenarioActivityManager + ProcessingTypeDeployedScenariosProviderStub } import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider -import pl.touk.nussknacker.engine.{ - ConfigWithUnresolvedVersion, - DeploymentManagerDependencies, - ModelData, - ModelDependencies, - ProcessingTypeConfig -} -import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend object FlinkStreamingDeploymentManagerProviderHelper { diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala index 560206a719f..adf823079f6 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala @@ -17,6 +17,7 @@ import pl.touk.nussknacker.test.{KafkaConfigProperties, VeryPatientScalaFutures, import java.util.UUID import scala.concurrent.Await import scala.jdk.CollectionConverters._ +import scala.util.Using class FlinkStreamingProcessTestRunnerSpec extends AnyFlatSpec @@ -45,20 +46,26 @@ class FlinkStreamingProcessTestRunnerSpec ) it should "run scenario in test mode" in { - val deploymentManager = + Using.resource( FlinkStreamingDeploymentManagerProviderHelper.createDeploymentManager(ConfigWithUnresolvedVersion(config)) - - val processName = ProcessName(UUID.randomUUID().toString) - val processVersion = ProcessVersion.empty.copy(processName = processName) - - val process = SampleProcess.prepareProcess(processName) - - whenReady(deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData))) { r => - r.nodeResults shouldBe Map( - "startProcess" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), - "nightFilter" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))), - "endSend" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))) - ) + ) { deploymentManager => + val processName = ProcessName(UUID.randomUUID().toString) + val processVersion = ProcessVersion.empty.copy(processName = processName) + + val process = SampleProcess.prepareProcess(processName) + + whenReady(deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData))) { + r => + r.nodeResults shouldBe Map( + "startProcess" -> List( + ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere"))) + ), + "nightFilter" -> List( + ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere"))) + ), + "endSend" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))) + ) + } } } @@ -71,16 +78,17 @@ class FlinkStreamingProcessTestRunnerSpec .source("startProcess", "kafka-transaction") .emptySink("endSend", "sendSmsNotExist") - val deploymentManager = + Using.resource( FlinkStreamingDeploymentManagerProviderHelper.createDeploymentManager(ConfigWithUnresolvedVersion(config)) - - val caught = intercept[IllegalArgumentException] { - Await.result( - deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData)), - patienceConfig.timeout - ) + ) { deploymentManager => + val caught = intercept[IllegalArgumentException] { + Await.result( + deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData)), + patienceConfig.timeout + ) + } + caught.getMessage shouldBe "Compilation errors: MissingSinkFactory(sendSmsNotExist,endSend)" } - caught.getMessage shouldBe "Compilation errors: MissingSinkFactory(sendSmsNotExist,endSend)" } private def variable(value: String): Json = diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala index 176a27d84da..7418573086f 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala @@ -30,6 +30,7 @@ trait StreamingDockerTest extends DockerTest with BeforeAndAfterAll with Matcher override def afterAll(): Unit = { kafkaClient.shutdown() logger.info("Kafka client closed") + deploymentManager.close() super.afterAll() }