Skip to content

Commit

Permalink
[exporter/loadbalancing] Improve the performance when merging traces …
Browse files Browse the repository at this point in the history
…belonging to the same backend (open-telemetry#32032)

**Description:** no need to reimplement that in an extremely
allocation-inefficient fashion.

I'm actually not sure why mergeTraces() and mergeMetrics() need to exist
in the first place; all the other exporters coupled with the batch
processor work just fine, not sure why loadbalancing would be special.
open-telemetry#30141
seems to imply they were implemented to improve performance, but I don't
really understand why batch processor would not have been sufficient for
that improvement originally.

benchmarks before:
```
	goos: darwin
	goarch: arm64
	pkg: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter
	BenchmarkMergeTraces_X100-8     	   50214	     23507 ns/op
	BenchmarkMergeTraces_X500-8     	   10000	    113952 ns/op
	BenchmarkMergeTraces_X1000-8    	    5208	    226062 ns/op
	BenchmarkMergeMetrics_X100-8    	   64933	     18540 ns/op
	BenchmarkMergeMetrics_X500-8    	   12885	     91418 ns/op
	BenchmarkMergeMetrics_X1000-8   	    6590	    184584 ns/op
	PASS
	ok  	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter	9.783s
```
and after:
```
	goos: darwin
	goarch: arm64
	pkg: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter
	BenchmarkMergeTraces_X100-8     	295886529	         3.836 ns/op
	BenchmarkMergeTraces_X500-8     	309865370	         3.833 ns/op
	BenchmarkMergeTraces_X1000-8    	310739948	         3.800 ns/op
	BenchmarkMergeMetrics_X100-8    	315567813	         3.841 ns/op
	BenchmarkMergeMetrics_X500-8    	310341650	         3.849 ns/op
	BenchmarkMergeMetrics_X1000-8   	314292003	         3.830 ns/op
	PASS
	ok  	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter	10.733s
```
**Link to tracking Issue:** n/a
**Testing:** unit tests pass & cpu time for our collectors using
loadbalancingexporter (12 replicas, total of 25k-40k spans/sec) went
from 800ms-1400ms/sec down to <40msec/sec.
**Documentation:** none

---------

Co-authored-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
Lauri Tirkkonen and jpkrohling authored May 8, 2024
1 parent f617c65 commit 0e2f434
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 106 deletions.
27 changes: 27 additions & 0 deletions .chloggen/loadbalancingexporter_perf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Improve the performance when merging traces belonging to the same backend

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32032]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
110 changes: 4 additions & 106 deletions exporter/loadbalancingexporter/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,114 +10,12 @@ import (

// mergeTraces concatenates two ptrace.Traces into a single ptrace.Traces.
func mergeTraces(t1 ptrace.Traces, t2 ptrace.Traces) ptrace.Traces {
mergedTraces := ptrace.NewTraces()

if t1.SpanCount() == 0 && t2.SpanCount() == 0 {
return mergedTraces
}

// Iterate over the first trace and append spans to the merged traces
for i := 0; i < t1.ResourceSpans().Len(); i++ {
rs := t1.ResourceSpans().At(i)
newRS := mergedTraces.ResourceSpans().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeSpans().Len(); j++ {
ils := rs.ScopeSpans().At(j)

newILS := newRS.ScopeSpans().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Spans().Len(); k++ {
span := ils.Spans().At(k)
newSpan := newILS.Spans().AppendEmpty()
span.MoveTo(newSpan)
}
}
}

// Iterate over the second trace and append spans to the merged traces
for i := 0; i < t2.ResourceSpans().Len(); i++ {
rs := t2.ResourceSpans().At(i)
newRS := mergedTraces.ResourceSpans().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeSpans().Len(); j++ {
ils := rs.ScopeSpans().At(j)

newILS := newRS.ScopeSpans().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Spans().Len(); k++ {
span := ils.Spans().At(k)
newSpan := newILS.Spans().AppendEmpty()
span.MoveTo(newSpan)
}
}
}

return mergedTraces
t2.ResourceSpans().MoveAndAppendTo(t1.ResourceSpans())
return t1
}

// mergeMetrics concatenates two pmetric.Metrics into a single pmetric.Metrics.
func mergeMetrics(m1 pmetric.Metrics, m2 pmetric.Metrics) pmetric.Metrics {
mergedMetrics := pmetric.NewMetrics()

if m1.MetricCount() == 0 && m2.MetricCount() == 0 {
return mergedMetrics
}

// Iterate over the first metric and append metrics to the merged metrics
for i := 0; i < m1.ResourceMetrics().Len(); i++ {
rs := m1.ResourceMetrics().At(i)
newRS := mergedMetrics.ResourceMetrics().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeMetrics().Len(); j++ {
ils := rs.ScopeMetrics().At(j)

newILS := newRS.ScopeMetrics().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Metrics().Len(); k++ {
metric := ils.Metrics().At(k)
newMetric := newILS.Metrics().AppendEmpty()
metric.MoveTo(newMetric)
}
}
}

// Iterate over the second metric and append metrics to the merged metrics
for i := 0; i < m2.ResourceMetrics().Len(); i++ {
rs := m2.ResourceMetrics().At(i)
newRS := mergedMetrics.ResourceMetrics().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeMetrics().Len(); j++ {
ils := rs.ScopeMetrics().At(j)

newILS := newRS.ScopeMetrics().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Metrics().Len(); k++ {
metric := ils.Metrics().At(k)
newMetric := newILS.Metrics().AppendEmpty()
metric.MoveTo(newMetric)
}
}
}

return mergedMetrics
m2.ResourceMetrics().MoveAndAppendTo(m1.ResourceMetrics())
return m1
}

0 comments on commit 0e2f434

Please sign in to comment.