diff --git a/CHANGELOG.md b/CHANGELOG.md index d563c31341..ff0260f0bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ internal API changes are not present. Main (unreleased) ----------------- +### Enhancements + +- Improved performance by reducing allocation in Prometheus write pipelines by ~30% (@thampiotr) + v1.6.0-rc.1 ----------------- diff --git a/internal/component/prometheus/fanout.go b/internal/component/prometheus/fanout.go index a0d1165cda..f32e975891 100644 --- a/internal/component/prometheus/fanout.go +++ b/internal/component/prometheus/fanout.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/service/labelstore" ) @@ -29,6 +30,10 @@ type Fanout struct { writeLatency prometheus.Histogram samplesCounter prometheus.Counter ls labelstore.LabelStore + + // lastSeriesCount stores the number of series that were sent through the last appender. It helps to estimate how + // much memory to allocate for the staleness trackers. + lastSeriesCount atomic.Int64 } // NewFanout creates a fanout appendable. @@ -77,11 +82,8 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { app := &appender{ children: make([]storage.Appender, 0), - componentID: f.componentID, - writeLatency: f.writeLatency, - samplesCounter: f.samplesCounter, - ls: f.ls, - stalenessTrackers: make([]labelstore.StalenessTracker, 0), + fanout: f, + stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()), } for _, x := range f.children { @@ -95,12 +97,9 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { type appender struct { children []storage.Appender - componentID string - writeLatency prometheus.Histogram - samplesCounter prometheus.Counter start time.Time - ls labelstore.LabelStore stalenessTrackers []labelstore.StalenessTracker + fanout *Fanout } var _ storage.Appender = (*appender)(nil) @@ -111,7 +110,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l)) } a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{ GlobalRefID: uint64(ref), @@ -129,7 +128,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo } } if updated { - a.samplesCounter.Inc() + a.fanout.samplesCounter.Inc() } return ref, multiErr } @@ -138,7 +137,8 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo func (a *appender) Commit() error { defer a.recordLatency() var multiErr error - a.ls.TrackStaleness(a.stalenessTrackers) + a.fanout.lastSeriesCount.Store(int64(len(a.stalenessTrackers))) + a.fanout.ls.TrackStaleness(a.stalenessTrackers) for _, x := range a.children { err := x.Commit() if err != nil { @@ -151,7 +151,8 @@ func (a *appender) Commit() error { // Rollback satisfies the Appender interface. func (a *appender) Rollback() error { defer a.recordLatency() - a.ls.TrackStaleness(a.stalenessTrackers) + a.fanout.lastSeriesCount.Store(int64(len(a.stalenessTrackers))) + a.fanout.ls.TrackStaleness(a.stalenessTrackers) var multiErr error for _, x := range a.children { err := x.Rollback() @@ -167,7 +168,7 @@ func (a *appender) recordLatency() { return } duration := time.Since(a.start) - a.writeLatency.Observe(duration.Seconds()) + a.fanout.writeLatency.Observe(duration.Seconds()) } // AppendExemplar satisfies the Appender interface. @@ -176,7 +177,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l)) } var multiErr error for _, x := range a.children { @@ -194,7 +195,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l)) } var multiErr error for _, x := range a.children { @@ -211,7 +212,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l)) } var multiErr error for _, x := range a.children { @@ -228,7 +229,7 @@ func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l)) } var multiErr error for _, x := range a.children { @@ -244,7 +245,7 @@ func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, type NoopMetadataStore map[string]scrape.MetricMetadata // GetMetadata implements the MetricMetadataStore interface. -func (ms NoopMetadataStore) GetMetadata(familyName string) (scrape.MetricMetadata, bool) { +func (ms NoopMetadataStore) GetMetadata(_ string) (scrape.MetricMetadata, bool) { return scrape.MetricMetadata{}, false } diff --git a/internal/component/prometheus/interceptor.go b/internal/component/prometheus/interceptor.go index a3bba13849..3dc307cedf 100644 --- a/internal/component/prometheus/interceptor.go +++ b/internal/component/prometheus/interceptor.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/service/labelstore" ) @@ -26,6 +27,10 @@ type Interceptor struct { next storage.Appendable ls labelstore.LabelStore + + // lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how + // much memory to allocate for the staleness trackers. + lastSeriesCount atomic.Int64 } var _ storage.Appendable = (*Interceptor)(nil) @@ -91,7 +96,7 @@ func (f *Interceptor) Appender(ctx context.Context) storage.Appender { app := &interceptappender{ interceptor: f, ls: f.ls, - stalenessTrackers: make([]labelstore.StalenessTracker, 0), + stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()), } if f.next != nil { app.child = f.next.Appender(ctx) @@ -130,6 +135,7 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int // Commit satisfies the Appender interface. func (a *interceptappender) Commit() error { + a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers))) a.ls.TrackStaleness(a.stalenessTrackers) if a.child == nil { return nil @@ -139,6 +145,7 @@ func (a *interceptappender) Commit() error { // Rollback satisfies the Appender interface. func (a *interceptappender) Rollback() error { + a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers))) a.ls.TrackStaleness(a.stalenessTrackers) if a.child == nil { return nil