Skip to content

Commit

Permalink
[NU-1962] always using model classpath
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 15, 2025
1 parent 3a07ecf commit ff55969
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,33 +1,20 @@
package pl.touk.nussknacker.engine.process.runner

import org.apache.flink.configuration.{
ConfigUtils,
Configuration,
CoreOptions,
PipelineOptions,
RestOptions,
TaskManagerOptions
}
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.util.MetaDataExtractor
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.util.loader.ModelClassLoader

import java.net.{MalformedURLException, URL}
import scala.jdk.CollectionConverters._
import scala.util.Using

class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, configuration: Configuration) {
class FlinkStubbedRunner(modelClassLoader: ModelClassLoader, configuration: Configuration) {

def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1),
// Creates a local (in-JVM) Flink execution environment with the caller-supplied
// parallelism, configured with this runner's Flink Configuration.
// NOTE(review): parallelism extraction from scenario metadata was moved to the
// callers in this commit — presumably so the runner no longer needs the
// CanonicalProcess; confirm all call sites pass the StreamMetaData-derived value.
def createEnv(parallelism: Int): StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
parallelism,
configuration
)

Expand All @@ -46,17 +33,19 @@ class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, config
def execute[T](
miniCluster: MiniCluster,
env: StreamExecutionEnvironment,
scenarioName: ProcessName,
savepointRestoreSettings: SavepointRestoreSettings
): Unit = {
// Checkpoints are disabled to prevent waiting for checkpoint to happen
// before finishing execution.
env.getCheckpointConfig.disableCheckpointing()

val streamGraph = env.getStreamGraph
streamGraph.setJobName(process.name.value)
streamGraph.setJobName(scenarioName.value)

val jobGraph = streamGraph.getJobGraph()
jobGraph.setClasspaths(classpathsFromModelWithFallbackToConfiguration)
// FIXME abr: Is it fine?
jobGraph.setClasspaths(modelClassLoader.urls.asJava)
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)

val configuration: Configuration = new Configuration
Expand All @@ -78,7 +67,7 @@ class FlinkStubbedRunner(modelData: ModelData, process: CanonicalProcess, config
// The class is also used in some scala tests
// and this fallback is to work with a work around for a behaviour added in https://issues.apache.org/jira/browse/FLINK-32265
// see details in pl.touk.nussknacker.engine.flink.test.MiniClusterExecutionEnvironment#execute
modelData.modelClassLoaderUrls match {
modelClassLoader.urls match {
case Nil =>
ConfigUtils.decodeListFromConfig[String, URL, MalformedURLException](
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{AdditionalModelConfigs, DeploymentData}
import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompilerDataFactory
Expand All @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.testmode.{
ResultsCollectingListenerHolder,
TestServiceInvocationCollector
}
import pl.touk.nussknacker.engine.util.MetaDataExtractor

import scala.util.Using

Expand Down Expand Up @@ -55,18 +56,22 @@ class FlinkTestMain(
val configuration: Configuration
) {

private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)
private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration)

def runTest: TestResults[Json] = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
try {
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val registrar = prepareRegistrar(collectingListener, scenarioTestData)
val env = stubbedRunner.createEnv
val parallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
val env = stubbedRunner.createEnv(parallelism)

registrar.register(env, process, processVersion, deploymentData, resultCollector)
Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster =>
stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.none())
Using.resource(stubbedRunner.createMiniCluster(parallelism)) { miniCluster =>
stubbedRunner.execute(miniCluster, env, process.name, SavepointRestoreSettings.none())
collectingListener.results
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package pl.touk.nussknacker.engine.process.runner
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompilerDataFactory
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestServiceInvocationCollector}
import pl.touk.nussknacker.engine.util.MetaDataExtractor

import scala.util.Using

Expand Down Expand Up @@ -37,17 +38,21 @@ class FlinkVerificationMain(
val configuration: Configuration
) {

private val stubbedRunner = new FlinkStubbedRunner(modelData, process, configuration)
private val stubbedRunner = new FlinkStubbedRunner(modelData.modelClassLoader, configuration)

def runTest(): Unit = {
val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
val resultCollector = new TestServiceInvocationCollector(collectingListener)
val registrar = prepareRegistrar()
val env = stubbedRunner.createEnv
val parallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
val env = stubbedRunner.createEnv(parallelism)

registrar.register(env, process, processVersion, deploymentData, resultCollector)
Using.resource(stubbedRunner.createMiniCluster(env.getParallelism)) { miniCluster =>
stubbedRunner.execute(miniCluster, env, SavepointRestoreSettings.forPath(savepointPath, true))
Using.resource(stubbedRunner.createMiniCluster(parallelism)) { miniCluster =>
stubbedRunner.execute(miniCluster, env, process.name, SavepointRestoreSettings.forPath(savepointPath, true))
}
}

Expand Down

0 comments on commit ff55969

Please sign in to comment.