Skip to content

Commit

Permalink
[apmbench] Make benchtest read intake events expvar
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jan 25, 2025
1 parent 3048b8f commit 7aaf61e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 20 deletions.
18 changes: 12 additions & 6 deletions systemtest/benchtest/expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ type expvar struct {
}

type ElasticResponseStats struct {
TotalElasticResponses int64 `json:"apm-server.server.response.count"`
ErrorElasticResponses int64 `json:"apm-server.server.response.errors.count"`
TransactionsProcessed int64 `json:"apm-server.processor.transaction.transformations"`
SpansProcessed int64 `json:"apm-server.processor.span.transformations"`
MetricsProcessed int64 `json:"apm-server.processor.metric.transformations"`
ErrorsProcessed int64 `json:"apm-server.processor.error.transformations"`
TotalElasticResponses int64 `json:"apm-server.server.response.count"`
ErrorElasticResponses int64 `json:"apm-server.server.response.errors.count"`
TransactionsProcessed int64 `json:"apm-server.processor.transaction.transformations"`
SpansProcessed int64 `json:"apm-server.processor.span.transformations"`
MetricsProcessed int64 `json:"apm-server.processor.metric.transformations"`
ErrorsProcessed int64 `json:"apm-server.processor.error.transformations"`
IntakeEventsAccepted int64 `json:"apm-server.processor.stream.accepted"`
IntakeEventsErrorsInvalid int64 `json:"apm-server.processor.stream.errors.invalid"`
IntakeEventsErrorsTooLarge int64 `json:"apm-server.processor.stream.errors.toolarge"`
}

type OTLPResponseStats struct {
Expand Down Expand Up @@ -204,6 +207,9 @@ func aggregateResponseStats(from ElasticResponseStats, to *ElasticResponseStats)
to.SpansProcessed += from.SpansProcessed
to.TransactionsProcessed += from.TransactionsProcessed
to.TotalElasticResponses += from.TotalElasticResponses
to.IntakeEventsAccepted += from.IntakeEventsAccepted
to.IntakeEventsErrorsInvalid += from.IntakeEventsErrorsInvalid
to.IntakeEventsErrorsTooLarge += from.IntakeEventsErrorsTooLarge
}

func aggregateOTLPResponseStats(from OTLPResponseStats, to *OTLPResponseStats) {
Expand Down
6 changes: 6 additions & 0 deletions systemtest/benchtest/expvar/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
MemBytes
ActiveEvents
TotalEvents
IntakeEventsAccepted
IntakeEventsErrorsInvalid
IntakeEventsErrorsTooLarge
TransactionsProcessed
SpansProcessed
MetricsProcessed
Expand Down Expand Up @@ -154,6 +157,9 @@ func (c *Collector) accumulate(e expvar) {
c.processMetric(ActiveEvents, e.ActiveEvents)
c.processMetric(RSSMemoryBytes, e.RSSMemoryBytes)
c.processMetric(AvailableBulkRequests, e.AvailableBulkRequests)
c.processMetric(IntakeEventsAccepted, e.IntakeEventsAccepted)
c.processMetric(IntakeEventsErrorsInvalid, e.IntakeEventsErrorsInvalid)
c.processMetric(IntakeEventsErrorsTooLarge, e.IntakeEventsErrorsTooLarge)
c.processMetric(TransactionsProcessed, e.TransactionsProcessed)
c.processMetric(SpansProcessed, e.SpansProcessed)
c.processMetric(MetricsProcessed, e.MetricsProcessed)
Expand Down
2 changes: 2 additions & 0 deletions systemtest/benchtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func addExpvarMetrics(result *testing.BenchmarkResult, collector *expvar.Collect
result.MemBytes = uint64(collector.Delta(expvar.MemBytes))
result.Extra["events/sec"] = float64(collector.Delta(expvar.TotalEvents)) / result.T.Seconds()
if detailed {
result.Extra["intake_events_accepted/sec"] = float64(collector.Delta(expvar.IntakeEventsAccepted)) / result.T.Seconds()
result.Extra["intake_events_errors/sec"] = float64(collector.Delta(expvar.IntakeEventsErrorsInvalid)+collector.Delta(expvar.IntakeEventsErrorsTooLarge)) / result.T.Seconds()
result.Extra["txs/sec"] = float64(collector.Delta(expvar.TransactionsProcessed)) / result.T.Seconds()
result.Extra["spans/sec"] = float64(collector.Delta(expvar.SpansProcessed)) / result.T.Seconds()
result.Extra["metrics/sec"] = float64(collector.Delta(expvar.MetricsProcessed)) / result.T.Seconds()
Expand Down
33 changes: 19 additions & 14 deletions systemtest/benchtest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func TestAddExpvarMetrics(t *testing.T) {
detailed: true,
responseMetrics: []string{
`"libbeat.output.events.total": 24`,
`"apm-server.processor.stream.accepted": 111`,
`"apm-server.processor.stream.errors.invalid": 200`,
`"apm-server.processor.stream.errors.toolarge": 300`,
`"apm-server.processor.transaction.transformations": 7`,
`"apm-server.processor.span.transformations": 5`,
`"apm-server.processor.metric.transformations": 9`,
Expand All @@ -155,20 +158,22 @@ func TestAddExpvarMetrics(t *testing.T) {
`"HeapObjects": 102`,
},
expectedResult: map[string]float64{
"events/sec": 24,
"txs/sec": 7,
"spans/sec": 5,
"metrics/sec": 9,
"errors/sec": 3,
"gc_cycles": 10,
"max_rss": 1048576,
"max_goroutines": 4,
"max_heap_alloc": 10240,
"max_heap_objects": 102,
"mean_available_indexers": 0,
"error_responses/sec": 1,
"tbs_lsm_size": 10,
"tbs_vlog_size": 11,
"events/sec": 24,
"intake_events_accepted/sec": 111,
"intake_events_errors/sec": 500,
"txs/sec": 7,
"spans/sec": 5,
"metrics/sec": 9,
"errors/sec": 3,
"gc_cycles": 10,
"max_rss": 1048576,
"max_goroutines": 4,
"max_heap_alloc": 10240,
"max_heap_objects": 102,
"mean_available_indexers": 0,
"error_responses/sec": 1,
"tbs_lsm_size": 10,
"tbs_vlog_size": 11,
},
},
}
Expand Down

0 comments on commit 7aaf61e

Please sign in to comment.