Skip to content

Commit

Permalink
[NU-1954] Namespace improvements: configurable separator, namespacing…
Browse files Browse the repository at this point in the history
… Kafka consumer groups and missing Lite metric namespace tag (#7468)
  • Loading branch information
jedrz authored Jan 29, 2025
1 parent e3375ca commit f8960d1
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,45 @@ package pl.touk.nussknacker.engine.api.namespaces

import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus._
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy.namespaceSeparator

final case class NamingStrategy(namespace: Option[String]) {
final case class Namespace(value: String, separator: String)

private val namespacePattern = namespace.map(ns => s"^$ns$namespaceSeparator(.*)".r)
final case class NamingStrategy(namespace: Option[Namespace]) {

def prepareName(name: String): String = namespace match {
case Some(value) => s"$value$namespaceSeparator$name"
case None => name
case Some(Namespace(value, separator)) => s"$value$separator$name"
case None => name
}

def decodeName(name: String): Option[String] = namespacePattern match {
case Some(pattern) =>
name match {
case pattern(originalName) => Some(originalName)
case _ => None
}
case None => Some(name)
def decodeName(name: String): Option[String] = namespace match {
case Some(Namespace(value, separator)) if name.startsWith(s"$value$separator") =>
Some(name.stripPrefix(s"$value$separator"))
case Some(Namespace(_, _)) => None
case None => Some(name)
}

}

object NamingStrategy {
private val namespaceSeparator = "_"
private val namespacePath = "namespace"
private val defaultNamespaceSeparator = "_"
private val namespacePath = "namespace"
private val namespaceSeparatorPath = "namespaceSeparator"

val Disabled: NamingStrategy = NamingStrategy(None)

def fromConfig(modelConfig: Config): NamingStrategy = {
if (modelConfig.hasPath(namespacePath)) {
readNamespaceConfig(modelConfig)
} else {
Disabled
}
}

def fromConfig(modelConfig: Config): NamingStrategy = NamingStrategy(modelConfig.getAs[String](namespacePath))
private def readNamespaceConfig(modelConfig: Config): NamingStrategy = {
val value = modelConfig.as[String](namespacePath)
val separator = modelConfig.getAs[String](namespaceSeparatorPath).getOrElse(defaultNamespaceSeparator)
val namespace = Namespace(value, separator)
NamingStrategy(Some(namespace))
}

}
Original file line number Diff line number Diff line change
@@ -1,21 +1,48 @@
package pl.touk.nussknacker.engine.api.namespaces

import com.typesafe.config.ConfigFactory
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

class NamingStrategySpec extends AnyFunSuite with Matchers {

test("should leave original names if no namespace configured") {
val defaultNaming = NamingStrategy(None)
val defaultNaming = NamingStrategy.Disabled

defaultNaming.prepareName("original") shouldBe "original"
defaultNaming.decodeName("original") shouldBe Some("original")
}

test("should add namespace if configured") {
val namingStrategy = NamingStrategy(Some("customer1"))
val namingStrategy = NamingStrategy(Some(Namespace("customer1", "_")))

namingStrategy.prepareName("original") shouldBe "customer1_original"
namingStrategy.decodeName("customer1_someName") shouldBe Some("someName")
namingStrategy.decodeName("dummy??") shouldBe None
}

test("should read disabled naming strategy config") {
val namingStrategy = NamingStrategy.fromConfig(ConfigFactory.empty())

namingStrategy.prepareName("original") shouldBe "original"
}

test("should read naming strategy config with default separator") {
val config = ConfigFactory.parseString("""namespace: customer1""")

val namingStrategy = NamingStrategy.fromConfig(config)

namingStrategy.prepareName("original") shouldBe "customer1_original"
}

test("should read naming strategy config with specified separator") {
val config = ConfigFactory.parseString("""
|namespace: customer1
|namespaceSeparator: "."""".stripMargin)

val namingStrategy = NamingStrategy.fromConfig(config)

namingStrategy.prepareName("original") shouldBe "customer1.original"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class StubModelDataWithModelDefinition(
override def modelConfigLoader: ModelConfigLoader =
new DefaultModelConfigLoader(_ => true)

override def namingStrategy: NamingStrategy = NamingStrategy(None)
override def namingStrategy: NamingStrategy = NamingStrategy.Disabled

override def inputConfigDuringExecution: InputConfigDuringExecution = InputConfigDuringExecution(
configDuringExecution
Expand Down
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* [#7498](https://github.com/TouK/nussknacker/pull/7498) Support many migrations loaded using SPI. Loaded migration numbers
cannot overlap, if they do, an exception is thrown.
* [#7504](https://github.com/TouK/nussknacker/pull/7504) Return scenario validation error when an incompatible change was introduced in a fragment or component parameter definition.
* [#7468](https://github.com/TouK/nussknacker/pull/7468) Configurable namespace separator (was fixed to `_`), added namespace tag to Lite engine metrics and fixed namespacing of Kafka consumer groups.

## 1.18

Expand Down
3 changes: 3 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#7458](https://github.com/TouK/nussknacker/pull/7458) Flink scenario testing mechanism and scenario state verification mechanism: by default mini cluster is created once and reused each time
To revert previous behaviour (creating minicluster each time), change `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioTesting` or/and
`deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioStateVerification` to `false`
* [#7468](https://github.com/TouK/nussknacker/pull/7468) When a namespace is configured, Kafka consumer groups are also namespaced.
This change should have been introduced as of starting from Nussknacker 1.15 when a feature flag `useNamingStrategyForConsumerGroupId`
was removed to temporarily disable consumer group namespacing.
### Code API changes
* [#7368](https://github.com/TouK/nussknacker/pull/7368) Renamed `PeriodicSourceFactory` to `SampleGeneratorSourceFactory`
Expand Down
23 changes: 23 additions & 0 deletions docs/configuration/model/ModelConfiguration.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,26 @@ The docs button which is displayed in the scenario properties modal window is by
``` scenarioPropertiesDocsUrl: "http://custom-configurable-link"```

in the config.

## Multitenancy support

Nussknacker supports multitenancy, allowing multiple Nussknacker designer instances to operate on shared infrastructure components,
such as Kafka, Flink, or InfluxDB. This is achieved by using configured namespaces to isolate resources:
- Kafka Topics: Prefixed with a namespace
- Kafka Consumer Groups: Prefixed with a namespace
- Flink Jobs: Prefixed with a namespace
- InfluxDB Metrics: Tagged with an additional namespace tag

A namespace configuration can be defined as follows:

```
modelConfig: {
...
namespace: customer1
namespaceSeparator: "_"
}
```

Notes:
- The `namespaceSeparator` is optional and defaults to `_` if not specified.
- This configuration applies a uniform namespace to all resources.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object NamespaceMetricsTags {
NamespaceMetricsTags(
Map(
originalNameTag -> scenarioName,
namespaceTag -> namespace
namespaceTag -> namespace.value
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider {
): ProcessObjectDependencies = {
val disableNamespacePath = "disableNamespace"
if (config.hasPath(disableNamespacePath) && config.getBoolean(disableNamespacePath)) {
dependencies.copy(namingStrategy = NamingStrategy(None))
dependencies.copy(namingStrategy = NamingStrategy.Disabled)
} else {
dependencies
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.process.functional

import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.{Counter, Gauge, Histogram}
import org.scalatest.LoneElement._
Expand Down Expand Up @@ -230,6 +231,25 @@ class MetricsSpec
}
}

test("should add namespace to metrics") { implicit scenarioName =>
val process = ScenarioBuilder
.streaming(scenarioName.value)
.source("source1", "input")
.emptySink("out1", "monitor")
val data = List(
SimpleRecord("1", 12, "a", new Date(0))
)
val namespace = "a-tenant"

processInvoker.invokeWithSampleData(
process,
data,
ConfigFactory.empty().withValue("namespace", ConfigValueFactory.fromAnyRef(namespace))
)

counter(s"namespace.$namespace.nodeId.source1.originalProcessName.${scenarioName.value}.nodeCount") shouldBe 1
}

private def counter(name: String)(implicit scenarioName: ProcessName): Long = withClue(s"counter $name") {
reporter.testMetrics[Counter](name).loneElement.getCount
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ class FlinkKafkaSource[T](
)
}

private def prepareConsumerGroupId(nodeContext: FlinkCustomNodeContext): String = overriddenConsumerGroup match {
case Some(overridden) => overridden
case None => ConsumerGroupDeterminer(kafkaConfig).consumerGroup(nodeContext)
private def prepareConsumerGroupId(nodeContext: FlinkCustomNodeContext): String = {
val baseName = overriddenConsumerGroup.getOrElse(ConsumerGroupDeterminer(kafkaConfig).consumerGroup(nodeContext))
namingStrategy.prepareName(baseName)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class EmbeddedDeploymentManagerProvider extends LiteDeploymentManagerProvider {
RequestResponseDeploymentStrategy(engineConfig)
)

val metricRegistry = LiteMetricRegistryFactory.usingHostnameAsDefaultInstanceId.prepareRegistry(engineConfig)
val metricRegistry = LiteMetricRegistryFactory
.usingHostnameAsDefaultInstanceId(modelData.namingStrategy.namespace)
.prepareRegistry(engineConfig)
val contextPreparer = new LiteEngineRuntimeContextPreparer(new DropwizardMetricsProviderFactory(metricRegistry))

strategy.open(modelData.asInvokableModelData, contextPreparer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.effect.{IO, Resource}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
import pl.touk.nussknacker.engine.api.namespaces.Namespace
import pl.touk.nussknacker.engine.api.{JobData, LiteStreamMetaData, ProcessVersion, RequestResponseMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.lite.RunnableScenarioInterpreter
Expand Down Expand Up @@ -36,7 +37,7 @@ object RunnableScenarioInterpreterFactory extends LazyLogging {
ModelClassLoader(urls, workingDirectoryOpt = None, deploymentManagersClassLoader),
resolveConfigs = true
)
val metricRegistry = prepareMetricRegistry(runtimeConfig)
val metricRegistry = prepareMetricRegistry(runtimeConfig, modelData.namingStrategy.namespace)
val preparer = new LiteEngineRuntimeContextPreparer(new DropwizardMetricsProviderFactory(metricRegistry))
// TODO Pass correct ProcessVersion and DeploymentData
val jobData = JobData(scenario.metaData, ProcessVersion.empty.copy(processName = scenario.metaData.name))
Expand Down Expand Up @@ -75,9 +76,9 @@ object RunnableScenarioInterpreterFactory extends LazyLogging {
}
}

private def prepareMetricRegistry(engineConfig: Config) = {
private def prepareMetricRegistry(engineConfig: Config, namespace: Option[Namespace]) = {
lazy val instanceId = sys.env.getOrElse("INSTANCE_ID", LiteMetricRegistryFactory.hostname)
new LiteMetricRegistryFactory(instanceId).prepareRegistry(engineConfig)
new LiteMetricRegistryFactory(instanceId, namespace).prepareRegistry(engineConfig)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import com.typesafe.scalalogging.LazyLogging
import io.dropwizard.metrics5.{MetricName, MetricRegistry}
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
import pl.touk.nussknacker.engine.api.namespaces.Namespace
import pl.touk.nussknacker.engine.lite.metrics.dropwizard.influxdb.LiteEngineInfluxDbReporter
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLogging {
class LiteMetricRegistryFactory(defaultInstanceId: => String, namespace: Option[Namespace]) extends LazyLogging {

import LiteMetricRegistryFactory._

val metricsConfigPath = "metrics"

Expand All @@ -28,7 +31,7 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin
}

private def registerReporters(metricRegistry: MetricRegistry, metricsConfig: Config): Unit = {
val prefix = preparePrefix(metricsConfig.rootAs[CommonMetricConfig])
val prefix = prepareMetricPrefix(metricsConfig.rootAs[CommonMetricConfig], defaultInstanceId, namespace)
val metricReporters = loadMetricsReporters()
if (metricReporters.nonEmpty) {
metricReporters.foreach { reporter =>
Expand All @@ -40,15 +43,6 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin
new JmxMetricsReporter().createAndRunReporter(metricRegistry, prefix, metricsConfig)
}

private def preparePrefix(conf: CommonMetricConfig): MetricName = {
conf.prefix
.map(MetricName.build(_))
.getOrElse(MetricName.empty())
.tagged("instanceId", conf.instanceId.getOrElse(defaultInstanceId))
.tagged("env", conf.environment)
.tagged(conf.additionalTags.asJava)
}

private def loadMetricsReporters(): List[MetricsReporter] = {
try {
val reporters = ScalaServiceLoader.load[MetricsReporter](Thread.currentThread().getContextClassLoader)
Expand All @@ -61,20 +55,12 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin
}
}

case class CommonMetricConfig(
prefix: Option[String],
instanceId: Option[String],
environment: String,
additionalTags: Map[String, String] = Map.empty
)

}

object LiteMetricRegistryFactory extends LazyLogging {

def usingHostnameAsDefaultInstanceId = new LiteMetricRegistryFactory(hostname)

def usingHostnameAndPortAsDefaultInstanceId(port: Int) = new LiteMetricRegistryFactory(s"$hostname:$port")
def usingHostnameAsDefaultInstanceId(namespace: Option[Namespace]) =
new LiteMetricRegistryFactory(hostname, namespace)

def hostname: String = {
// Checking COMPUTERNAME to make it works also on windows, see: https://stackoverflow.com/a/33112997/1370301
Expand All @@ -84,4 +70,28 @@ object LiteMetricRegistryFactory extends LazyLogging {
}
}

private[dropwizard] def prepareMetricPrefix(
conf: CommonMetricConfig,
defaultInstanceId: => String,
namespace: Option[Namespace]
): MetricName = {
val metricPrefix = conf.prefix
.map(MetricName.build(_))
.getOrElse(MetricName.empty())
.tagged("instanceId", conf.instanceId.getOrElse(defaultInstanceId))
.tagged("env", conf.environment)
.tagged(conf.additionalTags.asJava)
namespace match {
case Some(Namespace(value, _)) => metricPrefix.tagged("namespace", value)
case None => metricPrefix
}
}

private[dropwizard] case class CommonMetricConfig(
prefix: Option[String],
instanceId: Option[String],
environment: String,
additionalTags: Map[String, String] = Map.empty
)

}
Loading

0 comments on commit f8960d1

Please sign in to comment.