Skip to content

Commit

Permalink
Closing FlinkDeploymentManager in more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 20, 2025
1 parent 22f432f commit 68eb84c
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,34 +1,33 @@
package pl.touk.nussknacker.ui.api.description

import io.circe.Encoder
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.graph.{ProcessProperties, ScenarioGraph}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.engine.definition.test.TestingCapabilities
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions
import pl.touk.nussknacker.restmodel.BaseEndpointDefinitions.SecuredEndpoint
import pl.touk.nussknacker.restmodel.definition.{UIParameter, UISourceParameters}
import pl.touk.nussknacker.restmodel.definition.UISourceParameters
import pl.touk.nussknacker.restmodel.validation.ValidationResults.{NodeValidationError, NodeValidationErrorType}
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
import pl.touk.nussknacker.security.AuthCredentials
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioGraphCodec._
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioTestingCodecs._
import pl.touk.nussknacker.ui.definition.DefinitionsService
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir._
import sttp.tapir.json.circe.jsonBody
import io.circe.Encoder
import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.ui.api.TestingApiHttpService.Examples.{
malformedTypingResultExample,
noScenarioExample,
testDataGenerationErrorExample
}
import pl.touk.nussknacker.ui.api.TestingApiHttpService.TestingError
import pl.touk.nussknacker.ui.definition.DefinitionsService
import sttp.model.StatusCode.Ok
import sttp.tapir.EndpointIO.Example
import sttp.tapir._
import sttp.tapir.json.circe.jsonBody

class TestingApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpointDefinitions {
import NodesApiEndpoints.Dtos._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.management.periodic.flink

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.JobID
import pl.touk.nussknacker.engine.{BaseModelData, newdeployment}
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId}
Expand All @@ -16,6 +15,7 @@ import pl.touk.nussknacker.engine.management.{
FlinkStreamingRestManager
}
import pl.touk.nussknacker.engine.modelconfig.InputConfigDuringExecution
import pl.touk.nussknacker.engine.{BaseModelData, newdeployment}
import sttp.client3.SttpBackend

import java.nio.file.{Files, Path, Paths}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
package pl.touk.nussknacker.engine.management.streaming

import _root_.sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend
import akka.actor.ActorSystem
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.api.component.DesignerWideComponentId
import pl.touk.nussknacker.engine.api.deployment.{
DeploymentManager,
NoOpScenarioActivityManager,
ProcessingTypeActionServiceStub,
ProcessingTypeDeployedScenariosProviderStub,
ScenarioActivityManager
ProcessingTypeDeployedScenariosProviderStub
}
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider
import pl.touk.nussknacker.engine.{
ConfigWithUnresolvedVersion,
DeploymentManagerDependencies,
ModelData,
ModelDependencies,
ProcessingTypeConfig
}
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend

object FlinkStreamingDeploymentManagerProviderHelper {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import pl.touk.nussknacker.test.{KafkaConfigProperties, VeryPatientScalaFutures,
import java.util.UUID
import scala.concurrent.Await
import scala.jdk.CollectionConverters._
import scala.util.Using

class FlinkStreamingProcessTestRunnerSpec
extends AnyFlatSpec
Expand Down Expand Up @@ -45,20 +46,26 @@ class FlinkStreamingProcessTestRunnerSpec
)

it should "run scenario in test mode" in {
val deploymentManager =
Using.resource(
FlinkStreamingDeploymentManagerProviderHelper.createDeploymentManager(ConfigWithUnresolvedVersion(config))

val processName = ProcessName(UUID.randomUUID().toString)
val processVersion = ProcessVersion.empty.copy(processName = processName)

val process = SampleProcess.prepareProcess(processName)

whenReady(deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData))) { r =>
r.nodeResults shouldBe Map(
"startProcess" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))),
"nightFilter" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))),
"endSend" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere"))))
)
) { deploymentManager =>
val processName = ProcessName(UUID.randomUUID().toString)
val processVersion = ProcessVersion.empty.copy(processName = processName)

val process = SampleProcess.prepareProcess(processName)

whenReady(deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData))) {
r =>
r.nodeResults shouldBe Map(
"startProcess" -> List(
ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))
),
"nightFilter" -> List(
ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere")))
),
"endSend" -> List(ResultContext(s"$processName-startProcess-0-0", Map("input" -> variable("terefere"))))
)
}
}
}

Expand All @@ -71,16 +78,17 @@ class FlinkStreamingProcessTestRunnerSpec
.source("startProcess", "kafka-transaction")
.emptySink("endSend", "sendSmsNotExist")

val deploymentManager =
Using.resource(
FlinkStreamingDeploymentManagerProviderHelper.createDeploymentManager(ConfigWithUnresolvedVersion(config))

val caught = intercept[IllegalArgumentException] {
Await.result(
deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData)),
patienceConfig.timeout
)
) { deploymentManager =>
val caught = intercept[IllegalArgumentException] {
Await.result(
deploymentManager.processCommand(DMTestScenarioCommand(processVersion, process, scenarioTestData)),
patienceConfig.timeout
)
}
caught.getMessage shouldBe "Compilation errors: MissingSinkFactory(sendSmsNotExist,endSend)"
}
caught.getMessage shouldBe "Compilation errors: MissingSinkFactory(sendSmsNotExist,endSend)"
}

private def variable(value: String): Json =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ trait StreamingDockerTest extends DockerTest with BeforeAndAfterAll with Matcher
override def afterAll(): Unit = {
kafkaClient.shutdown()
logger.info("Kafka client closed")
deploymentManager.close()
super.afterAll()
}

Expand Down

0 comments on commit 68eb84c

Please sign in to comment.