From f8960d1d85f154351adc189f9eb3b17917175907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20J=C4=99drzejewski?= Date: Wed, 29 Jan 2025 17:03:48 +0100 Subject: [PATCH] [NU-1954] Namespace improvements: configurable separator, namespacing Kafka consumer groups and missing Lite metric namespace tag (#7468) --- .../api/namespaces/NamingStrategy.scala | 43 +++++++++------ .../api/namespaces/NamingStrategySpec.scala | 31 ++++++++++- .../StubModelDataWithModelDefinition.scala | 2 +- docs/Changelog.md | 1 + docs/MigrationGuide.md | 3 ++ .../configuration/model/ModelConfiguration.md | 23 ++++++++ .../engine/flink/api/NkGlobalParameters.scala | 2 +- .../FlinkKafkaComponentProvider.scala | 2 +- .../process/functional/MetricsSpec.scala | 20 +++++++ .../kafka/source/flink/FlinkKafkaSource.scala | 6 +-- .../EmbeddedDeploymentManagerProvider.scala | 4 +- .../RunnableScenarioInterpreterFactory.scala | 7 +-- .../LiteMetricRegistryFactory.scala | 52 +++++++++++-------- .../LiteMetricRegistryFactoryTest.scala | 38 ++++++++++++++ 14 files changed, 186 insertions(+), 48 deletions(-) create mode 100644 engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactoryTest.scala diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategy.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategy.scala index bf2deb179c0..dd85a4d4d6a 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategy.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategy.scala @@ -2,32 +2,45 @@ package pl.touk.nussknacker.engine.api.namespaces import com.typesafe.config.Config import net.ceedubs.ficus.Ficus._ -import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy.namespaceSeparator -final case class NamingStrategy(namespace: Option[String]) { +final case class Namespace(value: String, separator: String) - private val namespacePattern = namespace.map(ns => s"^$ns$namespaceSeparator(.*)".r) +final case class NamingStrategy(namespace: Option[Namespace]) { def prepareName(name: String): String = namespace match { - case Some(value) => s"$value$namespaceSeparator$name" - case None => name + case Some(Namespace(value, separator)) => s"$value$separator$name" + case None => name } - def decodeName(name: String): Option[String] = namespacePattern match { - case Some(pattern) => - name match { - case pattern(originalName) => Some(originalName) - case _ => None - } - case None => Some(name) + def decodeName(name: String): Option[String] = namespace match { + case Some(Namespace(value, separator)) if name.startsWith(s"$value$separator") => + Some(name.stripPrefix(s"$value$separator")) + case Some(Namespace(_, _)) => None + case None => Some(name) } } object NamingStrategy { - private val namespaceSeparator = "_" - private val namespacePath = "namespace" + private val defaultNamespaceSeparator = "_" + private val namespacePath = "namespace" + private val namespaceSeparatorPath = "namespaceSeparator" + + val Disabled: NamingStrategy = NamingStrategy(None) + + def fromConfig(modelConfig: Config): NamingStrategy = { + if (modelConfig.hasPath(namespacePath)) { + readNamespaceConfig(modelConfig) + } else { + Disabled + } + } - def fromConfig(modelConfig: Config): NamingStrategy = NamingStrategy(modelConfig.getAs[String](namespacePath)) + private def readNamespaceConfig(modelConfig: Config): NamingStrategy = { + val value = modelConfig.as[String](namespacePath) + val separator = modelConfig.getAs[String](namespaceSeparatorPath).getOrElse(defaultNamespaceSeparator) + val namespace = Namespace(value, separator) + NamingStrategy(Some(namespace)) + } } diff --git a/components-api/src/test/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategySpec.scala b/components-api/src/test/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategySpec.scala index 2924c8304c7..fc7d9fa34fc 100644 --- a/components-api/src/test/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategySpec.scala +++ b/components-api/src/test/scala/pl/touk/nussknacker/engine/api/namespaces/NamingStrategySpec.scala @@ -1,21 +1,48 @@ package pl.touk.nussknacker.engine.api.namespaces +import com.typesafe.config.ConfigFactory import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers class NamingStrategySpec extends AnyFunSuite with Matchers { test("should leave original names if no namespace configured") { - val defaultNaming = NamingStrategy(None) + val defaultNaming = NamingStrategy.Disabled + defaultNaming.prepareName("original") shouldBe "original" defaultNaming.decodeName("original") shouldBe Some("original") } test("should add namespace if configured") { - val namingStrategy = NamingStrategy(Some("customer1")) + val namingStrategy = NamingStrategy(Some(Namespace("customer1", "_"))) + namingStrategy.prepareName("original") shouldBe "customer1_original" namingStrategy.decodeName("customer1_someName") shouldBe Some("someName") namingStrategy.decodeName("dummy??") shouldBe None } + test("should read disabled naming strategy config") { + val namingStrategy = NamingStrategy.fromConfig(ConfigFactory.empty()) + + namingStrategy.prepareName("original") shouldBe "original" + } + + test("should read naming strategy config with default separator") { + val config = ConfigFactory.parseString("""namespace: customer1""") + + val namingStrategy = NamingStrategy.fromConfig(config) + + namingStrategy.prepareName("original") shouldBe "customer1_original" + } + + test("should read naming strategy config with specified separator") { + val config = ConfigFactory.parseString(""" + |namespace: customer1 + |namespaceSeparator: "."""".stripMargin) + + val namingStrategy = NamingStrategy.fromConfig(config) + + namingStrategy.prepareName("original") shouldBe "customer1.original" + } + } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubModelDataWithModelDefinition.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubModelDataWithModelDefinition.scala index e216b90d91c..b6799b46351 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubModelDataWithModelDefinition.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubModelDataWithModelDefinition.scala @@ -26,7 +26,7 @@ class StubModelDataWithModelDefinition( override def modelConfigLoader: ModelConfigLoader = new DefaultModelConfigLoader(_ => true) - override def namingStrategy: NamingStrategy = NamingStrategy(None) + override def namingStrategy: NamingStrategy = NamingStrategy.Disabled override def inputConfigDuringExecution: InputConfigDuringExecution = InputConfigDuringExecution( configDuringExecution diff --git a/docs/Changelog.md b/docs/Changelog.md index ccdc31f17d4..4fabcf6833e 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -67,6 +67,7 @@ * [#7498](https://github.com/TouK/nussknacker/pull/7498) Support many migrations loaded using SPI. Loaded migration numbers cannot overlap, if they do, an exception is thrown. * [#7504](https://github.com/TouK/nussknacker/pull/7504) Return scenario validation error when an incompatible change was introduced in a fragment or component parameter definition. +* [#7468](https://github.com/TouK/nussknacker/pull/7468) Configurable namespace separator (was fixed to `_`), added namespace tag to Lite engine metrics and fixed namespacing of Kafka consumer groups. ## 1.18 diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index dd50634770e..01c2deb616d 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -77,6 +77,9 @@ To see the biggest differences please consult the [changelog](Changelog.md). * [#7458](https://github.com/TouK/nussknacker/pull/7458) Flink scenario testing mechanism and scenario state verification mechanism: by default mini cluster is created once and reused each time To revert previous behaviour (creating minicluster each time), change `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioTesting` or/and `deploymentConfig.scenarioTesting.reuseMiniClusterForScenarioStateVerification` to `false` +* [#7468](https://github.com/TouK/nussknacker/pull/7468) When a namespace is configured, Kafka consumer groups are also namespaced. + This change should have been introduced as of starting from Nussknacker 1.15 when a feature flag `useNamingStrategyForConsumerGroupId` + was removed to temporarily disable consumer group namespacing. ### Code API changes * [#7368](https://github.com/TouK/nussknacker/pull/7368) Renamed `PeriodicSourceFactory` to `SampleGeneratorSourceFactory` diff --git a/docs/configuration/model/ModelConfiguration.md b/docs/configuration/model/ModelConfiguration.md index 3e630f62121..52971b60176 100644 --- a/docs/configuration/model/ModelConfiguration.md +++ b/docs/configuration/model/ModelConfiguration.md @@ -186,3 +186,26 @@ The docs button which is displayed in the scenario properties modal window is by ``` scenarioPropertiesDocsUrl: "http://custom-configurable-link"``` in the config. + +## Multitenancy support + +Nussknacker supports multitenancy, allowing multiple Nussknacker designer instances to operate on shared infrastructure components, +such as Kafka, Flink, or InfluxDB. This is achieved by using configured namespaces to isolate resources: +- Kafka Topics: Prefixed with a namespace +- Kafka Consumer Groups: Prefixed with a namespace +- Flink Jobs: Prefixed with a namespace +- InfluxDB Metrics: Tagged with an additional namespace tag + +A namespace configuration can be defined as follows: + +``` +modelConfig: { + ... + namespace: customer1 + namespaceSeparator: "_" +} +``` + +Notes: +- The `namespaceSeparator` is optional and defaults to `_` if not specified. +- This configuration applies a uniform namespace to all resources. diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala index ac9e0bc9960..d8709f470a7 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala @@ -52,7 +52,7 @@ object NamespaceMetricsTags { NamespaceMetricsTags( Map( originalNameTag -> scenarioName, - namespaceTag -> namespace + namespaceTag -> namespace.value ) ) } diff --git a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala index d79c21b0d53..ed892a6721d 100644 --- a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala +++ b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala @@ -70,7 +70,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider { ): ProcessObjectDependencies = { val disableNamespacePath = "disableNamespace" if (config.hasPath(disableNamespacePath) && config.getBoolean(disableNamespacePath)) { - dependencies.copy(namingStrategy = NamingStrategy(None)) + dependencies.copy(namingStrategy = NamingStrategy.Disabled) } else { dependencies } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala index e44c1e3a82e..03d87974311 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/MetricsSpec.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.functional +import com.typesafe.config.{ConfigFactory, ConfigValueFactory} import org.apache.flink.configuration.Configuration import org.apache.flink.metrics.{Counter, Gauge, Histogram} import org.scalatest.LoneElement._ @@ -230,6 +231,25 @@ class MetricsSpec } } + test("should add namespace to metrics") { implicit scenarioName => + val process = ScenarioBuilder + .streaming(scenarioName.value) + .source("source1", "input") + .emptySink("out1", "monitor") + val data = List( + SimpleRecord("1", 12, "a", new Date(0)) + ) + val namespace = "a-tenant" + + processInvoker.invokeWithSampleData( + process, + data, + ConfigFactory.empty().withValue("namespace", ConfigValueFactory.fromAnyRef(namespace)) + ) + + counter(s"namespace.$namespace.nodeId.source1.originalProcessName.${scenarioName.value}.nodeCount") shouldBe 1 + } + private def counter(name: String)(implicit scenarioName: ProcessName): Long = withClue(s"counter $name") { reporter.testMetrics[Counter](name).loneElement.getCount } diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala index ee856a4e369..556413292db 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala @@ -138,9 +138,9 @@ class FlinkKafkaSource[T]( ) } - private def prepareConsumerGroupId(nodeContext: FlinkCustomNodeContext): String = overriddenConsumerGroup match { - case Some(overridden) => overridden - case None => ConsumerGroupDeterminer(kafkaConfig).consumerGroup(nodeContext) + private def prepareConsumerGroupId(nodeContext: FlinkCustomNodeContext): String = { + val baseName = overriddenConsumerGroup.getOrElse(ConsumerGroupDeterminer(kafkaConfig).consumerGroup(nodeContext)) + namingStrategy.prepareName(baseName) } } diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManagerProvider.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManagerProvider.scala index 6d450367e62..00a295f0bc1 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManagerProvider.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManagerProvider.scala @@ -31,7 +31,9 @@ class EmbeddedDeploymentManagerProvider extends LiteDeploymentManagerProvider { RequestResponseDeploymentStrategy(engineConfig) ) - val metricRegistry = LiteMetricRegistryFactory.usingHostnameAsDefaultInstanceId.prepareRegistry(engineConfig) + val metricRegistry = LiteMetricRegistryFactory + .usingHostnameAsDefaultInstanceId(modelData.namingStrategy.namespace) + .prepareRegistry(engineConfig) val contextPreparer = new LiteEngineRuntimeContextPreparer(new DropwizardMetricsProviderFactory(metricRegistry)) strategy.open(modelData.asInvokableModelData, contextPreparer) diff --git a/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala b/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala index d35a894d967..81f57fbccfc 100644 --- a/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala +++ b/engine/lite/runtime-app/src/main/scala/pl/touk/nussknacker/engine/lite/app/RunnableScenarioInterpreterFactory.scala @@ -5,6 +5,7 @@ import cats.effect.{IO, Resource} import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader +import pl.touk.nussknacker.engine.api.namespaces.Namespace import pl.touk.nussknacker.engine.api.{JobData, LiteStreamMetaData, ProcessVersion, RequestResponseMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.lite.RunnableScenarioInterpreter @@ -36,7 +37,7 @@ object RunnableScenarioInterpreterFactory extends LazyLogging { ModelClassLoader(urls, workingDirectoryOpt = None, deploymentManagersClassLoader), resolveConfigs = true ) - val metricRegistry = prepareMetricRegistry(runtimeConfig) + val metricRegistry = prepareMetricRegistry(runtimeConfig, modelData.namingStrategy.namespace) val preparer = new LiteEngineRuntimeContextPreparer(new DropwizardMetricsProviderFactory(metricRegistry)) // TODO Pass correct ProcessVersion and DeploymentData val jobData = JobData(scenario.metaData, ProcessVersion.empty.copy(processName = scenario.metaData.name)) @@ -75,9 +76,9 @@ object RunnableScenarioInterpreterFactory extends LazyLogging { } } - private def prepareMetricRegistry(engineConfig: Config) = { + private def prepareMetricRegistry(engineConfig: Config, namespace: Option[Namespace]) = { lazy val instanceId = sys.env.getOrElse("INSTANCE_ID", LiteMetricRegistryFactory.hostname) - new LiteMetricRegistryFactory(instanceId).prepareRegistry(engineConfig) + new LiteMetricRegistryFactory(instanceId, namespace).prepareRegistry(engineConfig) } } diff --git a/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactory.scala b/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactory.scala index b567dcff4d2..32bd1e6ff53 100644 --- a/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactory.scala +++ b/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactory.scala @@ -5,6 +5,7 @@ import com.typesafe.scalalogging.LazyLogging import io.dropwizard.metrics5.{MetricName, MetricRegistry} import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader +import pl.touk.nussknacker.engine.api.namespaces.Namespace import pl.touk.nussknacker.engine.lite.metrics.dropwizard.influxdb.LiteEngineInfluxDbReporter import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader @@ -12,7 +13,9 @@ import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLogging { +class LiteMetricRegistryFactory(defaultInstanceId: => String, namespace: Option[Namespace]) extends LazyLogging { + + import LiteMetricRegistryFactory._ val metricsConfigPath = "metrics" @@ -28,7 +31,7 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin } private def registerReporters(metricRegistry: MetricRegistry, metricsConfig: Config): Unit = { - val prefix = preparePrefix(metricsConfig.rootAs[CommonMetricConfig]) + val prefix = prepareMetricPrefix(metricsConfig.rootAs[CommonMetricConfig], defaultInstanceId, namespace) val metricReporters = loadMetricsReporters() if (metricReporters.nonEmpty) { metricReporters.foreach { reporter => @@ -40,15 +43,6 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin new JmxMetricsReporter().createAndRunReporter(metricRegistry, prefix, metricsConfig) } - private def preparePrefix(conf: CommonMetricConfig): MetricName = { - conf.prefix - .map(MetricName.build(_)) - .getOrElse(MetricName.empty()) - .tagged("instanceId", conf.instanceId.getOrElse(defaultInstanceId)) - .tagged("env", conf.environment) - .tagged(conf.additionalTags.asJava) - } - private def loadMetricsReporters(): List[MetricsReporter] = { try { val reporters = ScalaServiceLoader.load[MetricsReporter](Thread.currentThread().getContextClassLoader) @@ -61,20 +55,12 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin } } - case class CommonMetricConfig( - prefix: Option[String], - instanceId: Option[String], - environment: String, - additionalTags: Map[String, String] = Map.empty - ) - } object LiteMetricRegistryFactory extends LazyLogging { - def usingHostnameAsDefaultInstanceId = new LiteMetricRegistryFactory(hostname) - - def usingHostnameAndPortAsDefaultInstanceId(port: Int) = new LiteMetricRegistryFactory(s"$hostname:$port") + def usingHostnameAsDefaultInstanceId(namespace: Option[Namespace]) = + new LiteMetricRegistryFactory(hostname, namespace) def hostname: String = { // Checking COMPUTERNAME to make it works also on windows, see: https://stackoverflow.com/a/33112997/1370301 @@ -84,4 +70,28 @@ object LiteMetricRegistryFactory extends LazyLogging { } } + private[dropwizard] def prepareMetricPrefix( + conf: CommonMetricConfig, + defaultInstanceId: => String, + namespace: Option[Namespace] + ): MetricName = { + val metricPrefix = conf.prefix + .map(MetricName.build(_)) + .getOrElse(MetricName.empty()) + .tagged("instanceId", conf.instanceId.getOrElse(defaultInstanceId)) + .tagged("env", conf.environment) + .tagged(conf.additionalTags.asJava) + namespace match { + case Some(Namespace(value, _)) => metricPrefix.tagged("namespace", value) + case None => metricPrefix + } + } + + private[dropwizard] case class CommonMetricConfig( + prefix: Option[String], + instanceId: Option[String], + environment: String, + additionalTags: Map[String, String] = Map.empty + ) + } diff --git a/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactoryTest.scala b/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactoryTest.scala new file mode 100644 index 00000000000..3f5e8481e0d --- /dev/null +++ b/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/metrics/dropwizard/LiteMetricRegistryFactoryTest.scala @@ -0,0 +1,38 @@ +package pl.touk.nussknacker.engine.lite.metrics.dropwizard + +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.namespaces.Namespace + +import scala.jdk.CollectionConverters._ + +class LiteMetricRegistryFactoryTest extends AnyFunSuiteLike with Matchers { + + import LiteMetricRegistryFactory._ + + private val commonMetricConf = CommonMetricConfig( + prefix = Some("a-prefix"), + instanceId = Some("an-instance"), + environment = "an-env", + additionalTags = Map("custom-tag" -> "custom-value"), + ) + + test("should configure metric prefix") { + val metricPrefix = prepareMetricPrefix(commonMetricConf, "default instance", namespace = None) + + metricPrefix.getKey shouldBe "a-prefix" + metricPrefix.getTags.asScala shouldBe Map( + "env" -> "an-env", + "instanceId" -> "an-instance", + "custom-tag" -> "custom-value", + ) + } + + test("should add namespace tag") { + val namespace = Namespace(value = "a-tenant", separator = "_") + val metricPrefix = prepareMetricPrefix(commonMetricConf, "default instance", Some(namespace)) + + metricPrefix.getTags.asScala should contain("namespace" -> "a-tenant") + } + +}