Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 31, 2025
1 parent fef65f5 commit 8de8378
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pl.touk.nussknacker.engine.flink.minicluster
import org.apache.flink.configuration.Configuration

final case class FlinkMiniClusterConfig(
// TODO: remove after fully migration, see LegacySingleUseMiniClusterFallbackHandler
// TODO: remove after fully migration, see LegacyFallbackToSingleUseMiniClusterHandler
reuseMiniClusterForScenarioTesting: Boolean = true,
reuseMiniClusterForScenarioStateVerification: Boolean = true,
config: Configuration = new Configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterWithServices
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.ScenarioParallelismOverride.Ops
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.legacysingleuseminicluster.LegacySingleUseMiniClusterFallbackHandler
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.legacysingleuseminicluster.LegacyFallbackToSingleUseMiniClusterHandler
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

import scala.util.control.NonFatal
Expand All @@ -28,15 +28,15 @@ class FlinkMiniClusterScenarioStateVerifier(
"run"
)

private val singleUseMiniClusterFallbackHandler =
new LegacySingleUseMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario state verification")
private val legacyFallbackToSingleUseMiniClusterHandler =
new LegacyFallbackToSingleUseMiniClusterHandler(modelData.modelClassLoader, "scenario state verification")

def verify(
processVersion: ProcessVersion,
scenario: CanonicalProcess,
savepointPath: String
): Try[Unit] = {
singleUseMiniClusterFallbackHandler.withSharedOrSingleUseCluster(sharedMiniClusterServicesOpt, scenario) {
legacyFallbackToSingleUseMiniClusterHandler.withSharedOrSingleUseCluster(sharedMiniClusterServicesOpt, scenario) {
miniClusterWithServices =>
val scenarioWithOverrodeParallelism = sharedMiniClusterServicesOpt
.map(_ => scenario.overrideParallelismIfNeeded(StateVerificationParallelism))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterWithServices
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.ScenarioParallelismOverride._
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.legacysingleuseminicluster.LegacySingleUseMiniClusterFallbackHandler
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.legacysingleuseminicluster.LegacyFallbackToSingleUseMiniClusterHandler
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker

Expand All @@ -29,29 +29,31 @@ class FlinkMiniClusterScenarioTestRunner(
"run"
)

private val singleUseMiniClusterFallbackHandler =
new LegacySingleUseMiniClusterFallbackHandler(modelData.modelClassLoader, "scenario testing")
private val legacyFallbackToSingleUseMiniClusterHandler =
new LegacyFallbackToSingleUseMiniClusterHandler(modelData.modelClassLoader, "scenario testing")

// NU-1455: We encode variable on the engine, because of classLoader's problems
def runTests(scenario: CanonicalProcess, scenarioTestData: ScenarioTestData)(
implicit ec: ExecutionContext
): Future[TestResults[Json]] = {
singleUseMiniClusterFallbackHandler.withSharedOrSingleUseClusterAsync(sharedMiniClusterServicesOpt, scenario) {
miniClusterWithServices =>
val scenarioWithOverrodeParallelism = sharedMiniClusterServicesOpt
.map(_ => scenario.overrideParallelismIfNeeded(ScenarioTestingParallelism))
.getOrElse(scenario)
val env = miniClusterWithServices.createStreamExecutionEnvironment(attached = true)
val resultFuture = jobInvoker.invokeStaticMethod(
modelData,
scenarioWithOverrodeParallelism,
scenarioTestData,
env
)
resultFuture.onComplete { _ =>
env.close()
}
resultFuture
legacyFallbackToSingleUseMiniClusterHandler.withSharedOrSingleUseClusterAsync(
sharedMiniClusterServicesOpt,
scenario
) { miniClusterWithServices =>
val scenarioWithOverrodeParallelism = sharedMiniClusterServicesOpt
.map(_ => scenario.overrideParallelismIfNeeded(ScenarioTestingParallelism))
.getOrElse(scenario)
val env = miniClusterWithServices.createStreamExecutionEnvironment(attached = true)
val resultFuture = jobInvoker.invokeStaticMethod(
modelData,
scenarioWithOverrodeParallelism,
scenarioTestData,
env
)
resultFuture.onComplete { _ =>
env.close()
}
resultFuture
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader

// This class handles a legacy way to create single use minicluster
// TODO: After we fully switch to shared mini cluster approach, it should be removed
class LegacySingleUseMiniClusterFallbackHandler(modelClassLoader: URLClassLoader, useCaseForDebug: String)
class LegacyFallbackToSingleUseMiniClusterHandler(modelClassLoader: URLClassLoader, useCaseForDebug: String)
extends LazyLogging {

def withSharedOrSingleUseClusterAsync[R](
Expand Down

0 comments on commit 8de8378

Please sign in to comment.