diff --git a/CHANGELOG.md b/CHANGELOG.md index 8279c67197..4b0f986478 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Main (unreleased) - Add perf_schema quantile columns to collector - Live Debugging button should appear in UI only for supported components (@ravishankar15) +- Add json format support for log export via faro receiver (@ravishankar15) - Add three new stdlib functions to_base64, from_URLbase64 and to_URLbase64 (@ravishankar15) - Add `ignore_older_than` option for local.file_match (@ravishankar15) - Add livedebugging support for `discover.relabel` (@ravishankar15) diff --git a/docs/sources/reference/components/faro/faro.receiver.md b/docs/sources/reference/components/faro/faro.receiver.md index cdb70f9a7c..5101de3ebf 100644 --- a/docs/sources/reference/components/faro/faro.receiver.md +++ b/docs/sources/reference/components/faro/faro.receiver.md @@ -30,6 +30,14 @@ The following arguments are supported: Name | Type | Description | Default | Required -------------------|---------------|----------------------------------------------|---------|--------- `extra_log_labels` | `map(string)` | Extra labels to attach to emitted log lines. | `{}` | no +`log_format` | `string` | Export format for the logs. | `logfmt`| no + +### Log format + +The following strings are recognized as valid log line formats: + +* `"logfmt"`: Export logs as [logfmt](https://brandur.org/logfmt) lines. +* `"json"`: Export logs as JSON objects. ## Blocks diff --git a/internal/component/faro/receiver/arguments.go b/internal/component/faro/receiver/arguments.go index 915f472574..232f561a94 100644 --- a/internal/component/faro/receiver/arguments.go +++ b/internal/component/faro/receiver/arguments.go @@ -1,6 +1,8 @@ package receiver import ( + "encoding" + "fmt" "time" "github.com/alecthomas/units" @@ -13,6 +15,7 @@ import ( // Arguments configures the app_agent_receiver component. type Arguments struct { LogLabels map[string]string `alloy:"extra_log_labels,attr,optional"` + LogFormat LogFormat `alloy:"log_format,attr,optional"` Server ServerArguments `alloy:"server,block,optional"` SourceMaps SourceMapsArguments `alloy:"sourcemaps,block,optional"` @@ -23,6 +26,7 @@ var _ syntax.Defaulter = (*Arguments)(nil) // SetToDefault applies default settings. func (args *Arguments) SetToDefault() { + args.LogFormat = FormatDefault args.Server.SetToDefault() args.SourceMaps.SetToDefault() } @@ -93,3 +97,33 @@ type OutputArguments struct { Logs []loki.LogsReceiver `alloy:"logs,attr,optional"` Traces []otelcol.Consumer `alloy:"traces,attr,optional"` } + +type LogFormat string + +const ( + FormatLogfmt LogFormat = "logfmt" + FormatJSON LogFormat = "json" + + FormatDefault = FormatLogfmt +) + +var ( + _ encoding.TextMarshaler = FormatDefault + _ encoding.TextUnmarshaler = (*LogFormat)(nil) +) + +func (ll LogFormat) MarshalText() (text []byte, err error) { + return []byte(ll), nil +} + +func (ll *LogFormat) UnmarshalText(text []byte) error { + switch LogFormat(text) { + case "": + *ll = FormatDefault + case FormatLogfmt, FormatJSON: + *ll = LogFormat(text) + default: + return fmt.Errorf("unrecognized log format %q", string(text)) + } + return nil +} diff --git a/internal/component/faro/receiver/exporters.go b/internal/component/faro/receiver/exporters.go index 1543b126c7..c02b106083 100644 --- a/internal/component/faro/receiver/exporters.go +++ b/internal/component/faro/receiver/exporters.go @@ -2,6 +2,7 @@ package receiver import ( "context" + "encoding/json" "errors" "fmt" "sync" @@ -83,6 +84,7 @@ func (exp *metricsExporter) Export(ctx context.Context, p payload.Payload) error type logsExporter struct { log log.Logger sourceMaps sourceMapsStore + format LogFormat receiversMut sync.RWMutex receivers []loki.LogsReceiver @@ -93,10 +95,11 @@ type logsExporter struct { var _ exporter = (*logsExporter)(nil) -func newLogsExporter(log log.Logger, sourceMaps sourceMapsStore) *logsExporter { +func newLogsExporter(log log.Logger, sourceMaps sourceMapsStore, format LogFormat) *logsExporter { return &logsExporter{ log: log, sourceMaps: sourceMaps, + format: format, } } @@ -157,7 +160,19 @@ func (exp *logsExporter) sendKeyValsToLogsPipeline(ctx context.Context, kv *payl ) exp.receiversMut.RUnlock() - line, err := logfmt.MarshalKeyvals(payload.KeyValToInterfaceSlice(kv)...) + var ( + line []byte + err error + ) + switch exp.format { + case FormatLogfmt: + line, err = logfmt.MarshalKeyvals(payload.KeyValToInterfaceSlice(kv)...) + case FormatJSON: + line, err = json.Marshal(payload.KeyValToInterfaceMap(kv)) + default: + line, err = logfmt.MarshalKeyvals(payload.KeyValToInterfaceSlice(kv)...) + } + if err != nil { level.Error(exp.log).Log("msg", "failed to logfmt a frontend log event", "err", err) return err diff --git a/internal/component/faro/receiver/exporters_test.go b/internal/component/faro/receiver/exporters_test.go index 7f7511827e..8f562f1de7 100644 --- a/internal/component/faro/receiver/exporters_test.go +++ b/internal/component/faro/receiver/exporters_test.go @@ -4,10 +4,16 @@ import ( "context" "strings" "testing" + "time" + "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/faro/receiver/internal/payload" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/loki/v3/pkg/logproto" "github.com/prometheus/client_golang/prometheus" promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -53,3 +59,279 @@ func Test_metricsExporter_Export(t *testing.T) { err := promtestutil.CollectAndCompare(reg, strings.NewReader(expect), metricNames...) require.NoError(t, err) } + +func Test_LogsExporter_Export(t *testing.T) { + now, err := time.Parse("2006-01-02T15:04:05Z0700", "2021-09-30T10:46:17.680Z") + require.NoError(t, err) + tt := []struct { + desc string + format LogFormat + payload payload.Payload + expect loki.Entry + }{ + { + desc: "export logfmt for log payload", + format: FormatLogfmt, + payload: payload.Payload{ + Logs: []payload.Log{ + { + Message: "React Router Future Flag Warning", + LogLevel: payload.LogLevelInfo, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=log message="React Router Future Flag Warning" level=info traceID=a363d4f4417aa83158c437febd2d8838 spanID=42876ecc4c1feafa browser_mobile=false`, + }, + }, + }, + { + desc: "export logfmt for exception payload", + format: FormatLogfmt, + payload: payload.Payload{ + Exceptions: []payload.Exception{ + { + Type: "Error", + Value: "Cannot read property 'find' of undefined", + Timestamp: now, + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Function: "?", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 8639, + Colno: 42, + }, + { + Function: "dispatchAction", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 268095, + Colno: 9, + }, + }, + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("exception"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=exception type=Error value="Cannot read property 'find' of undefined" stacktrace="Error: Cannot read property 'find' of undefined\n at ? (http://fe:3002/static/js/vendors~main.chunk.js:8639:42)\n at dispatchAction (http://fe:3002/static/js/vendors~main.chunk.js:268095:9)" hash=2735541995122471342 browser_mobile=false`, + }, + }, + }, + { + desc: "export logfmt for measurement payload", + format: FormatLogfmt, + payload: payload.Payload{ + Measurements: []payload.Measurement{ + { + Type: "sum", + Values: map[string]float64{ + "ttfp": 20.12, + "ttfcp": 22.12, + "ttfb": 14, + }, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + Context: payload.MeasurementContext{"host": "localhost"}, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("measurement"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=measurement type=sum ttfb=14.000000 ttfcp=22.120000 ttfp=20.120000 traceID=a363d4f4417aa83158c437febd2d8838 spanID=42876ecc4c1feafa context_host=localhost value_ttfb=14 value_ttfcp=22.12 value_ttfp=20.12 browser_mobile=false`, + }, + }, + }, + { + desc: "export logfmt for event payload", + format: FormatLogfmt, + payload: payload.Payload{ + Events: []payload.Event{ + { + Name: "click_login_button", + Domain: "frontend", + Attributes: map[string]string{"button_name": "login"}, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("event"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-09-30 10:46:17.68 +0000 UTC" kind=event event_name=click_login_button event_domain=frontend event_data_button_name=login traceID=a363d4f4417aa83158c437febd2d8838 spanID=42876ecc4c1feafa browser_mobile=false`, + }, + }, + }, + { + desc: "export json for log payload", + format: FormatJSON, + payload: payload.Payload{ + Logs: []payload.Log{ + { + Message: "React Router Future Flag Warning", + LogLevel: payload.LogLevelInfo, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","kind":"log","level":"info","message":"React Router Future Flag Warning","spanID":"42876ecc4c1feafa","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","traceID":"a363d4f4417aa83158c437febd2d8838"}`, + }, + }, + }, + { + desc: "export json for exception payload", + format: FormatJSON, + payload: payload.Payload{ + Exceptions: []payload.Exception{ + { + Type: "Error", + Value: "Cannot read property 'find' of undefined", + Timestamp: now, + Stacktrace: &payload.Stacktrace{ + Frames: []payload.Frame{ + { + Function: "?", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 8639, + Colno: 42, + }, + { + Function: "dispatchAction", + Filename: "http://fe:3002/static/js/vendors~main.chunk.js", + Lineno: 268095, + Colno: 9, + }, + }, + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("exception"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","hash":"2735541995122471342","kind":"exception","stacktrace":"Error: Cannot read property 'find' of undefined\n at ? (http://fe:3002/static/js/vendors~main.chunk.js:8639:42)\n at dispatchAction (http://fe:3002/static/js/vendors~main.chunk.js:268095:9)","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","type":"Error","value":"Cannot read property 'find' of undefined"}`, + }, + }, + }, + { + desc: "export json for measurement payload", + format: FormatJSON, + payload: payload.Payload{ + Measurements: []payload.Measurement{ + { + Type: "sum", + Values: map[string]float64{ + "ttfp": 20.12, + "ttfcp": 22.12, + "ttfb": 14, + }, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + Context: payload.MeasurementContext{"host": "localhost"}, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("measurement"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","context_host":"localhost","kind":"measurement","spanID":"42876ecc4c1feafa","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","traceID":"a363d4f4417aa83158c437febd2d8838","ttfb":"14.000000","ttfcp":"22.120000","ttfp":"20.120000","type":"sum","value_ttfb":14,"value_ttfcp":22.12,"value_ttfp":20.12}`, + }, + }, + }, + { + desc: "export json for event payload", + format: FormatJSON, + payload: payload.Payload{ + Events: []payload.Event{ + { + Name: "click_login_button", + Domain: "frontend", + Attributes: map[string]string{"button_name": "login"}, + Timestamp: now, + Trace: payload.TraceContext{ + TraceID: "a363d4f4417aa83158c437febd2d8838", + SpanID: "42876ecc4c1feafa", + }, + }, + }, + }, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("event"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","event_data_button_name":"login","event_domain":"frontend","event_name":"click_login_button","kind":"event","spanID":"42876ecc4c1feafa","timestamp":"2021-09-30 10:46:17.68 +0000 UTC","traceID":"a363d4f4417aa83158c437febd2d8838"}`, + }, + }, + }, + } + for _, tc := range tt { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + var ( + lr = newFakeLogsReceiver(t) + exp = newLogsExporter(util.TestLogger(t), &varSourceMapsStore{}, tc.format) + ) + exp.SetReceivers([]loki.LogsReceiver{lr}) + exp.SetLabels(map[string]string{ + "foo": "bar", + "kind": "", + }) + ctx := componenttest.TestContext(t) + require.NoError(t, exp.Export(ctx, tc.payload)) + // Sleep for 2ms since fake logger process in separate go routine + time.Sleep(2 * time.Millisecond) + require.Len(t, lr.GetEntries(), 1) + require.Equal(t, tc.expect, lr.entries[0]) + }) + } +} diff --git a/internal/component/faro/receiver/receiver.go b/internal/component/faro/receiver/receiver.go index b5ed5e572b..aebab7ebce 100644 --- a/internal/component/faro/receiver/receiver.go +++ b/internal/component/faro/receiver/receiver.go @@ -54,7 +54,7 @@ func New(o component.Options, args Arguments) (*Component, error) { varStore = &varSourceMapsStore{} metrics = newMetricsExporter(o.Registerer) - logs = newLogsExporter(log.With(o.Logger, "exporter", "logs"), varStore) + logs = newLogsExporter(log.With(o.Logger, "exporter", "logs"), varStore, args.LogFormat) traces = newTracesExporter(log.With(o.Logger, "exporter", "traces")) ) diff --git a/internal/component/faro/receiver/receiver_test.go b/internal/component/faro/receiver/receiver_test.go index 84c35288ec..3c987b665b 100644 --- a/internal/component/faro/receiver/receiver_test.go +++ b/internal/component/faro/receiver/receiver_test.go @@ -21,54 +21,91 @@ import ( // Test performs an end-to-end test of the component. func Test(t *testing.T) { - ctx := componenttest.TestContext(t) - - ctrl, err := componenttest.NewControllerFromID( - util.TestLogger(t), - "faro.receiver", - ) - require.NoError(t, err) - - freePort, err := freeport.GetFreePort() - require.NoError(t, err) - - lr := newFakeLogsReceiver(t) - - go func() { - err := ctrl.Run(ctx, Arguments{ - LogLabels: map[string]string{ - "foo": "bar", - "kind": "", - }, - - Server: ServerArguments{ - Host: "127.0.0.1", - Port: freePort, - IncludeMetadata: true, + tt := []struct { + desc string + logFormat LogFormat + expect loki.Entry + }{ + { + desc: "format logfmt", + logFormat: FormatLogfmt, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `timestamp="2021-01-01 00:00:00 +0000 UTC" kind=log message="hello, world" level=info context_env=dev traceID=0 spanID=0 browser_mobile=false`, + }, }, - - Output: OutputArguments{ - Logs: []loki.LogsReceiver{lr}, - Traces: []otelcol.Consumer{}, + }, + { + desc: "format json", + logFormat: FormatJSON, + expect: loki.Entry{ + Labels: model.LabelSet{ + "foo": model.LabelValue("bar"), + "kind": model.LabelValue("log"), + }, + Entry: logproto.Entry{ + Line: `{"browser_mobile":"false","context_env":"dev","kind":"log","level":"info","message":"hello, world","spanID":"0","timestamp":"2021-01-01 00:00:00 +0000 UTC","traceID":"0"}`, + }, }, - }) - require.NoError(t, err) - }() - - // Wait for the server to be running. - util.Eventually(t, func(t require.TestingT) { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", freePort)) - require.NoError(t, err) - defer resp.Body.Close() - - require.Equal(t, http.StatusOK, resp.StatusCode) - }) + }, + } + for _, tc := range tt { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + ctx := componenttest.TestContext(t) + + ctrl, err := componenttest.NewControllerFromID( + util.TestLogger(t), + "faro.receiver", + ) + require.NoError(t, err) + + freePort, err := freeport.GetFreePort() + require.NoError(t, err) + + lr := newFakeLogsReceiver(t) + + go func() { + err := ctrl.Run(ctx, Arguments{ + LogLabels: map[string]string{ + "foo": "bar", + "kind": "", + }, + LogFormat: tc.logFormat, + + Server: ServerArguments{ + Host: "127.0.0.1", + Port: freePort, + IncludeMetadata: true, + }, + + Output: OutputArguments{ + Logs: []loki.LogsReceiver{lr}, + Traces: []otelcol.Consumer{}, + }, + }) + require.NoError(t, err) + }() + + // Wait for the server to be running. + util.Eventually(t, func(t require.TestingT) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", freePort)) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) + }) - // Send a sample payload to the server. - req, err := http.NewRequest( - "POST", - fmt.Sprintf("http://localhost:%d/collect", freePort), - strings.NewReader(`{ + // Send a sample payload to the server. + req, err := http.NewRequest( + "POST", + fmt.Sprintf("http://localhost:%d/collect", freePort), + strings.NewReader(`{ "traces": { "resourceSpans": [] }, @@ -86,36 +123,31 @@ func Test(t *testing.T) { "measurements": [], "meta": {} }`), - ) - require.NoError(t, err) + ) + require.NoError(t, err) - req.Header.Add("Tenant-Id", "TENANTID") - req.Header.Add("Content-Type", "application/json") + req.Header.Add("Tenant-Id", "TENANTID") + req.Header.Add("Content-Type", "application/json") - client := &http.Client{} - resp, err := client.Do(req) - require.NoError(t, err) - defer resp.Body.Close() + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() - require.Equal(t, http.StatusAccepted, resp.StatusCode) - require.Len(t, lr.GetEntries(), 1) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + lr.wg.Wait() // Wait for the fakelogreceiver goroutine to process + require.Len(t, lr.GetEntries(), 1) - expect := loki.Entry{ - Labels: model.LabelSet{ - "foo": model.LabelValue("bar"), - "kind": model.LabelValue("log"), - }, - Entry: logproto.Entry{ - Line: `timestamp="2021-01-01 00:00:00 +0000 UTC" kind=log message="hello, world" level=info context_env=dev traceID=0 spanID=0 browser_mobile=false`, - }, + require.Equal(t, tc.expect, lr.entries[0]) + }) } - require.Equal(t, expect, lr.entries[0]) } type fakeLogsReceiver struct { ch chan loki.Entry entriesMut sync.RWMutex + wg sync.WaitGroup entries []loki.Entry } @@ -128,6 +160,7 @@ func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { ch: make(chan loki.Entry, 1), } + lr.wg.Add(1) go func() { defer close(lr.ch) @@ -145,6 +178,7 @@ func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver { }, }) lr.entriesMut.Unlock() + lr.wg.Done() } }() diff --git a/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy b/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy index c454dbd84c..712f63ae59 100644 --- a/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy +++ b/internal/converter/internal/staticconvert/testdata-v2/integrations_v2.alloy @@ -23,6 +23,7 @@ logging { faro.receiver "integrations_app_agent_receiver" { extra_log_labels = {} + log_format = "" server { listen_address = "localhost" diff --git a/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy b/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy index 9f06a3cdc5..b7b1bf1dc7 100644 --- a/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy +++ b/internal/converter/internal/staticconvert/testdata-v2/unsupported.alloy @@ -18,6 +18,7 @@ loki.write "logs_log_config" { faro.receiver "integrations_app_agent_receiver" { extra_log_labels = {} + log_format = "" server { listen_address = "localhost"