From c6bd6f2da9d17491bae1a77fdbe62266f0b1aa4d Mon Sep 17 00:00:00 2001 From: Nicolas Dupeux Date: Mon, 25 Nov 2024 14:36:01 +0100 Subject: [PATCH] Add a prometheus label mapping component --- CHANGELOG.md | 2 + .../2025-prometheus-mapping-component.md | 45 ++++ .../sources/reference/compatibility/_index.md | 2 + .../prometheus/prometheus.mapping.md | 134 +++++++++++ internal/component/all/all.go | 1 + .../component/prometheus/mapping/mapping.go | 224 ++++++++++++++++++ .../prometheus/mapping/mapping_test.go | 81 +++++++ 7 files changed, 489 insertions(+) create mode 100644 docs/design/2025-prometheus-mapping-component.md create mode 100644 docs/sources/reference/components/prometheus/prometheus.mapping.md create mode 100644 internal/component/prometheus/mapping/mapping.go create mode 100644 internal/component/prometheus/mapping/mapping_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ae3c90519..a54941c628 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,8 @@ Main (unreleased) - Add a new `/-/healthy` endpoint which returns HTTP 500 if one or more components are unhealthy. (@ptodev) +- Add a `prometheus.mapping` component to add labels based on a source_label and a mapping table. (@vaxvms) + ### Enhancements - Add second metrics sample to the support bundle to provide delta information (@dehaansa) diff --git a/docs/design/2025-prometheus-mapping-component.md b/docs/design/2025-prometheus-mapping-component.md new file mode 100644 index 0000000000..400b9ce386 --- /dev/null +++ b/docs/design/2025-prometheus-mapping-component.md @@ -0,0 +1,45 @@ +# Proposal: Add a component to perform label mapping efficiently + +* Author(s): Nicolas DUPEUX +* Last updated: 19/11/2024 +* Original issue: https://github.com/grafana/alloy/pull/2025 + +## Abstract + +Add a component to populate label values based on a lookup table. 
+ +## Problem + +Using `prometheus.relabel` to populate a label value based on another label value is inefficient because it requires a rule block for each source label value. + +If we have 1k values to map, we have to evaluate up to 1k regular expressions for each datapoint, resulting in an algorithmic complexity of O(n), where n is the number of mapped values. + +## Proposal + +Replace the regular expression evaluation with a lookup table. The per-datapoint complexity goes from O(n) to O(1). + +## Pros and cons + +Pros: + - Resource efficient + +Cons: + - New component + +## Alternative solutions + +- Allocate more CPU resources to perform the task +- Optimize the prometheus.relabel component +- Merge regular expressions when several keys map to the same value. + +## Compatibility + +As this is a new component, there isn't any compatibility issue as long as you don't use it. + +## Implementation + +https://github.com/grafana/alloy/pull/2025 + +## Related open issues + +None diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index da5471fbfb..f0215761a9 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -177,6 +177,7 @@ The following components, grouped by namespace, _export_ Prometheus `MetricsRece {{< /collapse >}} {{< collapse title="prometheus" >}} +- [prometheus.mapping](../components/prometheus/prometheus.mapping) - [prometheus.relabel](../components/prometheus/prometheus.relabel) - [prometheus.remote_write](../components/prometheus/prometheus.remote_write) - [prometheus.write.queue](../components/prometheus/prometheus.write.queue) @@ -196,6 +197,7 @@ The following components, grouped by namespace, _consume_ Prometheus `MetricsRec {{< /collapse >}} {{< collapse title="prometheus" >}} +- [prometheus.mapping](../components/prometheus/prometheus.mapping) - [prometheus.operator.podmonitors](../components/prometheus/prometheus.operator.podmonitors) - [prometheus.operator.probes](../components/prometheus/prometheus.operator.probes) - 
[prometheus.operator.servicemonitors](../components/prometheus/prometheus.operator.servicemonitors) diff --git a/docs/sources/reference/components/prometheus/prometheus.mapping.md b/docs/sources/reference/components/prometheus/prometheus.mapping.md new file mode 100644 index 0000000000..ba24051179 --- /dev/null +++ b/docs/sources/reference/components/prometheus/prometheus.mapping.md @@ -0,0 +1,134 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.mapping/ +description: Learn about prometheus.mapping +title: prometheus.mapping +--- +Public preview + +# prometheus.mapping + +{{< docs/shared lookup="stability/public_preview.md" source="alloy" version="<ALLOY_VERSION>" >}} + +Prometheus metrics follow the [OpenMetrics](https://openmetrics.io/) format. +Each time series is uniquely identified by its metric name, plus optional +key-value pairs called labels. Each sample represents a datapoint in the +time series and contains a value and an optional timestamp. + +```text +<metric name>{<label_1>=<label_val_1>, <label_2>=<label_val_2> ...} <value> [timestamp] +``` + +The `prometheus.mapping` component creates new labels on each metric passed +along to the exported receiver by applying a mapping table to a label value. + +The most common use of `prometheus.mapping` is to create new labels from a high +cardinality source label (more than 1k values), where a large set of regular expressions is +inefficient. + +You can specify multiple `prometheus.mapping` components by giving them +different labels. + +## Usage + +```alloy +prometheus.mapping "LABEL" { + forward_to = RECEIVER_LIST + + source_label = "labelA" + + mapping = { + "from" = {"labelB" = "to"}, + ... 
+ } +} +``` + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +---------------|---------------------------|---------------------------------------------------------------------|---------|--------- +`forward_to` | `list(MetricsReceiver)` | The receivers the metrics are forwarded to after they are mapped. | | yes +`source_label` | `string` | Name of the source label to use for mapping. | | yes +`mapping` | `map(string,map(string))` | Mapping from source label value to target label names and values. | | yes + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +-----------|-------------------|----------------------------------------------------------- +`receiver` | `MetricsReceiver` | The input receiver where samples are sent to be mapped. + +## Component health + +`prometheus.mapping` is only reported as unhealthy if given an invalid configuration. +In those cases, exported fields are kept at their last healthy values. + +## Debug information + +`prometheus.mapping` doesn't expose any component-specific debug information. + +## Debug metrics + +* `alloy_prometheus_mapping_metrics_processed` (counter): Total number of metrics processed. +* `alloy_prometheus_mapping_metrics_written` (counter): Total number of metrics written. + +## Example + +Create an instance of a `prometheus.mapping` component. + +```alloy +prometheus.mapping "keep_backend_only" { + forward_to = [prometheus.remote_write.onprem.receiver] + + source_label = "app" + + mapping = { + "frontend" = {"team" = "teamA"}, + "backend" = {"team" = "teamB"}, + "database" = {"team" = "teamC"}, + } +} +``` + +Use the following metrics. 
+ +```text +metric_a{__address__ = "localhost", instance = "development", app = "frontend"} 10 +metric_a{__address__ = "localhost", instance = "development", app = "backend"} 2 +metric_a{__address__ = "cluster_a", instance = "production", app = "frontend"} 7 +metric_a{__address__ = "cluster_a", instance = "production", app = "backend"} 9 +metric_a{__address__ = "cluster_b", instance = "production", app = "database"} 4 +``` + +After applying the mapping, a new `team` label is created based on the mapping table and the `app` label value. + +```text +metric_a{team = "teamA", __address__ = "localhost", instance = "development", app = "frontend"} 10 +metric_a{team = "teamB", __address__ = "localhost", instance = "development", app = "backend"} 2 +metric_a{team = "teamA", __address__ = "cluster_a", instance = "production", app = "frontend"} 7 +metric_a{team = "teamB", __address__ = "cluster_a", instance = "production", app = "backend"} 9 +metric_a{team = "teamC", __address__ = "cluster_b", instance = "production", app = "database"} 4 +``` + +The resulting metrics are propagated to each receiver defined in the `forward_to` argument. + + +## Compatible components + +`prometheus.mapping` can accept arguments from the following components: + +- Components that export [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-exporters) + +`prometheus.mapping` has exports that can be consumed by the following components: + +- Components that consume [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. 
+{{< /admonition >}} + + diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 65d48a019e..1ea4bfd1eb 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -134,6 +134,7 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/exporter/statsd" // Import prometheus.exporter.statsd _ "github.com/grafana/alloy/internal/component/prometheus/exporter/unix" // Import prometheus.exporter.unix _ "github.com/grafana/alloy/internal/component/prometheus/exporter/windows" // Import prometheus.exporter.windows + _ "github.com/grafana/alloy/internal/component/prometheus/mapping" // Import prometheus.mapping _ "github.com/grafana/alloy/internal/component/prometheus/operator/podmonitors" // Import prometheus.operator.podmonitors _ "github.com/grafana/alloy/internal/component/prometheus/operator/probes" // Import prometheus.operator.probes _ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors diff --git a/internal/component/prometheus/mapping/mapping.go b/internal/component/prometheus/mapping/mapping.go new file mode 100644 index 0000000000..352ed06bbf --- /dev/null +++ b/internal/component/prometheus/mapping/mapping.go @@ -0,0 +1,224 @@ +package mapping + +import ( + "context" + "fmt" + "sync" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" + prometheus_client "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + "go.uber.org/atomic" +) + +const name = 
"prometheus.mapping" + +func init() { + component.Register(component.Registration{ + Name: name, + Stability: featuregate.StabilityPublicPreview, + Args: Arguments{}, + Exports: Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +// Arguments holds values which are used to configure the prometheus.mapping +// component. +type Arguments struct { + // Where the metrics should be forwarded to once the mapping is applied. + ForwardTo []storage.Appendable `alloy:"forward_to,attr"` + + // Name of the label whose value is looked up in the mapping table. + SourceLabel string `alloy:"source_label,attr"` + + // Maps a source label value to the label name/value pairs to set. + LabelValuesMapping map[string]map[string]string `alloy:"mapping,attr"` +} + +// SetToDefault implements syntax.Defaulter. +func (arg *Arguments) SetToDefault() { +} + +// Validate implements syntax.Validator. +func (arg *Arguments) Validate() error { + return nil +} + +// Exports holds values which are exported by the prometheus.mapping component. +type Exports struct { + Receiver storage.Appendable `alloy:"receiver,attr"` +} + +// Component implements the prometheus.mapping component. +type Component struct { + sourceLabel string + + mappings map[string]map[string]string + mut sync.RWMutex + opts component.Options + receiver *prometheus.Interceptor + metricsProcessed prometheus_client.Counter + metricsOutgoing prometheus_client.Counter + fanout *prometheus.Fanout + exited atomic.Bool + ls labelstore.LabelStore + + debugDataPublisher livedebugging.DebugDataPublisher +} + +var ( + _ component.Component = (*Component)(nil) + _ component.LiveDebugging = (*Component)(nil) +) + +// New creates a new prometheus.mapping component. 
+func New(o component.Options, args Arguments) (*Component, error) { + debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + + data, err := o.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } + c := &Component{ + opts: o, + ls: data.(labelstore.LabelStore), + sourceLabel: args.SourceLabel, + mappings: args.LabelValuesMapping, + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), + } + c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "alloy_prometheus_mapping_metrics_processed", + Help: "Total number of metrics processed", + }) + c.metricsOutgoing = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "alloy_prometheus_mapping_metrics_written", + Help: "Total number of metrics written", + }) + + for _, metric := range []prometheus_client.Collector{c.metricsProcessed, c.metricsOutgoing} { + err = o.Registerer.Register(metric) + if err != nil { + return nil, err + } + } + + c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, c.ls) + c.receiver = prometheus.NewInterceptor( + c.fanout, + c.ls, + prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(l) + if newLbl.IsEmpty() { + return 0, nil + } + c.metricsOutgoing.Inc() + return next.Append(0, newLbl, t, v) + }), + prometheus.WithExemplarHook(func(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(l) + if newLbl.IsEmpty() { + return 0, nil + } + return next.AppendExemplar(0, newLbl, e) + }), + prometheus.WithMetadataHook(func(_ storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) 
(storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(l) + if newLbl.IsEmpty() { + return 0, nil + } + return next.UpdateMetadata(0, newLbl, m) + }), + prometheus.WithHistogramHook(func(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(l) + if newLbl.IsEmpty() { + return 0, nil + } + return next.AppendHistogram(0, newLbl, t, h, fh) + }), + ) + + // Immediately export the receiver which remains the same for the component + // lifetime. + o.OnStateChange(Exports{Receiver: c.receiver}) + + // Call Update() once at the start to apply the initial source label, mapping table and forward_to list. + if err = c.Update(args); err != nil { + return nil, err + } + + return c, nil +} + +// Run implements component.Component. It blocks until ctx is done and marks the component as exited on return. +func (c *Component) Run(ctx context.Context) error { + defer c.exited.Store(true) + + <-ctx.Done() + return nil +} + +// Update implements component.Component. It stores the new source label, mapping table and forward_to children under c.mut. +func (c *Component) Update(args component.Arguments) error { + c.mut.Lock() + defer c.mut.Unlock() + + newArgs := args.(Arguments) + c.sourceLabel = newArgs.SourceLabel + c.mappings = newArgs.LabelValuesMapping + c.fanout.UpdateChildren(newArgs.ForwardTo) + + c.opts.OnStateChange(Exports{Receiver: c.receiver}) + + return nil +} + +func (c *Component) mapping(lbls labels.Labels) labels.Labels { + // Build the result from a copy of lbls so the labels passed in are not modified. + // NOTE(review): c.sourceLabel/c.mappings are read below without holding c.mut, which Update locks for writing — confirm and guard with RLock. 
+ lb := labels.NewBuilder(lbls.Copy()) + sourceValue := lb.Get(c.sourceLabel) + for labelName, labelValue := range c.mappings[sourceValue] { + lb.Set(labelName, labelValue) + } + newLabels := lb.Labels() + + componentID := livedebugging.ComponentID(c.opts.ID) + if c.debugDataPublisher.IsActive(componentID) { + c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lbls.String(), newLabels.String())) + } + + return newLabels +} + +func (c *Component) LiveDebugging(_ int) {} diff --git a/internal/component/prometheus/mapping/mapping_test.go b/internal/component/prometheus/mapping/mapping_test.go new file mode 100644 index 0000000000..235a5305ce --- /dev/null +++ b/internal/component/prometheus/mapping/mapping_test.go @@ -0,0 +1,81 @@ +package mapping + +import ( + "fmt" + "testing" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" + "github.com/grafana/alloy/internal/util" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func TestValidator(t *testing.T) { + args := Arguments{} + err := args.Validate() + require.NoError(t, err) +} + +func TestMapping(t *testing.T) { + mapper := generateMapping(t) + lbls := labels.FromStrings("source", "value1") + newLbls := mapper.mapping(lbls) + require.True(t, newLbls.Has("target")) +} + +func TestMappingEmptySourceLabelValue(t *testing.T) { + mapper := generateMapping(t) + lbls := labels.FromStrings("source", "") + newLbls := mapper.mapping(lbls) + require.True(t, newLbls.Has("target")) + require.Equal(t, newLbls.Get("target"), "empty") +} + +func TestMappingEmptyTargetLabelValue(t *testing.T) { + mapper := generateMapping(t) + lbls := labels.FromStrings("source", "value2") + newLbls := 
mapper.mapping(lbls) + require.False(t, newLbls.Has("target")) +} + +func generateMapping(t *testing.T) *Component { + ls := labelstore.New(nil, prom.DefaultRegisterer) + fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { + require.True(t, l.Has("new_label")) + return ref, nil + })) + mapper, err := New(component.Options{ + ID: "1", + Logger: util.TestAlloyLogger(t), + OnStateChange: func(e component.Exports) {}, + Registerer: prom.NewRegistry(), + GetServiceData: getServiceData, + }, Arguments{ + ForwardTo: []storage.Appendable{fanout}, + SourceLabel: "source", + LabelValuesMapping: map[string]map[string]string{ + "": {"target": "empty"}, + "value1": {"target": "eulav"}, + "value2": {}, + }, + }) + require.NotNil(t, mapper) + require.NoError(t, err) + return mapper +} + +func getServiceData(name string) (interface{}, error) { + switch name { + case labelstore.ServiceName: + return labelstore.New(nil, prom.DefaultRegisterer), nil + case livedebugging.ServiceName: + return livedebugging.NewLiveDebugging(), nil + default: + return nil, fmt.Errorf("service not found %s", name) + } +}