Skip to content

Commit

Permalink
[NU-1777] Separate classloader for deployment managers (#7335)
Browse files Browse the repository at this point in the history
  • Loading branch information
coutoPL authored Jan 21, 2025
1 parent d495a91 commit 942db11
Show file tree
Hide file tree
Showing 106 changed files with 1,304 additions and 711 deletions.
9 changes: 5 additions & 4 deletions .run/NussknackerApp.run.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="NussknackerApp" type="Application" factoryName="Application">
<option name="ALTERNATIVE_JRE_PATH" value="corretto-11" />
<option name="ALTERNATIVE_JRE_PATH" value="11" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<envs>
<env name="AUTHENTICATION_USERS_FILE" value="../../../nussknacker-dist/src/universal/conf/users.conf" />
Expand All @@ -9,15 +9,16 @@
<env name="FLINK_SHOULD_VERIFY_BEFORE_DEPLOY" value="false" />
<env name="GRAFANA_URL" value="http://localhost:8081/grafana" />
<env name="INFLUXDB_URL" value="http://localhost:3086" />
<env name="INPUT_CONFIG_RESOLVE_ENV_VARIABLES" value="false" />
<env name="KAFKA_ADDRESS" value="localhost:3032" />
<env name="MANAGERS_DIR" value="managers" />
<env name="NUSSKNACKER_LOG_LEVEL" value="DEBUG" />
<env name="OPENAPI_SERVICE_URL" value="http://localhost:5000" />
<env name="SCHEMA_REGISTRY_URL" value="http://localhost:3082" />
<env name="SQL_ENRICHER_URL" value="localhost:5432" />
<env name="TABLES_DEFINITION_FILE" value="../../../nussknacker-dist/src/universal/conf/dev-tables-definition.sql" />
<env name="USAGE_REPORTS_FINGERPRINT" value="development" />
<env name="USAGE_REPORTS_SOURCE" value="sources" />
<env name="INPUT_CONFIG_RESOLVE_ENV_VARIABLES" value="false" />
<env name="TABLES_DEFINITION_FILE" value="../../../nussknacker-dist/src/universal/conf/dev-tables-definition.sql" />
</envs>
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="pl.touk.nussknacker.ui.NussknackerApp" />
Expand All @@ -34,4 +35,4 @@
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
</component>
27 changes: 13 additions & 14 deletions .run/NussknackerRemoteDebug.run.xml
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
<component name="ProjectRunConfigurationManager">
<configuration name="NussknackerRemoteDebug" type="Remote">
<module name="nussknacker-designer" />
<option name="USE_SOCKET_TRANSPORT" value="true" />
<option name="SERVER_MODE" value="false" />
<option name="SHMEM_ADDRESS" />
<option name="HOST" value="localhost" />
<option name="PORT" value="5005" />
<option name="AUTO_RESTART" value="false" />
<RunnerSettings RunnerId="Debug">
<option name="DEBUG_PORT" value="5005" />
<option name="LOCAL" value="false" />
</RunnerSettings>
<method v="2" />
</configuration>
<configuration default="false" name="NussknackerRemoteDebug" type="Remote">
<option name="USE_SOCKET_TRANSPORT" value="true" />
<option name="SERVER_MODE" value="false" />
<option name="SHMEM_ADDRESS" />
<option name="HOST" value="localhost" />
<option name="PORT" value="5005" />
<option name="AUTO_RESTART" value="false" />
<RunnerSettings RunnerId="Debug">
<option name="DEBUG_PORT" value="5005" />
<option name="LOCAL" value="false" />
</RunnerSettings>
<method v="2" />
</configuration>
</component>
89 changes: 48 additions & 41 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ lazy val silencerV_2_12 = "1.6.0"
def propOrEnv(name: String, default: String): String = propOrEnv(name).getOrElse(default)
def propOrEnv(name: String): Option[String] = Option(System.getProperty(name)).orElse(sys.env.get(name))

//by default we include flink and scala, we want to be able to disable this behaviour for performance reasons
val includeFlinkAndScala = propOrEnv("includeFlinkAndScala", "true").toBoolean

val flinkScope = if (includeFlinkAndScala) "compile" else "provided"
val nexusUrlFromProps = propOrEnv("nexusUrl")
//TODO: this is pretty clunky, but works so far for our case...
val nexusHostFromProps = nexusUrlFromProps.map(_.replaceAll("http[s]?://", "").replaceAll("[:/].*", ""))
Expand Down Expand Up @@ -103,7 +99,7 @@ lazy val publishSettings = Seq(
)

def defaultMergeStrategy: String => MergeStrategy = {
// remove JPMS module descriptors (a proper soultion would be to merge them)
// remove JPMS module descriptors (a proper solution would be to merge them)
case PathList(ps @ _*) if ps.last == "module-info.class" => MergeStrategy.discard
// we override Spring's class and we want to keep only our implementation
case PathList(ps @ _*) if ps.last == "NumberUtils.class" => MergeStrategy.first
Expand All @@ -118,7 +114,8 @@ def designerMergeStrategy: String => MergeStrategy = {
// https://tapir.softwaremill.com/en/latest/docs/openapi.html#using-swaggerui-with-sbt-assembly
case PathList("META-INF", "maven", "org.webjars", "swagger-ui", "pom.properties") =>
MergeStrategy.singleOrError
case x => defaultMergeStrategy(x)
case x =>
defaultMergeStrategy(x)
}

val scalaTestReports = Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/surefire-reports", "-oFGD")
Expand Down Expand Up @@ -438,7 +435,8 @@ def assemblySettings(
includeScala: Boolean,
filterProvidedDeps: Boolean = true
): List[Def.SettingsDefinition] = {
// This work around need to be optional because for designer module it causes excluding of scala lib (because we has there other work around for Idea classpath and provided deps)
// This work around need to be optional because for designer module it causes excluding of scala lib
// (because we have there other work around for Idea classpath and provided deps)
val filterProvidedDepsSettingOpt = if (filterProvidedDeps) {
Some(
// For some reason problem described in https://github.com/sbt/sbt-assembly/issues/295 appears, workaround also works...
Expand Down Expand Up @@ -470,7 +468,7 @@ lazy val modelArtifacts = taskKey[List[(File, String)]]("model artifacts")

lazy val devArtifacts = taskKey[List[(File, String)]]("dev artifacts")

lazy val managerArtifacts = taskKey[List[(File, String)]]("manager artifacts")
lazy val deploymentManagerArtifacts = taskKey[List[(File, String)]]("deployment manager artifacts")

def filterDevConfigArtifacts(files: Seq[(File, String)]) = {
val devConfigFiles = Set("dev-tables-definition.sql", "dev-application.conf", "dev-oauth2-users.conf")
Expand All @@ -482,7 +480,7 @@ lazy val distribution: Project = sbt
.settings(commonSettings)
.enablePlugins(JavaAgent, SbtNativePackager, JavaServerAppPackaging)
.settings(
managerArtifacts := {
deploymentManagerArtifacts := {
List(
(flinkDeploymentManager / assembly).value -> "managers/nussknacker-flink-manager.jar",
(liteK8sDeploymentManager / assembly).value -> "managers/lite-k8s-manager.jar",
Expand Down Expand Up @@ -520,7 +518,7 @@ lazy val distribution: Project = sbt
else filterDevConfigArtifacts((Universal / mappings).value)

universalMappingsWithDevConfigFilter ++
(managerArtifacts).value ++
(deploymentManagerArtifacts).value ++
(componentArtifacts).value ++
(if (addDevArtifacts)
Seq((developmentTestsDeploymentManager / assembly).value -> "managers/development-tests-manager.jar")
Expand Down Expand Up @@ -610,17 +608,17 @@ lazy val flinkDeploymentManager = (project in flink("management"))
libraryDependencies ++= {
Seq(
"org.typelevel" %% "cats-core" % catsV % Provided,
("org.apache.flink" % "flink-streaming-java" % flinkV % flinkScope)
("org.apache.flink" % "flink-streaming-java" % flinkV)
.excludeAll(
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("com.esotericsoftware", "kryo-shaded"),
),
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % flinkScope,
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV,
"com.softwaremill.retry" %% "retry" % retryV,
"org.wiremock" % "wiremock" % wireMockV % Test,
"org.scalatestplus" %% "mockito-5-10" % scalaTestPlusV % Test,
) ++ flinkLibScalaDeps(scalaVersion.value, Some(flinkScope))
) ++ flinkLibScalaDeps(scalaVersion.value)
},
// override scala-collection-compat from com.softwaremill.retry:retry
dependencyOverrides += "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV
Expand Down Expand Up @@ -1908,6 +1906,10 @@ lazy val deploymentManagerApi = (project in file("designer/deployment-manager-ap
)
.dependsOn(extensionsApi, testUtils % Test)

lazy val prepareDesignerTests = taskKey[Unit]("Prepare all necessary artifacts before running designer module tests")
lazy val prepareDesignerSlowTests =
taskKey[Unit]("Prepare all necessary artifacts before running designer module slow tests")

lazy val designer = (project in file("designer/server"))
.configs(SlowTests)
.enablePlugins(GenerateDesignerOpenApiPlugin)
Expand All @@ -1916,7 +1918,7 @@ lazy val designer = (project in file("designer/server"))
.settings(
assemblySettings(
"nussknacker-designer-assembly.jar",
includeScala = includeFlinkAndScala,
includeScala = true,
filterProvidedDeps = false
): _*
)
Expand All @@ -1938,24 +1940,36 @@ lazy val designer = (project in file("designer/server"))
CopyOptions.apply(overwrite = true, preserveLastModified = true, preserveExecutable = false)
)
},
ThisBuild / parallelExecution := false,
SlowTests / test := (SlowTests / test)
.dependsOn(
flinkDevModel / Compile / assembly,
flinkExecutor / Compile / assembly
)
.value,
Test / test := (Test / test)
.dependsOn(
defaultModel / Compile / assembly,
flinkTableApiComponents / Compile / assembly,
flinkDevModel / Compile / assembly,
flinkExecutor / Compile / assembly,
flinkExecutor / prepareItLibs
)
.value,
prepareDesignerSlowTests := {
(flinkDeploymentManager / assembly).value
(liteEmbeddedDeploymentManager / assembly).value
(liteK8sDeploymentManager / assembly).value
(flinkDevModel / assembly).value
(flinkExecutor / assembly).value
},
prepareDesignerTests := {
(flinkDeploymentManager / assembly).value
(liteEmbeddedDeploymentManager / assembly).value
(liteK8sDeploymentManager / assembly).value
(defaultModel / assembly).value
(flinkTableApiComponents / assembly).value
(flinkDevModel / assembly).value
(flinkExecutor / assembly).value
(flinkExecutor / prepareItLibs).value
},
ThisBuild / parallelExecution := false,
SlowTests / test := (SlowTests / test).dependsOn(prepareDesignerSlowTests).value,
SlowTests / testOptions += Tests.Setup(() => prepareDesignerSlowTests.value),
Test / test := (Test / test).dependsOn(prepareDesignerTests).value,
Test / testOptions += Tests.Setup(() => prepareDesignerTests.value),
/*
We depend on copyClientDist in packageBin and assembly to be make sure fe files will be included in jar and fajar
We depend on copyClientDist in packageBin and assembly to be make sure FE files will be included in jar and fajar
We abuse sbt a little bit, but we don't want to put webpack in generate resources phase, as it's long and it would
make compilation v. long. This is not too nice, but so far only alternative is to put designer dists copyClientDist outside sbt and
use bash to control when it's done - and this can lead to bugs and edge cases (release, dist/docker, dist/tgz, assembly...)
Expand Down Expand Up @@ -1994,6 +2008,7 @@ lazy val designer = (project in file("designer/server"))
"org.apache.xmlgraphics" % "fop" % "2.9" exclude ("commons-logging", "commons-logging"),
"com.beachape" %% "enumeratum-circe" % enumeratumV,
"tf.tofu" %% "derevo-circe" % "0.13.0",
"com.softwaremill.retry" %% "retry" % retryV,
"com.softwaremill.sttp.apispec" %% "openapi-circe-yaml" % openapiCirceYamlV,
"com.github.tminglei" %% "slick-pg" % slickPgV,
"com.softwaremill.sttp.tapir" %% "tapir-akka-http-server" % tapirV,
Expand Down Expand Up @@ -2033,24 +2048,16 @@ lazy val designer = (project in file("designer/server"))
processReports,
security,
deploymentManagerApi,
componentsApi,
restmodel,
listenerApi,
configLoaderApi,
defaultHelpers % Test,
testUtils % Test,
flinkTestUtils % Test,
componentsApi % "test->test",
// All DeploymentManager dependencies are added because they are needed to run NussknackerApp* with
// dev-application.conf. Currently, we doesn't have a separate classpath for DMs like we have for components.
// schemedKafkaComponentsUtils is added because loading the provided liteEmbeddedDeploymentManager causes
// that are also load added their test dependencies on the classpath by the Idea. It causes that
// UniversalKafkaSourceFactory is loaded from app classloader and GenericRecord which is defined in typesToExtract
// is missing from this classloader
flinkDeploymentManager % Provided,
liteEmbeddedDeploymentManager % Provided,
liteK8sDeploymentManager % Provided,
developmentTestsDeploymentManager % Provided,
schemedKafkaComponentsUtils % Provided,
developmentTestsDeploymentManager % Test,
kafkaComponentsUtils % Test,
componentsApi % "test->test"
)

lazy val e2eTests = (project in file("e2e-tests"))
Expand Down Expand Up @@ -2254,10 +2261,10 @@ prepareDev := {
(flinkExecutor / prepareItLibs).value
val workTarget = (designer / baseDirectory).value / "work"
val artifacts =
(distribution / componentArtifacts).value ++ (distribution / devArtifacts).value ++ developmentTestsDeployManagerArtifacts.value ++
Def
.taskDyn(if (addManagerArtifacts) distribution / managerArtifacts else Def.task[List[(File, String)]](Nil))
.value ++
(distribution / componentArtifacts).value ++
(distribution / devArtifacts).value ++
developmentTestsDeployManagerArtifacts.value ++
(distribution / deploymentManagerArtifacts).value ++
(flinkExecutor / additionalBundledArtifacts).value
IO.copy(artifacts.map { case (source, target) => (source, workTarget / target) })
(designer / copyClientDist).value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ package object definition {
name: String,
typ: TypingResult,
editor: ParameterEditor,
// It it used for node parameter adjustment on FE side (see ParametersUtils.ts -> adjustParameters)
// It is used for node parameter adjustment on FE side (see ParametersUtils.ts -> adjustParameters)
defaultValue: Expression,
// additionalVariables and variablesToHide are served to FE because suggestions API requires full set of variables
// and ScenarioWithDetails.json.validationResult.nodeResults is not enough
Expand Down
9 changes: 6 additions & 3 deletions designer/server/src/main/resources/defaultDesignerConfig.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#We use defaultUConfig.conf instead of reference.conf, as we don't want these properties in config loaded in model configuration
#This configuration file contains sensible designer defaults for all Nussknacker deployments, without assumptions about deployment models and external tools (grafana, flink etc.)
#All models configurations also shouldn't be in this file
# We use defaultUConfig.conf instead of reference.conf, as we don't want these properties in config loaded in model configuration
# This configuration file contains sensible designer defaults for all Nussknacker deployments, without assumptions about deployment
# models and external tools (grafana, flink etc.). All models configurations also shouldn't be in this file

managersDirs: ["./managers"]
managersDirs: [ ${?MANAGERS_DIR} ]

storageDir: ./storage
storageDir: ${?STORAGE_DIR}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package pl.touk.nussknacker.ui
import cats.effect.{IO, Resource}
import com.typesafe.config.{Config, ConfigFactory, ConfigValue, ConfigValueFactory}
import org.apache.commons.io.FileUtils
import pl.touk.nussknacker.engine.util.loader.DeploymentManagersClassLoader
import pl.touk.nussknacker.engine.{DeploymentManagerProvider, ModelData}
import pl.touk.nussknacker.ui.config.DesignerConfigLoader
import pl.touk.nussknacker.ui.config.{DesignerConfig, SimpleConfigLoadingDesignerConfigLoader}
import pl.touk.nussknacker.ui.factory.NussknackerAppFactory
import pl.touk.nussknacker.ui.process.processingtype.loader.LocalProcessingTypeDataLoader

Expand Down Expand Up @@ -49,15 +50,21 @@ object LocalNussknackerWithSingleModel {
modelData = Map(typeName -> (category, modelData)),
deploymentManagerProvider = deploymentManagerProvider
)
val designerConfigLoader = DesignerConfigLoader.fromConfig(
val designerConfig = DesignerConfig.from(
// This map is ignored but must exist
appConfig.withValue("scenarioTypes", ConfigValueFactory.fromMap(Map.empty[String, ConfigValue].asJava))
)
val appFactory = new NussknackerAppFactory(
designerConfigLoader,
_ => local
)
appFactory.createApp()
for {
deploymentManagersClassLoader <- DeploymentManagersClassLoader.create(List.empty)
designerConfigLoader = new SimpleConfigLoadingDesignerConfigLoader(designerConfig.rawConfig.resolved)
appFactory = new NussknackerAppFactory(
designerConfig,
designerConfigLoader,
_ => local,
deploymentManagersClassLoader
)
app <- appFactory.createApp()
} yield app
}

// TODO: easier way of handling users file
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package pl.touk.nussknacker.ui

import cats.effect.{ExitCode, IO, IOApp}
import pl.touk.nussknacker.ui.config.{AlwaysLoadingFileBasedDesignerConfigLoader, DesignerConfigLoader}
import pl.touk.nussknacker.ui.config.AlwaysLoadingFileBasedDesignerConfigLoader
import pl.touk.nussknacker.ui.factory.NussknackerAppFactory

object NussknackerApp extends IOApp {

override def run(args: List[String]): IO[ExitCode] = {
for {
appFactory <- IO(NussknackerAppFactory(AlwaysLoadingFileBasedDesignerConfigLoader(getClass.getClassLoader)))
_ <- appFactory.createApp().use { _ => IO.never }
} yield ExitCode.Success
program.useForever.as(ExitCode.Success)
}

private def program = for {
appFactory <- NussknackerAppFactory.create(AlwaysLoadingFileBasedDesignerConfigLoader(getClass.getClassLoader))
_ <- appFactory.createApp()
} yield ()

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ class AdditionalInfoProviders(typeToConfig: ProcessingTypeDataProvider[ModelData
private val nodeProviders: ProcessingTypeDataProvider[Option[NodeData => Future[Option[AdditionalInfo]]], _] =
typeToConfig.mapValues(pt =>
ScalaServiceLoader
.load[AdditionalInfoProvider](pt.modelClassLoader.classLoader)
.load[AdditionalInfoProvider](pt.modelClassLoader)
.headOption
.map(_.nodeAdditionalInfo(pt.modelConfig))
)

private val propertiesProviders: ProcessingTypeDataProvider[Option[MetaData => Future[Option[AdditionalInfo]]], _] =
typeToConfig.mapValues(pt =>
ScalaServiceLoader
.load[AdditionalInfoProvider](pt.modelClassLoader.classLoader)
.load[AdditionalInfoProvider](pt.modelClassLoader)
.headOption
.map(_.propertiesAdditionalInfo(pt.modelConfig))
)
Expand Down
Loading

0 comments on commit 942db11

Please sign in to comment.