Skip to content

Commit

Permalink
[RSDK-9149] Use RDK Logger Across Interceptors (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
bashar-515 authored Jan 14, 2025
1 parent a4cfa8d commit 52a853a
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 46 deletions.
36 changes: 36 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package utils

import (
"reflect"
"time"

"github.com/edaniels/golog"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
)

// Logger is used various parts of the package for informational/debugging purposes.
Expand Down Expand Up @@ -127,3 +131,35 @@ func AddFieldsToLogger(inp ZapCompatibleLogger, args ...interface{}) (loggerRet

return loggerRet
}

// LogFinalLine is used to log the final status of a gRPC request along with its execution time, an associated error (if any), and the
// gRPC status code. If there is an error, the log level is upgraded (if necessary) to ERROR. Otherwise, it is set to DEBUG. This code is
// taken from
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
func LogFinalLine(logger ZapCompatibleLogger, startTime time.Time, err error, msg string, code codes.Code) {
level := grpc_zap.DefaultCodeToLevel(code)

// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
fields := []any{}
if err == nil {
level = zap.DebugLevel
} else {
if level < zap.ErrorLevel {
level = zap.ErrorLevel
}
fields = append(fields, "error", err)
}
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
switch level {
case zap.DebugLevel:
logger.Debugw(msg, fields...)
case zap.InfoLevel:
logger.Infow(msg, fields...)
case zap.ErrorLevel:
logger.Errorw(msg, fields...)
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
logger.Warnw(msg, fields...)
}
}
15 changes: 4 additions & 11 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ import (
"github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -327,10 +324,6 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger: logger,
}

grpcLogger := logger.Desugar()
if !(sOpts.debug || utils.Debug) {
grpcLogger = grpcLogger.WithOptions(zap.IncreaseLevel(zap.LevelEnablerFunc(zapcore.ErrorLevel.Enabled)))
}
if sOpts.unknownStreamDesc != nil {
serverOpts = append(serverOpts, grpc.UnknownServiceHandler(sOpts.unknownStreamDesc.Handler))
}
Expand All @@ -342,10 +335,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger.Errorw("panicked while calling unary server method", "error", errors.WithStack(err))
return err
}))),
grpc_zap.UnaryServerInterceptor(grpcLogger),
grpcUnaryServerInterceptor(logger),
unaryServerCodeInterceptor(),
)
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor(grpcLogger))
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor())
unaryAuthIntPos := -1
if !sOpts.unauthenticated {
unaryInterceptors = append(unaryInterceptors, server.authUnaryInterceptor)
Expand Down Expand Up @@ -375,10 +368,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger.Errorw("panicked while calling stream server method", "error", errors.WithStack(err))
return err
}))),
grpc_zap.StreamServerInterceptor(grpcLogger),
grpcStreamServerInterceptor(logger),
streamServerCodeInterceptor(),
)
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor(grpcLogger))
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor())
streamAuthIntPos := -1
if !sOpts.unauthenticated {
streamInterceptors = append(streamInterceptors, server.authStreamInterceptor)
Expand Down
53 changes: 51 additions & 2 deletions rpc/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import (
"context"
"encoding/hex"
"fmt"
"path"
"strconv"
"time"

grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.viam.com/utils"
)

// UnaryServerTracingInterceptor starts a new Span if Span metadata exists in the context.
func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerInterceptor {
func UnaryServerTracingInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if remoteSpanContext, err := remoteSpanContextFromContext(ctx); err == nil {
var span *trace.Span
Expand All @@ -38,7 +43,7 @@ func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerIntercept
}

// StreamServerTracingInterceptor starts a new Span if Span metadata exists in the context.
func StreamServerTracingInterceptor(logger *zap.Logger) grpc.StreamServerInterceptor {
func StreamServerTracingInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if remoteSpanContext, err := remoteSpanContextFromContext(stream.Context()); err == nil {
newCtx, span := trace.StartSpanWithRemoteParent(stream.Context(), "server_root", remoteSpanContext)
Expand Down Expand Up @@ -119,3 +124,47 @@ func remoteSpanContextFromContext(ctx context.Context) (trace.SpanContext, error

return trace.SpanContext{TraceID: traceID, SpanID: spanID, TraceOptions: traceOptions, Tracestate: nil}, nil
}

func grpcUnaryServerInterceptor(logger utils.ZapCompatibleLogger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(ctx, info.FullMethod, startTime)...)

utils.LogFinalLine(loggerWithFields, startTime, err, "finished unary call with code "+code.String(), code)

return resp, err
}
}

func grpcStreamServerInterceptor(logger utils.ZapCompatibleLogger) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
err := handler(srv, stream)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(stream.Context(), info.FullMethod, startTime)...)

utils.LogFinalLine(loggerWithFields, startTime, err, "finished stream call with code "+code.String(), code)

return err
}
}

const iso8601 = "2006-01-02T15:04:05.000Z0700" // keep timestamp formatting constant

func serverCallFields(ctx context.Context, fullMethodString string, start time.Time) []any {
var f []any
f = append(f, "grpc.start_time", start.UTC().Format(iso8601))
if d, ok := ctx.Deadline(); ok {
f = append(f, zap.String("grpc.request.deadline", d.UTC().Format(iso8601)))
}
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
return append(f, []any{
"span.kind", "server",
"system", "grpc",
"grpc.service", service,
"grpc.method", method,
})
}
37 changes: 4 additions & 33 deletions rpc/wrtc_client_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import (
"time"

grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"github.com/viamrobotics/webrtc/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -110,8 +107,9 @@ func (ch *webrtcClientChannel) Invoke(
) error {
startTime := time.Now()
err := ch.invokeWithInterceptor(ctx, method, args, reply, opts...)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
logFinalClientLine(loggerWithFields, startTime, err, "finished client unary call")
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client unary call", code)
return err
}

Expand Down Expand Up @@ -197,8 +195,9 @@ func (ch *webrtcClientChannel) NewStream(
) (grpc.ClientStream, error) {
startTime := time.Now()
clientStream, err := ch.streamWithInterceptor(ctx, method)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
logFinalClientLine(loggerWithFields, startTime, err, "finished client streaming call")
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client streaming call", code)
return clientStream, err
}

Expand Down Expand Up @@ -341,34 +340,6 @@ func (ch *webrtcClientChannel) writeReset(stream *webrtcpb.Stream) error {
})
}

// taken from
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
func logFinalClientLine(logger utils.ZapCompatibleLogger, startTime time.Time, err error, msg string) {
code := grpc_logging.DefaultErrorToCode(err)
level := grpc_zap.DefaultCodeToLevel(code)

// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
fields := []any{}
if err == nil {
level = zap.DebugLevel
} else {
fields = append(fields, "error", err)
}
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
switch level {
case zap.DebugLevel:
logger.Debugw(msg, fields...)
case zap.InfoLevel:
logger.Infow(msg, fields...)
case zap.ErrorLevel:
logger.Errorw(msg, fields...)
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
logger.Warnw(msg, fields...)
}
}

func newClientLoggerFields(fullMethodString string) []any {
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
Expand Down

0 comments on commit 52a853a

Please sign in to comment.