Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/statsdreceiver] Add support for parsing dogstatsd multi value lines #1

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/statsdreceiver_support_multivalue_dogstatsd_lines.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: statsdreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for parsing multi-value DogStatsD metric lines (several colon-separated values in a single line), as described by the DogStatsD v1.1 protocol.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: []

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
55 changes: 34 additions & 21 deletions receiver/statsdreceiver/internal/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory {

// Aggregate for each metric line.
func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
parsedMetric, err := parseMessageToMetric(line, p.enableMetricType, p.enableSimpleTags)
parsedMetrics, err := parseMessageToMetrics(line, p.enableMetricType, p.enableSimpleTags)
robskillington marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand All @@ -305,6 +305,14 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
p.instrumentsByAddress[addrKey] = instrument
}

for _, parsedMetric := range parsedMetrics {
p.aggregateMetric(instrument, parsedMetric)
}

return nil
}

func (p *StatsDParser) aggregateMetric(instrument *instruments, parsedMetric statsDMetric) {
robskillington marked this conversation as resolved.
Show resolved Hide resolved
switch parsedMetric.description.metricType {
case GaugeType:
_, ok := instrument.gauges[parsedMetric.description]
Expand Down Expand Up @@ -370,29 +378,27 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
// No action.
}
}

return nil
}

func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) {
func parseMessageToMetrics(line string, enableMetricType bool, enableSimpleTags bool) ([]statsDMetric, error) {
result := statsDMetric{}

nameValue, rest, foundName := strings.Cut(line, "|")
if !foundName {
return result, fmt.Errorf("invalid message format: %s", line)
return nil, fmt.Errorf("invalid message format: %s", line)
}

name, valueStr, foundValue := strings.Cut(nameValue, ":")
if !foundValue {
return result, fmt.Errorf("invalid <name>:<value> format: %s", nameValue)
return nil, fmt.Errorf("invalid <name>:<value> format: %s", nameValue)
}

if name == "" {
return result, errEmptyMetricName
return nil, errEmptyMetricName
}
result.description.name = name
if valueStr == "" {
return result, errEmptyMetricValue
return nil, errEmptyMetricValue
}
if strings.HasPrefix(valueStr, "-") || strings.HasPrefix(valueStr, "+") {
result.addition = true
Expand All @@ -404,7 +410,7 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b
case CounterType, GaugeType, HistogramType, TimingType, DistributionType:
result.description.metricType = inType
default:
return result, fmt.Errorf("unsupported metric type: %s", inType)
return nil, fmt.Errorf("unsupported metric type: %s", inType)
}

var kvs []attribute.KeyValue
Expand All @@ -418,7 +424,7 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b

f, err := strconv.ParseFloat(sampleRateStr, 64)
if err != nil {
return result, fmt.Errorf("parse sample rate: %s", sampleRateStr)
return nil, fmt.Errorf("parse sample rate: %s", sampleRateStr)
}

result.sampleRate = f
Expand All @@ -436,13 +442,13 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b
for ; len(tagSet) > 0; tagSet, tagsStr, _ = strings.Cut(tagsStr, ",") {
k, v, _ := strings.Cut(tagSet, ":")
if k == "" {
return result, fmt.Errorf("invalid tag format: %q", tagSet)
return nil, fmt.Errorf("invalid tag format: %q", tagSet)
}

// support both simple tags (w/o value) and dimension tags (w/ value).
// dogstatsd notably allows simple tags.
if v == "" && !enableSimpleTags {
return result, fmt.Errorf("invalid tag format: %q", tagSet)
return nil, fmt.Errorf("invalid tag format: %q", tagSet)
}

kvs = append(kvs, attribute.String(k, v))
Expand All @@ -459,25 +465,20 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b
// As per DogStatD protocol v1.3:
// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#dogstatsd-protocol-v13
if inType != CounterType && inType != GaugeType {
return result, errors.New("only GAUGE and COUNT metrics support a timestamp")
return nil, errors.New("only GAUGE and COUNT metrics support a timestamp")
}

timestampStr := strings.TrimPrefix(part, "T")
timestampSeconds, err := strconv.ParseUint(timestampStr, 10, 64)
if err != nil {
return result, fmt.Errorf("invalid timestamp: %s", timestampStr)
return nil, fmt.Errorf("invalid timestamp: %s", timestampStr)
}

result.timestamp = timestampSeconds * 1e9 // Convert seconds to nanoseconds
default:
return result, fmt.Errorf("unrecognized message part: %s", part)
return nil, fmt.Errorf("unrecognized message part: %s", part)
}
}
var err error
result.asFloat, err = strconv.ParseFloat(valueStr, 64)
if err != nil {
return result, fmt.Errorf("parse metric value string: %s", valueStr)
}

// add metric_type dimension for all metrics
if enableMetricType {
Expand All @@ -490,7 +491,19 @@ func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags b
result.description.attrs = attribute.NewSet(kvs...)
}

return result, nil
valueStrParts := strings.Split(valueStr, ":")
results := make([]statsDMetric, len(valueStrParts))
for i, valueStrPart := range valueStrParts {
var err error
result.asFloat, err = strconv.ParseFloat(valueStrPart, 64)
if err != nil {
return nil, fmt.Errorf("parse metric value string: %s", valueStr)
}

results[i] = result
}

return results, nil
robskillington marked this conversation as resolved.
Show resolved Hide resolved
}

type netAddr struct {
Expand Down
116 changes: 110 additions & 6 deletions receiver/statsdreceiver/internal/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,14 @@ func Test_ParseMessageToMetric(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseMessageToMetric(tt.input, false, false)
got, err := parseMessageToMetrics(tt.input, false, false)

if tt.err != nil {
assert.Equal(t, tt.err, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.wantMetric, got)
require.Len(t, got, 1)
assert.Equal(t, tt.wantMetric, got[0])
}
})
}
Expand Down Expand Up @@ -530,13 +531,14 @@ func Test_ParseMessageToMetricWithMetricType(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseMessageToMetric(tt.input, true, false)
got, err := parseMessageToMetrics(tt.input, true, false)

if tt.err != nil {
assert.Equal(t, tt.err, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.wantMetric, got)
require.Len(t, got, 1)
assert.Equal(t, tt.wantMetric, got[0])
}
})
}
Expand Down Expand Up @@ -595,13 +597,14 @@ func Test_ParseMessageToMetricWithSimpleTags(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseMessageToMetric(tt.input, false, true)
got, err := parseMessageToMetrics(tt.input, false, true)

if tt.err != nil {
assert.Equal(t, tt.err, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.wantMetric, got)
require.Len(t, got, 1)
assert.Equal(t, tt.wantMetric, got[0])
}
})
}
Expand Down Expand Up @@ -644,6 +647,19 @@ func testStatsDMetric(
}
}

// testStatsDMetrics builds the expected result for a multi-value line: one
// statsDMetric per entry in values, in the same order. Every returned metric
// is produced by testStatsDMetric with the same name, addition flag, metric
// type, sample rate, labels and timestamp; only the value differs.
func testStatsDMetrics(
	name string, values []float64,
	addition bool, metricType MetricType,
	sampleRate float64, labelKeys []string,
	labelValue []string, timestamp uint64,
) []statsDMetric {
	metrics := make([]statsDMetric, len(values))
	for i, v := range values {
		// Forward the caller's addition flag instead of a hard-coded false, so
		// the parameter is not silently ignored.
		// NOTE(review): assumes the third argument of testStatsDMetric is its
		// addition flag, mirroring this helper's parameter order — confirm.
		metrics[i] = testStatsDMetric(name, v, addition, metricType, sampleRate, labelKeys, labelValue, timestamp)
	}
	return metrics
}

func testDescription(name string, metricType MetricType, keys []string, values []string) statsDMetricDescription {
var kvs []attribute.KeyValue
for n, k := range keys {
Expand Down Expand Up @@ -1962,3 +1978,91 @@ func TestStatsDParser_IPOnlyAggregation(t *testing.T) {

assert.Equal(t, int64(4), value)
}

// Test_ParseMessageWithMultipleValuesToMetric checks that parseMessageToMetrics
// expands a line carrying several colon-separated values
// (<name>:<v1>:<v2>:...|<type>|...) into one statsDMetric per value, and that
// an unparsable value anywhere in the list rejects the whole line.
func Test_ParseMessageWithMultipleValuesToMetric(t *testing.T) {
	tests := []struct {
		name        string
		input       string
		wantMetrics []statsDMetric
		// wantErr, when non-empty, is a substring the returned error must contain.
		wantErr string
	}{
		{
			name:  "multiple int values",
			input: "test.metric:42:41:43|c|#key:value",
			wantMetrics: testStatsDMetrics(
				"test.metric",
				[]float64{42, 41, 43},
				false,
				"c",
				0.0,
				[]string{"key"},
				[]string{"value"},
				0,
			),
		},
		{
			name:  "multiple float values",
			input: "test.metric:42.1:41.2:43.3|c|#key:value",
			wantMetrics: testStatsDMetrics(
				"test.metric",
				[]float64{42.1, 41.2, 43.3},
				false,
				"c",
				0.0,
				[]string{"key"},
				[]string{"value"},
				0,
			),
		},
		{
			name:  "mixed float and ints",
			input: "test.metric:42.0:41:43.123|c|#key:value",
			wantMetrics: testStatsDMetrics(
				"test.metric",
				[]float64{42.0, 41, 43.123},
				false,
				"c",
				0.0,
				[]string{"key"},
				[]string{"value"},
				0,
			),
		},
		{
			name:  "value contains zero",
			input: "test.metric:42:0:3|c|#key:value",
			wantMetrics: testStatsDMetrics(
				"test.metric",
				[]float64{42, 0, 3},
				false,
				"c",
				0.0,
				[]string{"key"},
				[]string{"value"},
				0,
			),
		},
		{
			name:    "invalid value",
			input:   "test.metric:42:41.hello|c|#key:value",
			wantErr: "parse metric value string",
		},
		{
			// A trailing colon yields an empty final value, which must be rejected.
			name:    "trailing colon",
			input:   "test.metric:42:41:|c|#key:value",
			wantErr: "parse metric value string",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := parseMessageToMetrics(tt.input, false, false)

			if tt.wantErr != "" {
				require.ErrorContains(t, err, tt.wantErr)
				// A rejected line must not leak partially parsed metrics.
				assert.Nil(t, got)
				return
			}

			// Stop here on an unexpected error; comparing the (nil) result
			// afterwards would only add a second, misleading failure.
			require.NoError(t, err)
			assert.Equal(t, tt.wantMetrics, got)
		})
	}
}