Skip to content

Commit

Permalink
FlinkDeploymentManager: close test runner + test runner reused in some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 20, 2025
1 parent e5d1e00 commit 23cfd43
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ abstract class FlinkDeploymentManager(

override def processStateDefinitionManager: ProcessStateDefinitionManager = FlinkProcessStateDefinitionManager

// Releases resources owned by this deployment manager.
// super.close() runs first so base-class teardown completes before the
// locally-owned test runner (and its embedded Flink mini cluster) is shut down.
override def close(): Unit = {
super.close()
testRunner.close()
}

}

object FlinkDeploymentManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

import scala.concurrent.{ExecutionContext, Future}

class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecutionConfig: Configuration) {
class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecutionConfig: Configuration)
extends AutoCloseable {

private val streamExecutionEnvironment =
TestsMechanismStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(parallelism, streamExecutionConfig)
Expand Down Expand Up @@ -42,4 +43,9 @@ class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecu
scenarioTestData
)

// Shuts down the test execution infrastructure created by this runner.
// NOTE(review): miniCluster is closed before streamExecutionEnvironment —
// the environment is created first (see streamExecutionEnvironment above),
// so confirm this ordering is the intended reverse-of-creation teardown.
def close(): Unit = {
miniCluster.close()
streamExecutionEnvironment.close()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@ import org.apache.flink.runtime.client.JobExecutionException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfterEach, Inside, OptionValues}
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId, ParameterAdditionalUIConfig}
import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ParameterValueCompileTimeValidation, ValueInputWithDictEditor}
import pl.touk.nussknacker.engine.api.component.{
ComponentAdditionalConfig,
DesignerWideComponentId,
ParameterAdditionalUIConfig
}
import pl.touk.nussknacker.engine.api.parameter.{
ParameterName,
ParameterValueCompileTimeValidation,
ValueInputWithDictEditor
}
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
import pl.touk.nussknacker.engine.api.{DisplayJsonWithEncoder, FragmentSpecificData, MetaData, StreamMetaData}
Expand All @@ -17,11 +25,18 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode
import pl.touk.nussknacker.engine.compile.FragmentResolver
import pl.touk.nussknacker.engine.deployment.AdditionalModelConfigs
import pl.touk.nussknacker.engine.flink.test.{FlinkTestConfiguration, RecordingExceptionConsumer, RecordingExceptionConsumerProvider}
import pl.touk.nussknacker.engine.flink.test.{
FlinkTestConfiguration,
RecordingExceptionConsumer,
RecordingExceptionConsumerProvider
}
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition}
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunnerSpec.{fragmentWithValidationName, processWithFragmentParameterValidation}
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunnerSpec.{
fragmentWithValidationName,
processWithFragmentParameterValidation
}
import pl.touk.nussknacker.engine.process.helpers.SampleNodes._
import pl.touk.nussknacker.engine.testmode.TestProcess._
import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils}
Expand Down Expand Up @@ -762,15 +777,13 @@ class FlinkProcessTestRunnerSpec
ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround),
resolveConfigs = false
)
ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
// TODO: reuse this instance between all test cases
val parallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
new FlinkProcessTestRunner(modelData, parallelism, FlinkTestConfiguration.setupMemory(new Configuration))
.runTests(process, scenarioTestData)
}
// TODO: reuse this instance between all test cases
val parallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
new FlinkProcessTestRunner(modelData, parallelism, FlinkTestConfiguration.setupMemory(new Configuration))
.runTests(process, scenarioTestData)
}

private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import io.circe.Json
import io.circe.Json.{Null, fromString, obj}
import org.apache.flink.configuration.Configuration
import org.apache.kafka.common.record.TimestampType
import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, OptionValues}
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration
import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory.SingleValueParamName
import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
Expand All @@ -23,8 +22,6 @@ import pl.touk.nussknacker.engine.kafka.source.flink.KafkaSourceFactoryProcessCo
import pl.touk.nussknacker.engine.management.testsmechanism.FlinkProcessTestRunner
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.util.ThreadUtils
import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader
import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, KafkaConfigProperties}
Expand All @@ -36,9 +33,10 @@ class TestFromFileSpec
with Matchers
with LazyLogging
with EitherValuesDetailedMessage
with OptionValues {
with OptionValues
with BeforeAndAfterAll {

private lazy val config = ConfigFactory
private val config = ConfigFactory
.empty()
.withValue(KafkaConfigProperties.bootstrapServersProperty(), fromAnyRef("kafka_should_not_be_used:9092"))
.withValue(
Expand All @@ -47,14 +45,22 @@ class TestFromFileSpec
)
.withValue("kafka.topicsExistenceValidationConfig.enabled", fromAnyRef(false))

protected lazy val modelData: ModelData =
private val modelData: ModelData =
LocalModelData(
inputConfig = config,
components = List.empty,
configCreator = new KafkaSourceFactoryProcessConfigCreator(() => TestFromFileSpec.resultsHolders),
modelClassLoader = new ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround)
)

// Single FlinkProcessTestRunner instance shared by every test case in this
// suite (parallelism = 1), avoiding per-test cluster startup; closed in afterAll.
private val testRunner =
new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration))

// Tears down the suite-shared test runner after all tests have executed.
// super.afterAll() is invoked first, per the ScalaTest stackable-trait convention.
override protected def afterAll(): Unit = {
super.afterAll()
testRunner.close()
}

test("Should pass correct timestamp from test data") {
val topic = "simple"
val expectedTimestamp = System.currentTimeMillis()
Expand Down Expand Up @@ -97,7 +103,7 @@ class TestFromFileSpec
_.add("value", obj("id" -> fromString("fooId"), "field" -> fromString("fooField")))
)

val results = run(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))

val testResultVars = results.nodeResults("end").head.variables
testResultVars("extractedTimestamp").hcursor.downField("pretty").as[Long].rightValue shouldBe expectedTimestamp
Expand Down Expand Up @@ -126,19 +132,11 @@ class TestFromFileSpec
.add("value", obj("id" -> fromString("1234"), "field" -> fromString("abcd")))
)

val results = run(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))
val results = testRunner.runTests(process, ScenarioTestData(ScenarioTestJsonRecord("start", consumerRecord) :: Nil))

results.nodeResults shouldBe Symbol("nonEmpty")
}

// Runs the given scenario test data through a freshly constructed
// FlinkProcessTestRunner, with this class's classloader installed as the
// thread context classloader for the duration of the run.
// NOTE(review): the runner is created per invocation and is not closed here —
// see the TODO; reusing (and closing) a single instance would avoid leaking
// the per-call execution environment.
private def run(process: CanonicalProcess, scenarioTestData: ScenarioTestData): TestResults[Json] = {
ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
// TODO: reuse this instance between all test cases
new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration))
.runTests(process, scenarioTestData)
}
}

}

object TestFromFileSpec extends Serializable {
Expand Down

0 comments on commit 23cfd43

Please sign in to comment.