Skip to content

Commit

Permalink
feat: add external labels support to pyroscope.write AppendIngest (#2001
Browse files Browse the repository at this point in the history
)

* feat: add external labels support to pyroscope.write AppendIngest

* copy paste label parsing from pyroscope
  • Loading branch information
marcsanmi authored Nov 4, 2024
1 parent d75c507 commit 7167122
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 5 deletions.
245 changes: 245 additions & 0 deletions internal/component/pyroscope/write/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Package write
// This label parser is copy-pasted from grafana/pyroscope/pkg/og/storage/segment/key.go.
// TODO: Replace this copy with the upstream parser once it's moved to pyroscope/api.
package write

import (
"bytes"
"errors"
"fmt"
"sort"
"strings"
"sync"
)

const (
ReservedTagKeyName = "__name__"
)

type ParserState int

const (
nameParserState ParserState = iota
tagKeyParserState
tagValueParserState
doneParserState
)

var reservedTagKeys = []string{
ReservedTagKeyName,
}

type Key struct {
labels map[string]string
}

type parser struct {
parserState ParserState
key *bytes.Buffer
value *bytes.Buffer
}

var parserPool = sync.Pool{
New: func() any {
return &parser{
parserState: nameParserState,
key: new(bytes.Buffer),
value: new(bytes.Buffer),
}
},
}

func ParseKey(name string) (*Key, error) {
k := &Key{labels: make(map[string]string)}
p := parserPool.Get().(*parser)
defer parserPool.Put(p)
p.reset()
var err error
for _, r := range name + "{" {
switch p.parserState {
case nameParserState:
err = p.nameParserCase(r, k)
case tagKeyParserState:
p.tagKeyParserCase(r)
case tagValueParserState:
err = p.tagValueParserCase(r, k)
}
if err != nil {
return nil, err
}
}
return k, nil
}

func (p *parser) reset() {
p.parserState = nameParserState
p.key.Reset()
p.value.Reset()
}

func (p *parser) nameParserCase(r int32, k *Key) error {
switch r {
case '{':
p.parserState = tagKeyParserState
appName := strings.TrimSpace(p.value.String())
if err := validateAppName(appName); err != nil {
return err
}
k.labels["__name__"] = appName
default:
p.value.WriteRune(r)
}
return nil
}

func (p *parser) tagKeyParserCase(r rune) {
switch r {
case '}':
p.parserState = doneParserState
case '=':
p.parserState = tagValueParserState
p.value.Reset()
default:
p.key.WriteRune(r)
}
}

func (p *parser) tagValueParserCase(r rune, k *Key) error {
switch r {
case ',', '}':
p.parserState = tagKeyParserState
key := strings.TrimSpace(p.key.String())
if !isTagKeyReserved(key) {
if err := validateTagKey(key); err != nil {
return err
}
}
k.labels[key] = strings.TrimSpace(p.value.String())
p.key.Reset()
default:
p.value.WriteRune(r)
}
return nil
}

// Normalized is a helper for formatting the key back to string
func (k *Key) Normalized() string {
var sb strings.Builder

sortedMap := NewSortedMap()
for k, v := range k.labels {
if k == "__name__" {
sb.WriteString(v)
} else {
sortedMap.Put(k, v)
}
}

sb.WriteString("{")
for i, k := range sortedMap.Keys() {
v := sortedMap.Get(k).(string)
if i != 0 {
sb.WriteString(",")
}
sb.WriteString(k)
sb.WriteString("=")
sb.WriteString(v)
}
sb.WriteString("}")

return sb.String()
}

// SortedMap provides a deterministic way to iterate over map entries
type SortedMap struct {
data map[string]interface{}
keys []string
}

func NewSortedMap() *SortedMap {
return &SortedMap{
data: make(map[string]interface{}),
keys: make([]string, 0),
}
}

func (s *SortedMap) Put(k string, v interface{}) {
s.data[k] = v
i := sort.Search(len(s.keys), func(i int) bool { return s.keys[i] >= k })
s.keys = append(s.keys, "")
copy(s.keys[i+1:], s.keys[i:])
s.keys[i] = k
}

func (s *SortedMap) Get(k string) (v interface{}) {
return s.data[k]
}

func (s *SortedMap) Keys() []string {
return s.keys
}

func validateAppName(n string) error {
if len(n) == 0 {
return errors.New("application name is required")
}
for _, r := range n {
if !isAppNameRuneAllowed(r) {
return newInvalidAppNameRuneError(n, r)
}
}
return nil
}

func isAppNameRuneAllowed(r rune) bool {
return r == '-' || r == '.' || isTagKeyRuneAllowed(r)
}

func isTagKeyReserved(k string) bool {
for _, s := range reservedTagKeys {
if s == k {
return true
}
}
return false
}

func isTagKeyRuneAllowed(r rune) bool {
return (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' || r == '.'
}

func validateTagKey(k string) error {
if len(k) == 0 {
return errors.New("tag key is required")
}
for _, r := range k {
if !isTagKeyRuneAllowed(r) {
return newInvalidTagKeyRuneError(k, r)
}
}
if isTagKeyReserved(k) {
return newErr(errors.New("tag key is reserved"), k)
}
return nil
}

type Error struct {
Inner error
Expr string
}

func newInvalidAppNameRuneError(k string, r rune) *Error {
return newInvalidRuneError(errors.New("invalid application name"), k, r)
}

func newErr(err error, expr string) *Error { return &Error{Inner: err, Expr: expr} }

func (e *Error) Error() string { return e.Inner.Error() + ": " + e.Expr }

func newInvalidTagKeyRuneError(k string, r rune) *Error {
return newInvalidRuneError(errors.New("invalid tag key"), k, r)
}

func newInvalidRuneError(err error, k string, r rune) *Error {
return newErr(err, fmt.Sprintf("%s: character is not allowed: %q", k, r))
}
15 changes: 14 additions & 1 deletion internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,20 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
}

u.Path = path.Join(u.Path, profile.URL.Path)
u.RawQuery = profile.URL.RawQuery

// Handle labels
query := profile.URL.Query()
if nameParam := query.Get("name"); nameParam != "" {
key, err := ParseKey(nameParam)
if err != nil {
return err
}
for k, v := range f.config.ExternalLabels {
key.labels[k] = v
}
query.Set("name", key.Normalized())
}
u.RawQuery = query.Encode()

req, err := http.NewRequestWithContext(ctx, "POST", u.String(), pipeReaders[i])
if err != nil {
Expand Down
23 changes: 19 additions & 4 deletions internal/component/pyroscope/write/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,26 @@ func Test_Write_AppendIngest(t *testing.T) {
)

testData := []byte("test-profile-data")
argument.ExternalLabels = map[string]string{
"env": "prod", // Should override env=staging
"cluster": "cluster-1", // Should be added
}

handlerFn := func(expectedPath, expectedQuery string) http.HandlerFunc {
handlerFn := func(expectedPath string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
appendCount.Inc()
require.Equal(t, expectedPath, r.URL.Path, "Unexpected path")
require.Equal(t, expectedQuery, r.URL.RawQuery, "Unexpected query")
require.Equal(t, "endpoint-value", r.Header.Get("X-Test-Header"))
require.Equal(t, []string{"profile-value1", "profile-value2"}, r.Header["X-Profile-Header"])

query := r.URL.Query()
name := query.Get("name")
require.Contains(t, name, "my.awesome.app.cpu", "Base name should be preserved")
require.Contains(t, name, "env=prod", "External label should override profile label")
require.Contains(t, name, "cluster=cluster-1", "External label should be added")
require.Contains(t, name, "region=us-west-1", "Profile-only label should be preserved")
require.Equal(t, "value", query.Get("key"), "Original query parameter should be preserved")

body, err := io.ReadAll(r.Body)
require.NoError(t, err, "Failed to read request body")
require.Equal(t, testData, body, "Unexpected body content")
Expand All @@ -284,7 +296,7 @@ func Test_Write_AppendIngest(t *testing.T) {
}

for i := int32(0); i < serverCount; i++ {
servers[i] = httptest.NewServer(handlerFn("/ingest", "key=value"))
servers[i] = httptest.NewServer(handlerFn("/ingest"))
endpoints = append(endpoints, &EndpointOptions{
URL: servers[i].URL,
RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout,
Expand Down Expand Up @@ -327,7 +339,10 @@ func Test_Write_AppendIngest(t *testing.T) {
"X-Test-Header": []string{"profile-value"}, // This should be overridden by endpoint
"X-Profile-Header": []string{"profile-value1", "profile-value2"}, // This should be preserved
},
URL: &url.URL{Path: "/ingest", RawQuery: "key=value"},
URL: &url.URL{
Path: "/ingest",
RawQuery: "name=my.awesome.app.cpu{env=staging,region=us-west-1}&key=value",
},
}

err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)
Expand Down

0 comments on commit 7167122

Please sign in to comment.