diff --git a/internal/component/pyroscope/write/parser.go b/internal/component/pyroscope/write/parser.go new file mode 100644 index 0000000000..385a241862 --- /dev/null +++ b/internal/component/pyroscope/write/parser.go @@ -0,0 +1,245 @@ +// Package write +// This label parser is copy-pasted from grafana/pyroscope/pkg/og/storage/segment/key.go. +// TODO: Replace this copy with the upstream parser once it's moved to pyroscope/api. +package write + +import ( + "bytes" + "errors" + "fmt" + "sort" + "strings" + "sync" +) + +const ( + ReservedTagKeyName = "__name__" +) + +type ParserState int + +const ( + nameParserState ParserState = iota + tagKeyParserState + tagValueParserState + doneParserState +) + +var reservedTagKeys = []string{ + ReservedTagKeyName, +} + +type Key struct { + labels map[string]string +} + +type parser struct { + parserState ParserState + key *bytes.Buffer + value *bytes.Buffer +} + +var parserPool = sync.Pool{ + New: func() any { + return &parser{ + parserState: nameParserState, + key: new(bytes.Buffer), + value: new(bytes.Buffer), + } + }, +} + +func ParseKey(name string) (*Key, error) { + k := &Key{labels: make(map[string]string)} + p := parserPool.Get().(*parser) + defer parserPool.Put(p) + p.reset() + var err error + for _, r := range name + "{" { + switch p.parserState { + case nameParserState: + err = p.nameParserCase(r, k) + case tagKeyParserState: + p.tagKeyParserCase(r) + case tagValueParserState: + err = p.tagValueParserCase(r, k) + } + if err != nil { + return nil, err + } + } + return k, nil +} + +func (p *parser) reset() { + p.parserState = nameParserState + p.key.Reset() + p.value.Reset() +} + +func (p *parser) nameParserCase(r int32, k *Key) error { + switch r { + case '{': + p.parserState = tagKeyParserState + appName := strings.TrimSpace(p.value.String()) + if err := validateAppName(appName); err != nil { + return err + } + k.labels["__name__"] = appName + default: + p.value.WriteRune(r) + } + return nil +} + +func (p *parser) tagKeyParserCase(r rune) { + switch r { + case '}': + p.parserState = doneParserState + case '=': + p.parserState = tagValueParserState + p.value.Reset() + default: + p.key.WriteRune(r) + } +} + +func (p *parser) tagValueParserCase(r rune, k *Key) error { + switch r { + case ',', '}': + p.parserState = tagKeyParserState + key := strings.TrimSpace(p.key.String()) + if !isTagKeyReserved(key) { + if err := validateTagKey(key); err != nil { + return err + } + } + k.labels[key] = strings.TrimSpace(p.value.String()) + p.key.Reset() + default: + p.value.WriteRune(r) + } + return nil +} + +// Normalized is a helper for formatting the key back to string +func (k *Key) Normalized() string { + var sb strings.Builder + + sortedMap := NewSortedMap() + for k, v := range k.labels { + if k == "__name__" { + sb.WriteString(v) + } else { + sortedMap.Put(k, v) + } + } + + sb.WriteString("{") + for i, k := range sortedMap.Keys() { + v := sortedMap.Get(k).(string) + if i != 0 { + sb.WriteString(",") + } + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(v) + } + sb.WriteString("}") + + return sb.String() +} + +// SortedMap provides a deterministic way to iterate over map entries +type SortedMap struct { + data map[string]interface{} + keys []string +} + +func NewSortedMap() *SortedMap { + return &SortedMap{ + data: make(map[string]interface{}), + keys: make([]string, 0), + } +} + +func (s *SortedMap) Put(k string, v interface{}) { + s.data[k] = v + i := sort.Search(len(s.keys), func(i int) bool { return s.keys[i] >= k }) + s.keys = append(s.keys, "") + copy(s.keys[i+1:], s.keys[i:]) + s.keys[i] = k +} + +func (s *SortedMap) Get(k string) (v interface{}) { + return s.data[k] +} + +func (s *SortedMap) Keys() []string { + return s.keys +} + +func validateAppName(n string) error { + if len(n) == 0 { + return errors.New("application name is required") + } + for _, r := range n { + if !isAppNameRuneAllowed(r) { + return newInvalidAppNameRuneError(n, r) + } + } + return nil +} + +func isAppNameRuneAllowed(r rune) bool { + return r == '-' || r == '.' || isTagKeyRuneAllowed(r) +} + +func isTagKeyReserved(k string) bool { + for _, s := range reservedTagKeys { + if s == k { + return true + } + } + return false +} + +func isTagKeyRuneAllowed(r rune) bool { + return (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' || r == '.' +} + +func validateTagKey(k string) error { + if len(k) == 0 { + return errors.New("tag key is required") + } + for _, r := range k { + if !isTagKeyRuneAllowed(r) { + return newInvalidTagKeyRuneError(k, r) + } + } + if isTagKeyReserved(k) { + return newErr(errors.New("tag key is reserved"), k) + } + return nil +} + +type Error struct { + Inner error + Expr string +} + +func newInvalidAppNameRuneError(k string, r rune) *Error { + return newInvalidRuneError(errors.New("invalid application name"), k, r) +} + +func newErr(err error, expr string) *Error { return &Error{Inner: err, Expr: expr} } + +func (e *Error) Error() string { return e.Inner.Error() + ": " + e.Expr } + +func newInvalidTagKeyRuneError(k string, r rune) *Error { + return newInvalidRuneError(errors.New("invalid tag key"), k, r) +} + +func newInvalidRuneError(err error, k string, r rune) *Error { + return newErr(err, fmt.Sprintf("%s: character is not allowed: %q", k, r)) +} diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index d268edb28c..157287f539 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -373,7 +373,20 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco } u.Path = path.Join(u.Path, profile.URL.Path) - u.RawQuery = profile.URL.RawQuery + + // Handle labels + query := profile.URL.Query() + if nameParam := query.Get("name"); nameParam != "" { + key, err := ParseKey(nameParam) + if err != nil { + return err + } + for k, v := range f.config.ExternalLabels { + key.labels[k] = v + } + query.Set("name", key.Normalized()) + } + u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, "POST", u.String(), pipeReaders[i]) if err != nil { diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go index 74fa3d5612..b2c6debdae 100644 --- a/internal/component/pyroscope/write/write_test.go +++ b/internal/component/pyroscope/write/write_test.go @@ -268,14 +268,26 @@ func Test_Write_AppendIngest(t *testing.T) { ) testData := []byte("test-profile-data") + argument.ExternalLabels = map[string]string{ + "env": "prod", // Should override env=staging + "cluster": "cluster-1", // Should be added + } - handlerFn := func(expectedPath, expectedQuery string) http.HandlerFunc { + handlerFn := func(expectedPath string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { appendCount.Inc() require.Equal(t, expectedPath, r.URL.Path, "Unexpected path") - require.Equal(t, expectedQuery, r.URL.RawQuery, "Unexpected query") require.Equal(t, "endpoint-value", r.Header.Get("X-Test-Header")) require.Equal(t, []string{"profile-value1", "profile-value2"}, r.Header["X-Profile-Header"]) + + query := r.URL.Query() + name := query.Get("name") + require.Contains(t, name, "my.awesome.app.cpu", "Base name should be preserved") + require.Contains(t, name, "env=prod", "External label should override profile label") + require.Contains(t, name, "cluster=cluster-1", "External label should be added") + require.Contains(t, name, "region=us-west-1", "Profile-only label should be preserved") + require.Equal(t, "value", query.Get("key"), "Original query parameter should be preserved") + body, err := io.ReadAll(r.Body) require.NoError(t, err, "Failed to read request body") require.Equal(t, testData, body, "Unexpected body content") @@ -284,7 +296,7 @@ func Test_Write_AppendIngest(t *testing.T) { } for i := int32(0); i < serverCount; i++ { - servers[i] = httptest.NewServer(handlerFn("/ingest", "key=value")) + servers[i] = httptest.NewServer(handlerFn("/ingest")) endpoints = append(endpoints, &EndpointOptions{ URL: servers[i].URL, RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout, @@ -327,7 +339,10 @@ func Test_Write_AppendIngest(t *testing.T) { "X-Test-Header": []string{"profile-value"}, // This should be overridden by endpoint "X-Profile-Header": []string{"profile-value1", "profile-value2"}, // This should be preserved }, - URL: &url.URL{Path: "/ingest", RawQuery: "key=value"}, + URL: &url.URL{ + Path: "/ingest", + RawQuery: "name=my.awesome.app.cpu{env=staging,region=us-west-1}&key=value", + }, } err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)