Skip to content

Commit

Permalink
[NU-1954] Add namespace metric to Lite engine
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrz committed Jan 29, 2025
1 parent ad1afc4 commit 22bb107
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class EmbeddedDeploymentManagerProvider extends LiteDeploymentManagerProvider {
RequestResponseDeploymentStrategy(engineConfig)
)

val metricRegistry = LiteMetricRegistryFactory.usingHostnameAsDefaultInstanceId.prepareRegistry(engineConfig)
val metricRegistry = LiteMetricRegistryFactory
.usingHostnameAsDefaultInstanceId(modelData.namingStrategy.namespace)
.prepareRegistry(engineConfig)
val contextPreparer = new LiteEngineRuntimeContextPreparer(new DropwizardMetricsProviderFactory(metricRegistry))

strategy.open(modelData.asInvokableModelData, contextPreparer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.effect.{IO, Resource}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
import pl.touk.nussknacker.engine.api.namespaces.Namespace
import pl.touk.nussknacker.engine.api.{JobData, LiteStreamMetaData, ProcessVersion, RequestResponseMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.lite.RunnableScenarioInterpreter
Expand Down Expand Up @@ -36,7 +37,7 @@ object RunnableScenarioInterpreterFactory extends LazyLogging {
ModelClassLoader(urls, workingDirectoryOpt = None, deploymentManagersClassLoader),
resolveConfigs = true
)
val metricRegistry = prepareMetricRegistry(runtimeConfig)
val metricRegistry = prepareMetricRegistry(runtimeConfig, modelData.namingStrategy.namespace)
val preparer = new LiteEngineRuntimeContextPreparer(new DropwizardMetricsProviderFactory(metricRegistry))
// TODO Pass correct ProcessVersion and DeploymentData
val jobData = JobData(scenario.metaData, ProcessVersion.empty.copy(processName = scenario.metaData.name))
Expand Down Expand Up @@ -75,9 +76,9 @@ object RunnableScenarioInterpreterFactory extends LazyLogging {
}
}

private def prepareMetricRegistry(engineConfig: Config) = {
private def prepareMetricRegistry(engineConfig: Config, namespace: Option[Namespace]) = {
lazy val instanceId = sys.env.getOrElse("INSTANCE_ID", LiteMetricRegistryFactory.hostname)
new LiteMetricRegistryFactory(instanceId).prepareRegistry(engineConfig)
new LiteMetricRegistryFactory(instanceId, namespace).prepareRegistry(engineConfig)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import com.typesafe.scalalogging.LazyLogging
import io.dropwizard.metrics5.{MetricName, MetricRegistry}
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
import pl.touk.nussknacker.engine.api.namespaces.Namespace
import pl.touk.nussknacker.engine.lite.metrics.dropwizard.influxdb.LiteEngineInfluxDbReporter
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLogging {
class LiteMetricRegistryFactory(defaultInstanceId: => String, namespace: Option[Namespace]) extends LazyLogging {

import LiteMetricRegistryFactory._

val metricsConfigPath = "metrics"

Expand All @@ -28,7 +31,7 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin
}

private def registerReporters(metricRegistry: MetricRegistry, metricsConfig: Config): Unit = {
val prefix = preparePrefix(metricsConfig.rootAs[CommonMetricConfig])
val prefix = prepareMetricPrefix(metricsConfig.rootAs[CommonMetricConfig], defaultInstanceId, namespace)
val metricReporters = loadMetricsReporters()
if (metricReporters.nonEmpty) {
metricReporters.foreach { reporter =>
Expand All @@ -40,15 +43,6 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin
new JmxMetricsReporter().createAndRunReporter(metricRegistry, prefix, metricsConfig)
}

private def preparePrefix(conf: CommonMetricConfig): MetricName = {
conf.prefix
.map(MetricName.build(_))
.getOrElse(MetricName.empty())
.tagged("instanceId", conf.instanceId.getOrElse(defaultInstanceId))
.tagged("env", conf.environment)
.tagged(conf.additionalTags.asJava)
}

private def loadMetricsReporters(): List[MetricsReporter] = {
try {
val reporters = ScalaServiceLoader.load[MetricsReporter](Thread.currentThread().getContextClassLoader)
Expand All @@ -61,20 +55,12 @@ class LiteMetricRegistryFactory(defaultInstanceId: => String) extends LazyLoggin
}
}

case class CommonMetricConfig(
prefix: Option[String],
instanceId: Option[String],
environment: String,
additionalTags: Map[String, String] = Map.empty
)

}

object LiteMetricRegistryFactory extends LazyLogging {

def usingHostnameAsDefaultInstanceId = new LiteMetricRegistryFactory(hostname)

def usingHostnameAndPortAsDefaultInstanceId(port: Int) = new LiteMetricRegistryFactory(s"$hostname:$port")
def usingHostnameAsDefaultInstanceId(namespace: Option[Namespace]) =
new LiteMetricRegistryFactory(hostname, namespace)

def hostname: String = {
// Checking COMPUTERNAME to make it works also on windows, see: https://stackoverflow.com/a/33112997/1370301
Expand All @@ -84,4 +70,28 @@ object LiteMetricRegistryFactory extends LazyLogging {
}
}

private[dropwizard] def prepareMetricPrefix(
conf: CommonMetricConfig,
defaultInstanceId: => String,
namespace: Option[Namespace]
): MetricName = {
val metricPrefix = conf.prefix
.map(MetricName.build(_))
.getOrElse(MetricName.empty())
.tagged("instanceId", conf.instanceId.getOrElse(defaultInstanceId))
.tagged("env", conf.environment)
.tagged(conf.additionalTags.asJava)
namespace match {
case Some(Namespace(value, _)) => metricPrefix.tagged("namespace", value)
case None => metricPrefix
}
}

private[dropwizard] case class CommonMetricConfig(
prefix: Option[String],
instanceId: Option[String],
environment: String,
additionalTags: Map[String, String] = Map.empty
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pl.touk.nussknacker.engine.lite.metrics.dropwizard

import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.namespaces.Namespace

import scala.jdk.CollectionConverters._

class LiteMetricRegistryFactoryTest extends AnyFunSuiteLike with Matchers {

import LiteMetricRegistryFactory._

private val commonMetricConf = CommonMetricConfig(
prefix = Some("a-prefix"),
instanceId = Some("an-instance"),
environment = "an-env",
additionalTags = Map("custom-tag" -> "custom-value"),
)

test("should configure metric prefix") {
val metricPrefix = prepareMetricPrefix(commonMetricConf, "default instance", namespace = None)

metricPrefix.getKey shouldBe "a-prefix"
metricPrefix.getTags.asScala shouldBe Map(
"env" -> "an-env",
"instanceId" -> "an-instance",
"custom-tag" -> "custom-value",
)
}

test("should add namespace tag") {
val namespace = Namespace(value = "a-tenant", separator = "_")
val metricPrefix = prepareMetricPrefix(commonMetricConf, "default instance", Some(namespace))

metricPrefix.getTags.asScala should contain("namespace" -> "a-tenant")
}

}

0 comments on commit 22bb107

Please sign in to comment.