Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-9149] Use RDK Logger Across Interceptors #400

Merged
merged 24 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8097618
manually import grpc interceptor helpers
bashar-515 Jan 7, 2025
e5dea9c
convert almost everything to use a zap compatible logger
bashar-515 Jan 8, 2025
61533b2
finish converting to zap compatible logger
bashar-515 Jan 8, 2025
ce5bc5c
implement stream interceptor
bashar-515 Jan 8, 2025
1dd945e
extract to utility function
bashar-515 Jan 8, 2025
beae200
remove note
bashar-515 Jan 8, 2025
ebdb5bd
no longer pass unnecessary logger
bashar-515 Jan 8, 2025
d3979fd
remove options
bashar-515 Jan 8, 2025
e5579f7
remove unused import
bashar-515 Jan 9, 2025
919e940
only output error level logs and up
bashar-515 Jan 9, 2025
6e396f3
take wrapped logger out of stream interceptor
bashar-515 Jan 9, 2025
0f383d6
remove unused code
bashar-515 Jan 10, 2025
6869b6e
lint
bashar-515 Jan 10, 2025
e89b8a0
create new logger first
bashar-515 Jan 10, 2025
73fd933
explicitly add options to logger
bashar-515 Jan 10, 2025
be22266
include 'nolint' comments that were removed
bashar-515 Jan 13, 2025
61b50da
fix 'nolint' comments
bashar-515 Jan 13, 2025
724cbc0
fix final 'nolint' comment
bashar-515 Jan 13, 2025
471f3a4
remove new added line
bashar-515 Jan 13, 2025
f5b96ae
set log level to error whenever there is an error
bashar-515 Jan 14, 2025
17fde6b
ensure timestamps field is formatted like log timestamp
bashar-515 Jan 14, 2025
4cd14e2
don't export ISO8601 formatting constant and only set level to error …
bashar-515 Jan 14, 2025
9762a28
remove ineffectual assignment
bashar-515 Jan 14, 2025
5b9a1a5
document LogFinalLine()
bashar-515 Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package utils

import (
"reflect"
"time"

"github.com/edaniels/golog"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
)

// Logger is used various parts of the package for informational/debugging purposes.
Expand Down Expand Up @@ -127,3 +131,35 @@ func AddFieldsToLogger(inp ZapCompatibleLogger, args ...interface{}) (loggerRet

return loggerRet
}

// LogFinalLine is used to log the final status of a gRPC request along with its execution time, an associated error (if any), and the
// gRPC status code. If there is an error, the log level is upgraded (if necessary) to ERROR. Otherwise, it is set to DEBUG. This code is
// taken from
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
func LogFinalLine(logger ZapCompatibleLogger, startTime time.Time, err error, msg string, code codes.Code) {
level := grpc_zap.DefaultCodeToLevel(code)

// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
fields := []any{}
if err == nil {
level = zap.DebugLevel
} else {
if level < zap.ErrorLevel {
level = zap.ErrorLevel
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cheukt Ended up wrapping this to only happen if the level was already below ERROR. Otherwise the linter labels line 140 an "ineffectual assignment."

}
fields = append(fields, "error", err)
}
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
switch level {
case zap.DebugLevel:
logger.Debugw(msg, fields...)
case zap.InfoLevel:
logger.Infow(msg, fields...)
case zap.ErrorLevel:
logger.Errorw(msg, fields...)
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
logger.Warnw(msg, fields...)
}
}
15 changes: 4 additions & 11 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ import (
"github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -327,10 +324,6 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger: logger,
}

grpcLogger := logger.Desugar()
if !(sOpts.debug || utils.Debug) {
grpcLogger = grpcLogger.WithOptions(zap.IncreaseLevel(zap.LevelEnablerFunc(zapcore.ErrorLevel.Enabled)))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[qu] is removing this safe? Not sure if we lose out on key functionality here by not setting these options. I'm assuming that these options are replaced by the cases outlined in LogFinalLine().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea here is that if debug mode is not on, we only want to see error level logs and up. We should port that functionality to the interceptor themselves (see if you're seeing info level logs with this set of changes)

}
if sOpts.unknownStreamDesc != nil {
serverOpts = append(serverOpts, grpc.UnknownServiceHandler(sOpts.unknownStreamDesc.Handler))
}
Expand All @@ -342,10 +335,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger.Errorw("panicked while calling unary server method", "error", errors.WithStack(err))
return err
}))),
grpc_zap.UnaryServerInterceptor(grpcLogger),
grpcUnaryServerInterceptor(logger),
unaryServerCodeInterceptor(),
)
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor(grpcLogger))
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor())
unaryAuthIntPos := -1
if !sOpts.unauthenticated {
unaryInterceptors = append(unaryInterceptors, server.authUnaryInterceptor)
Expand Down Expand Up @@ -375,10 +368,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger.Errorw("panicked while calling stream server method", "error", errors.WithStack(err))
return err
}))),
grpc_zap.StreamServerInterceptor(grpcLogger),
grpcStreamServerInterceptor(logger),
streamServerCodeInterceptor(),
)
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor(grpcLogger))
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor())
streamAuthIntPos := -1
if !sOpts.unauthenticated {
streamInterceptors = append(streamInterceptors, server.authStreamInterceptor)
Expand Down
53 changes: 51 additions & 2 deletions rpc/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import (
"context"
"encoding/hex"
"fmt"
"path"
"strconv"
"time"

grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.viam.com/utils"
)

// UnaryServerTracingInterceptor starts a new Span if Span metadata exists in the context.
func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerInterceptor {
func UnaryServerTracingInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if remoteSpanContext, err := remoteSpanContextFromContext(ctx); err == nil {
var span *trace.Span
Expand All @@ -38,7 +43,7 @@ func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerIntercept
}

// StreamServerTracingInterceptor starts a new Span if Span metadata exists in the context.
func StreamServerTracingInterceptor(logger *zap.Logger) grpc.StreamServerInterceptor {
func StreamServerTracingInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if remoteSpanContext, err := remoteSpanContextFromContext(stream.Context()); err == nil {
newCtx, span := trace.StartSpanWithRemoteParent(stream.Context(), "server_root", remoteSpanContext)
Expand Down Expand Up @@ -119,3 +124,47 @@ func remoteSpanContextFromContext(ctx context.Context) (trace.SpanContext, error

return trace.SpanContext{TraceID: traceID, SpanID: spanID, TraceOptions: traceOptions, Tracestate: nil}, nil
}

func grpcUnaryServerInterceptor(logger utils.ZapCompatibleLogger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(ctx, info.FullMethod, startTime)...)

utils.LogFinalLine(loggerWithFields, startTime, err, "finished unary call with code "+code.String(), code)

return resp, err
}
}

func grpcStreamServerInterceptor(logger utils.ZapCompatibleLogger) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
err := handler(srv, stream)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(stream.Context(), info.FullMethod, startTime)...)

utils.LogFinalLine(loggerWithFields, startTime, err, "finished stream call with code "+code.String(), code)

return err
}
}

const iso8601 = "2006-01-02T15:04:05.000Z0700" // keep timestamp formatting constant

func serverCallFields(ctx context.Context, fullMethodString string, start time.Time) []any {
var f []any
f = append(f, "grpc.start_time", start.UTC().Format(iso8601))
if d, ok := ctx.Deadline(); ok {
f = append(f, zap.String("grpc.request.deadline", d.UTC().Format(iso8601)))
}
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
return append(f, []any{
"span.kind", "server",
"system", "grpc",
"grpc.service", service,
"grpc.method", method,
})
}
37 changes: 4 additions & 33 deletions rpc/wrtc_client_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import (
"time"

grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"github.com/viamrobotics/webrtc/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -110,8 +107,9 @@ func (ch *webrtcClientChannel) Invoke(
) error {
startTime := time.Now()
err := ch.invokeWithInterceptor(ctx, method, args, reply, opts...)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
logFinalClientLine(loggerWithFields, startTime, err, "finished client unary call")
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client unary call", code)
return err
}

Expand Down Expand Up @@ -197,8 +195,9 @@ func (ch *webrtcClientChannel) NewStream(
) (grpc.ClientStream, error) {
startTime := time.Now()
clientStream, err := ch.streamWithInterceptor(ctx, method)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
logFinalClientLine(loggerWithFields, startTime, err, "finished client streaming call")
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client streaming call", code)
return clientStream, err
}

Expand Down Expand Up @@ -341,34 +340,6 @@ func (ch *webrtcClientChannel) writeReset(stream *webrtcpb.Stream) error {
})
}

// taken from
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
func logFinalClientLine(logger utils.ZapCompatibleLogger, startTime time.Time, err error, msg string) {
code := grpc_logging.DefaultErrorToCode(err)
level := grpc_zap.DefaultCodeToLevel(code)

// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
fields := []any{}
if err == nil {
level = zap.DebugLevel
} else {
fields = append(fields, "error", err)
}
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
switch level {
case zap.DebugLevel:
logger.Debugw(msg, fields...)
case zap.InfoLevel:
logger.Infow(msg, fields...)
case zap.ErrorLevel:
logger.Errorw(msg, fields...)
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
logger.Warnw(msg, fields...)
}
}

func newClientLoggerFields(fullMethodString string) []any {
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
Expand Down
Loading