From 1c762e0846bddf86a9b5d0531dfa855ea7456182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Ch=C3=A1vez?= Date: Wed, 9 Sep 2020 18:38:56 +0200 Subject: [PATCH 1/2] feat(grpc): adds support for grpc parsing. --- go.sum | 2 + middleware/grpc/client.go | 27 ++++- middleware/grpc/grpc_suite_test.go | 38 ++++++- middleware/grpc/server.go | 31 +++++- middleware/grpc/server_parser_test.go | 138 ++++++++++++++++++++++++++ middleware/grpc/shared.go | 20 +++- 6 files changed, 250 insertions(+), 6 deletions(-) create mode 100644 middleware/grpc/server_parser_test.go diff --git a/go.sum b/go.sum index 135cc7d2..8883806c 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= @@ -88,6 +89,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/middleware/grpc/client.go b/middleware/grpc/client.go index f39b0e81..ba44d6ca 100644 --- a/middleware/grpc/client.go +++ b/middleware/grpc/client.go @@ -28,6 +28,7 @@ import ( type clientHandler struct { tracer *zipkin.Tracer remoteServiceName string + handleRPCParser handleRPCParser } // A ClientOption can be passed to NewClientHandler to customize the returned handler. @@ -41,6 +42,30 @@ func WithRemoteServiceName(name string) ClientOption { } } +// WithClientInPayloadParser adds a parser for the stats.InPayload to be able to access +// the request payload +func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ClientOption { + return func(h *clientHandler) { + h.handleRPCParser.inPayload = parser + } +} + +// WithClientInTrailerParser adds a parser for the stats.InTrailer to be able to access +// the request trailer +func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ClientOption { + return func(h *clientHandler) { + h.handleRPCParser.inTrailer = parser + } +} + +// WithClientInHeaderParser adds a parser for the stats.InHeader to be able to access +// the request payload +func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientOption { + return func(h *clientHandler) { + h.handleRPCParser.inHeader = parser + } +} + // NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add // tracing to a gRPC client. The gRPC method name is used as the span name and by default the only // tags are the gRPC status code if the call fails. @@ -67,7 +92,7 @@ func (c *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con // HandleRPC implements per-RPC tracing and stats instrumentation. func (c *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - handleRPC(ctx, rs) + handleRPC(ctx, rs, c.handleRPCParser) } // TagRPC implements per-RPC context management. diff --git a/middleware/grpc/grpc_suite_test.go b/middleware/grpc/grpc_suite_test.go index bc233b6c..e8497350 100644 --- a/middleware/grpc/grpc_suite_test.go +++ b/middleware/grpc/grpc_suite_test.go @@ -17,6 +17,7 @@ package grpc_test import ( "context" "errors" + "log" "net" "testing" @@ -26,6 +27,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" "github.com/openzipkin/zipkin-go" zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc" @@ -121,7 +123,7 @@ func (g *sequentialIdGenerator) reset() { g.nextSpanId = g.start } -type TestHelloService struct{ +type TestHelloService struct { service.UnimplementedHelloServiceServer } @@ -158,3 +160,37 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) return resp, nil } + +func initListener(s *grpc.Server) func(context.Context, string) (net.Conn, error) { + const bufSize = 1024 * 1024 + + listener := bufconn.Listen(bufSize) + bufDialer := func(context.Context, string) (net.Conn, error) { + return listener.Dial() + } + + go func() { + if err := s.Serve(listener); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + + return bufDialer +} + +func createTracer(joinSpans bool) (*zipkin.Tracer, func() []model.SpanModel) { + recorder := recorder.NewReporter() + ep, _ := zipkin.NewEndpoint("grpc-server", "") + + serverIdGenerator = newSequentialIdGenerator(0x1000000) + + tracer, _ := zipkin.NewTracer( + recorder, + zipkin.WithLocalEndpoint(ep), + zipkin.WithSharedSpans(joinSpans), + zipkin.WithIDGenerator(serverIdGenerator), + ) + return tracer, func() []model.SpanModel { + return recorder.Flush() + } +} diff --git a/middleware/grpc/server.go b/middleware/grpc/server.go index 8f7faac1..f229420e 100644 --- a/middleware/grpc/server.go +++ b/middleware/grpc/server.go @@ -25,8 +25,9 @@ import ( ) type serverHandler struct { - tracer *zipkin.Tracer - defaultTags map[string]string + tracer *zipkin.Tracer + defaultTags map[string]string + handleRPCParser handleRPCParser } // A ServerOption can be passed to NewServerHandler to customize the returned handler. @@ -39,6 +40,30 @@ func ServerTags(tags map[string]string) ServerOption { } } +// WithServerInPayloadParser adds a parser for the stats.InPayload to be able to access +// the request payload +func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ServerOption { + return func(h *serverHandler) { + h.handleRPCParser.inPayload = parser + } +} + +// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access +// the request trailer +func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption { + return func(h *serverHandler) { + h.handleRPCParser.inTrailer = parser + } +} + +// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access +// the request payload +func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption { + return func(h *serverHandler) { + h.handleRPCParser.inHeader = parser + } +} + // NewServerHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add // tracing to a gRPC server. The gRPC method name is used as the span name and by default the only // tags are the gRPC status code if the call fails. Use ServerTags to add additional tags that @@ -66,7 +91,7 @@ func (s *serverHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con // HandleRPC implements per-RPC tracing and stats instrumentation. func (s *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - handleRPC(ctx, rs) + handleRPC(ctx, rs, s.handleRPCParser) } // TagRPC implements per-RPC context management. diff --git a/middleware/grpc/server_parser_test.go b/middleware/grpc/server_parser_test.go new file mode 100644 index 00000000..979e3224 --- /dev/null +++ b/middleware/grpc/server_parser_test.go @@ -0,0 +1,138 @@ +// Copyright 2019 The OpenZipkin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc_test + +import ( + "context" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" + + "github.com/openzipkin/zipkin-go" + zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc" + "github.com/openzipkin/zipkin-go/model" + service "github.com/openzipkin/zipkin-go/proto/testing" +) + +func TestGRPCServerCreatesASpanAndContext(t *testing.T) { + tracer, flusher := createTracer(false) + + s := grpc.NewServer( + grpc.StatsHandler( + zipkingrpc.NewServerHandler( + tracer, + zipkingrpc.ServerTags(map[string]string{"default": "tag"}), + ), + ), + ) + defer s.Stop() + + service.RegisterHelloServiceServer(s, &TestHelloService{}) + + dialer := initListener(s) + + ctx := context.Background() + conn, err := grpc.DialContext( + ctx, + "bufnet", + grpc.WithContextDialer(dialer), + grpc.WithInsecure(), + ) + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + + client := service.NewHelloServiceClient(conn) + + _, err = client.Hello(ctx, &service.HelloRequest{ + Payload: "Hello", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + spans := flusher() + if want, have := 1, len(spans); want != have { + t.Errorf("unexpected number of spans, want %d, have %d", want, have) + } + + span := spans[0] + if want, have := model.Server, span.Kind; want != have { + t.Errorf("unexpected kind, want %q, have %q", want, have) + } +} + +func TestGRPCServerCanAccessToHeaders(t *testing.T) { + tracer, flusher := createTracer(false) + + s := grpc.NewServer( + grpc.StatsHandler( + zipkingrpc.NewServerHandler( + tracer, + zipkingrpc.ServerTags(map[string]string{"default": "tag"}), + zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) { + if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have { + t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have) + } + }), + zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) { + if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have { + t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have) + } + }), + ), + ), + ) + defer s.Stop() + + service.RegisterHelloServiceServer(s, &TestHelloService{}) + + dialer := initListener(s) + + ctx := context.Background() + conn, err := grpc.DialContext( + ctx, + "bufnet", + grpc.WithContextDialer(dialer), + grpc.WithInsecure(), + ) + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + + client := service.NewHelloServiceClient(conn) + + ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value") + _, err = client.Hello(ctx, &service.HelloRequest{ + Payload: "Hello", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + spans := flusher() + if want, have := 1, len(spans); want != have { + t.Errorf("unexpected number of spans, want %d, have %d", want, have) + } + + span := spans[0] + if want, have := model.Server, span.Kind; want != have { + t.Errorf("unexpected kind, want %q, have %q", want, have) + } +} diff --git a/middleware/grpc/shared.go b/middleware/grpc/shared.go index c32c0bbd..8cc14ca0 100644 --- a/middleware/grpc/shared.go +++ b/middleware/grpc/shared.go @@ -27,6 +27,12 @@ import ( "github.com/openzipkin/zipkin-go/model" ) +type handleRPCParser struct { + inPayload func(*stats.InPayload, zipkin.Span) + inTrailer func(*stats.InTrailer, zipkin.Span) + inHeader func(*stats.InHeader, zipkin.Span) +} + // A RPCHandler can be registered using WithClientRPCHandler or WithServerRPCHandler to intercept calls to HandleRPC of // a handler for additional span customization. type RPCHandler func(span zipkin.Span, rpcStats stats.RPCStats) @@ -37,10 +43,22 @@ func spanName(rti *stats.RPCTagInfo) string { return name } -func handleRPC(ctx context.Context, rs stats.RPCStats) { +func handleRPC(ctx context.Context, rs stats.RPCStats, h handleRPCParser) { span := zipkin.SpanFromContext(ctx) switch rs := rs.(type) { + case *stats.InPayload: + if h.inPayload != nil { + h.inPayload(rs, span) + } + case *stats.InHeader: + if h.inHeader != nil { + h.inHeader(rs, span) + } + case *stats.InTrailer: + if h.inTrailer != nil { + h.inTrailer(rs, span) + } case *stats.End: s, ok := status.FromError(rs.Error) // rs.Error should always be convertable to a status, this is just a defensive check. From 92e7337e3420a06ff7fdc6c2c3abe656eeb2f5cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Ch=C3=A1vez?= Date: Sun, 20 Sep 2020 21:21:51 +0200 Subject: [PATCH 2/2] feat(span-customizer): adds support for span customizer. --- middleware/grpc/client.go | 30 ++++++-- middleware/grpc/client_parser_test.go | 105 ++++++++++++++++++++++++++ middleware/grpc/grpc_suite_test.go | 7 +- middleware/grpc/server.go | 38 +++++++--- middleware/grpc/server_parser_test.go | 49 ++++++++---- middleware/grpc/shared.go | 46 +++++++++-- noop.go | 2 + span.go | 6 +- span_customizer.go | 48 ++++++++++++ span_implementation.go | 6 +- 10 files changed, 295 insertions(+), 42 deletions(-) create mode 100644 middleware/grpc/client_parser_test.go create mode 100644 span_customizer.go diff --git a/middleware/grpc/client.go b/middleware/grpc/client.go index ba44d6ca..5b8a79cb 100644 --- a/middleware/grpc/client.go +++ b/middleware/grpc/client.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -42,25 +42,41 @@ func WithRemoteServiceName(name string) ClientOption { } } +// WithClientOutPayloadParser adds a parser for the stats.OutPayload to be able to access +// the outgoing request payload +func WithClientOutPayloadParser(parser func(*stats.OutPayload, zipkin.SpanCustomizer)) ClientOption { + return func(h *clientHandler) { + h.handleRPCParser.outPayload = parser + } +} + +// WithClientOutHeaderParser adds a parser for the stats.OutHeader to be able to access +// the outgoing request payload +func WithClientOutHeaderParser(parser func(*stats.OutHeader, zipkin.SpanCustomizer)) ClientOption { + return func(h *clientHandler) { + h.handleRPCParser.outHeader = parser + } +} + // WithClientInPayloadParser adds a parser for the stats.InPayload to be able to access -// the request payload -func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ClientOption { +// the incoming response payload +func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.SpanCustomizer)) ClientOption { return func(h *clientHandler) { h.handleRPCParser.inPayload = parser } } // WithClientInTrailerParser adds a parser for the stats.InTrailer to be able to access -// the request trailer -func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ClientOption { +// the incoming response trailer +func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.SpanCustomizer)) ClientOption { return func(h *clientHandler) { h.handleRPCParser.inTrailer = parser } } // WithClientInHeaderParser adds a parser for the stats.InHeader to be able to access -// the request payload -func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientOption { +// the incoming response header +func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.SpanCustomizer)) ClientOption { return func(h *clientHandler) { h.handleRPCParser.inHeader = parser } diff --git a/middleware/grpc/client_parser_test.go b/middleware/grpc/client_parser_test.go new file mode 100644 index 00000000..6c2f6cfd --- /dev/null +++ b/middleware/grpc/client_parser_test.go @@ -0,0 +1,105 @@ +// Copyright 2020 The OpenZipkin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc_test + +import ( + "context" + "testing" + + "github.com/openzipkin/zipkin-go" + zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc" + service "github.com/openzipkin/zipkin-go/proto/testing" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" +) + +func TestGRPCClientCanAccessToPayloadAndMetadata(t *testing.T) { + tracer, flusher := createTracer(false) + + s := grpc.NewServer() + defer s.Stop() + + service.RegisterHelloServiceServer(s, &TestHelloService{ + responseHeader: metadata.Pairs("test_key", "test_value_1"), + responseTrailer: metadata.Pairs("test_key", "test_value_2"), + }) + + dialer := initListener(s) + + ctx := context.Background() + conn, err := grpc.DialContext( + ctx, + "bufnet", + grpc.WithContextDialer(dialer), + grpc.WithInsecure(), + grpc.WithStatsHandler(zipkingrpc.NewClientHandler( + tracer, + zipkingrpc.WithClientOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.SpanCustomizer) { + m, ok := outPayload.Payload.(*service.HelloRequest) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloResponse") + } + if want, have := "Hello", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.SpanCustomizer) { + if want, have := "test_value", outHeader.Header.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientInPayloadParser(func(inPayload *stats.InPayload, span zipkin.SpanCustomizer) { + m, ok := inPayload.Payload.(*service.HelloResponse) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloRequest") + } + if want, have := "World", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientInHeaderParser(func(inHeader *stats.InHeader, span zipkin.SpanCustomizer) { + if want, have := "test_value_1", inHeader.Header.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.SpanCustomizer) { + if want, have := "test_value_2", inTrailer.Trailer.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + )), + ) + + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + + client := service.NewHelloServiceClient(conn) + + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value")) + _, err = client.Hello(ctx, &service.HelloRequest{ + Payload: "Hello", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + spans := flusher() + if want, have := 1, len(spans); want != have { + t.Errorf("unexpected number of spans, want %d, have %d", want, have) + } +} diff --git a/middleware/grpc/grpc_suite_test.go b/middleware/grpc/grpc_suite_test.go index e8497350..a5431888 100644 --- a/middleware/grpc/grpc_suite_test.go +++ b/middleware/grpc/grpc_suite_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -125,6 +125,8 @@ func (g *sequentialIdGenerator) reset() { type TestHelloService struct { service.UnimplementedHelloServiceServer + responseHeader metadata.MD + responseTrailer metadata.MD } func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) { @@ -158,6 +160,9 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) } } + grpc.SetTrailer(ctx, s.responseTrailer) + grpc.SendHeader(ctx, s.responseHeader) + return resp, nil } diff --git a/middleware/grpc/server.go b/middleware/grpc/server.go index f229420e..bb0778b1 100644 --- a/middleware/grpc/server.go +++ b/middleware/grpc/server.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -41,26 +41,42 @@ func ServerTags(tags map[string]string) ServerOption { } // WithServerInPayloadParser adds a parser for the stats.InPayload to be able to access -// the request payload -func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ServerOption { +// the incoming request payload +func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.SpanCustomizer)) ServerOption { return func(h *serverHandler) { h.handleRPCParser.inPayload = parser } } -// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access -// the request trailer -func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption { +// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access +// the incoming request header +func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.SpanCustomizer)) ServerOption { return func(h *serverHandler) { - h.handleRPCParser.inTrailer = parser + h.handleRPCParser.inHeader = parser } } -// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access -// the request payload -func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption { +// WithServerOutPayloadParser adds a parser for the stats.OutPayload to be able to access +// the outgoing response payload +func WithServerOutPayloadParser(parser func(*stats.OutPayload, zipkin.SpanCustomizer)) ServerOption { return func(h *serverHandler) { - h.handleRPCParser.inHeader = parser + h.handleRPCParser.outPayload = parser + } +} + +// WithServerOutTrailerParser adds a parser for the stats.OutTrailer to be able to access +// the outgoing response trailer +func WithServerOutTrailerParser(parser func(*stats.OutTrailer, zipkin.SpanCustomizer)) ServerOption { + return func(h *serverHandler) { + h.handleRPCParser.outTrailer = parser + } +} + +// WithServerOutHeaderParser adds a parser for the stats.OutHeader to be able to access +// the outgoing response payload +func WithServerOutHeaderParser(parser func(*stats.OutHeader, zipkin.SpanCustomizer)) ServerOption { + return func(h *serverHandler) { + h.handleRPCParser.outHeader = parser } } diff --git a/middleware/grpc/server_parser_test.go b/middleware/grpc/server_parser_test.go index 979e3224..bbcba93e 100644 --- a/middleware/grpc/server_parser_test.go +++ b/middleware/grpc/server_parser_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -77,7 +77,7 @@ func TestGRPCServerCreatesASpanAndContext(t *testing.T) { } } -func TestGRPCServerCanAccessToHeaders(t *testing.T) { +func TestGRPCServerCanAccessToPayloadAndMetadata(t *testing.T) { tracer, flusher := createTracer(false) s := grpc.NewServer( @@ -85,14 +85,37 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { zipkingrpc.NewServerHandler( tracer, zipkingrpc.ServerTags(map[string]string{"default": "tag"}), - zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) { + zipkingrpc.WithServerInPayloadParser(func(inPayload *stats.InPayload, span zipkin.SpanCustomizer) { + m, ok := inPayload.Payload.(*service.HelloRequest) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloRequest") + } + if want, have := "Hello", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), + zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.SpanCustomizer) { if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have { - t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have) + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + zipkingrpc.WithServerOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.SpanCustomizer) { + m, ok := outPayload.Payload.(*service.HelloResponse) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloResponse") + } + if want, have := "World", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), + zipkingrpc.WithServerOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.SpanCustomizer) { + if want, have := "test_value_1", outHeader.Header.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) } }), - zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) { - if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have { - t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have) + zipkingrpc.WithServerOutTrailerParser(func(outTrailer *stats.OutTrailer, span zipkin.SpanCustomizer) { + if want, have := "test_value_2", outTrailer.Trailer.Get("test_key")[0]; want != have { + t.Errorf("incorrect trailer value, want %q, have %q", want, have) } }), ), @@ -100,7 +123,10 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { ) defer s.Stop() - service.RegisterHelloServiceServer(s, &TestHelloService{}) + service.RegisterHelloServiceServer(s, &TestHelloService{ + responseHeader: metadata.Pairs("test_key", "test_value_1"), + responseTrailer: metadata.Pairs("test_key", "test_value_2"), + }) dialer := initListener(s) @@ -118,7 +144,7 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { client := service.NewHelloServiceClient(conn) - ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value") + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value")) _, err = client.Hello(ctx, &service.HelloRequest{ Payload: "Hello", }) @@ -130,9 +156,4 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { if want, have := 1, len(spans); want != have { t.Errorf("unexpected number of spans, want %d, have %d", want, have) } - - span := spans[0] - if want, have := model.Server, span.Kind; want != have { - t.Errorf("unexpected kind, want %q, have %q", want, have) - } } diff --git a/middleware/grpc/shared.go b/middleware/grpc/shared.go index 8cc14ca0..1394f7a7 100644 --- a/middleware/grpc/shared.go +++ b/middleware/grpc/shared.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -28,9 +28,21 @@ import ( ) type handleRPCParser struct { - inPayload func(*stats.InPayload, zipkin.Span) - inTrailer func(*stats.InTrailer, zipkin.Span) - inHeader func(*stats.InHeader, zipkin.Span) + inPayload func(*stats.InPayload, zipkin.SpanCustomizer) + inTrailer func(*stats.InTrailer, zipkin.SpanCustomizer) + inHeader func(*stats.InHeader, zipkin.SpanCustomizer) + outPayload func(*stats.OutPayload, zipkin.SpanCustomizer) + outTrailer func(*stats.OutTrailer, zipkin.SpanCustomizer) + outHeader func(*stats.OutHeader, zipkin.SpanCustomizer) +} + +func (h handleRPCParser) IsZero() bool { + return h.inPayload == nil && + h.inTrailer == nil && + h.inHeader == nil && + h.outPayload == nil && + h.outTrailer == nil && + h.outHeader == nil } // A RPCHandler can be registered using WithClientRPCHandler or WithServerRPCHandler to intercept calls to HandleRPC of @@ -45,19 +57,39 @@ func spanName(rti *stats.RPCTagInfo) string { func handleRPC(ctx context.Context, rs stats.RPCStats, h handleRPCParser) { span := zipkin.SpanFromContext(ctx) + if span.IsNoop() { + return + } + + var spanCustomizer zipkin.SpanCustomizer + if !h.IsZero() { + spanCustomizer = zipkin.WrapWithSpanCustomizerShield(span) + } switch rs := rs.(type) { case *stats.InPayload: if h.inPayload != nil { - h.inPayload(rs, span) + h.inPayload(rs, spanCustomizer) } case *stats.InHeader: if h.inHeader != nil { - h.inHeader(rs, span) + h.inHeader(rs, spanCustomizer) } case *stats.InTrailer: if h.inTrailer != nil { - h.inTrailer(rs, span) + h.inTrailer(rs, spanCustomizer) + } + case *stats.OutPayload: + if h.outPayload != nil { + h.outPayload(rs, spanCustomizer) + } + case *stats.OutHeader: + if h.outHeader != nil { + h.outHeader(rs, spanCustomizer) + } + case *stats.OutTrailer: + if h.outTrailer != nil { + h.outTrailer(rs, spanCustomizer) } case *stats.End: s, ok := status.FromError(rs.Error) diff --git a/noop.go b/noop.go index 1368b9e7..5c914830 100644 --- a/noop.go +++ b/noop.go @@ -39,3 +39,5 @@ func (*noopSpan) Finish() {} func (*noopSpan) FinishedWithDuration(duration time.Duration) {} func (*noopSpan) Flush() {} + +func (*noopSpan) IsNoop() bool { return true } diff --git a/span.go b/span.go index cc915681..041ac5fb 100644 --- a/span.go +++ b/span.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -55,4 +55,8 @@ type Span interface { // This can be used if the DelaySend SpanOption was set or when dealing with // one-way RPC tracing where duration might not be measured. Flush() + + // IsNoop tells whether the span is noop or not. Usually used to avoid resource misusage + // when customizing a span + IsNoop() bool } diff --git a/span_customizer.go b/span_customizer.go new file mode 100644 index 00000000..dabed66b --- /dev/null +++ b/span_customizer.go @@ -0,0 +1,48 @@ +// Copyright 2020 The OpenZipkin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import "time" + +// SpanCustomizer allows to safely customize a span without accesing its lifecycle +// methods +type SpanCustomizer interface { + // Annotate adds a timed event to the Span. + Annotate(time.Time, string) + + // Tag sets Tag with given key and value to the Span. If key already exists in + // the Span the value will be overridden except for error tags where the first + // value is persisted. + Tag(string, string) +} + +type shield struct { + s Span +} + +var _ SpanCustomizer = &shield{} + +func (sc *shield) Annotate(t time.Time, value string) { + sc.s.Annotate(t, value) +} + +func (sc *shield) Tag(key string, value string) { + sc.s.Tag(key, value) +} + +// WrapWithSpanCustomizerShield wraps a span with the span customizer shield +func WrapWithSpanCustomizerShield(s Span) SpanCustomizer { + return &shield{s} +} diff --git a/span_implementation.go b/span_implementation.go index 72904a84..7edf30a9 100644 --- a/span_implementation.go +++ b/span_implementation.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -99,3 +99,7 @@ func (s *spanImpl) Flush() { s.tracer.reporter.Send(s.SpanModel) } } + +func (s *spanImpl) IsNoop() bool { + return false +}