Skip to content

Commit

Permalink
Merge pull request #65 from GoogleCloudPlatform/logs
Browse files Browse the repository at this point in the history
chore: add debug logs
  • Loading branch information
nimf authored Jan 22, 2024
2 parents 0a9a597 + 8cb774e commit 2271df0
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 28 deletions.
42 changes: 24 additions & 18 deletions grpcgcp/gcp_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
)

var _ balancer.Balancer = &gcpBalancer{} // Ensure gcpBalancer implements Balancer
var _ balancer.Balancer = (*gcpBalancer)(nil) // Ensure gcpBalancer implements Balancer

const (
// Name is the name of grpc_gcp balancer.
Expand Down Expand Up @@ -65,7 +65,7 @@ func (bb *gcpBalancerBuilder) Build(
cc balancer.ClientConn,
opt balancer.BuildOptions,
) balancer.Balancer {
return &gcpBalancer{
gb := &gcpBalancer{
cc: cc,
methodCfg: make(map[string]*pb.AffinityConfig),
affinityMap: make(map[string]balancer.SubConn),
Expand All @@ -81,6 +81,8 @@ func (bb *gcpBalancerBuilder) Build(
// may call UpdateBalancerState with this picker.
picker: newErrPicker(balancer.ErrNoSubConnAvailable),
}
gb.log = NewGCPLogger(compLogger, fmt.Sprintf("[gcpBalancer %p]", gb))
return gb
}

func (*gcpBalancerBuilder) Name() string {
Expand Down Expand Up @@ -194,8 +196,6 @@ func (ref *subConnRef) gotResp() {
}

type gcpBalancer struct {
balancer.Balancer // Embed V1 Balancer so it compiles with Builder

cfg *GcpBalancerConfig
methodCfg map[string]*pb.AffinityConfig

Expand All @@ -218,6 +218,7 @@ type gcpBalancer struct {
unresponsiveDetection bool

picker balancer.Picker
log grpclog.LoggerV2
}

func (gb *gcpBalancer) initializeConfig(cfg *GcpBalancerConfig) {
Expand Down Expand Up @@ -271,7 +272,9 @@ func (gb *gcpBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
gb.mu.Lock()
defer gb.mu.Unlock()
addrs := ccs.ResolverState.Addresses
grpclog.Infoln("grpcgcp.gcpBalancer: got new resolved addresses: ", addrs, " and balancer config: ", ccs.BalancerConfig)
if gb.log.V(FINE) {
gb.log.Infoln("got new resolved addresses: ", addrs, " and balancer config: ", ccs.BalancerConfig)
}
gb.addrs = addrs
if gb.cfg == nil {
cfg, ok := ccs.BalancerConfig.(*GcpBalancerConfig)
Expand All @@ -296,10 +299,7 @@ func (gb *gcpBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
}

func (gb *gcpBalancer) ResolverError(err error) {
grpclog.Warningf(
"grpcgcp.gcpBalancer: ResolverError: %v",
err,
)
gb.log.Warningf("ResolverError: %v", err)
}

// check current connection pool size
Expand Down Expand Up @@ -334,7 +334,7 @@ func (gb *gcpBalancer) addSubConn() {
balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled},
)
if err != nil {
grpclog.Errorf("grpcgcp.gcpBalancer: failed to NewSubConn: %v", err)
gb.log.Errorf("failed to NewSubConn: %v", err)
return
}
gb.scRefs[sc] = &subConnRef{
Expand Down Expand Up @@ -430,7 +430,9 @@ func (gb *gcpBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubC
s := scs.ConnectivityState

if scRef, found := gb.refreshingScRefs[sc]; found {
grpclog.Infof("grpcgcp.gcpBalancer: handle replacement SubConn state change: %p, %v", sc, s)
if gb.log.V(FINE) {
gb.log.Infof("handle replacement SubConn state change: %p, %v", sc, s)
}
if s != connectivity.Ready {
// Ignore the replacement sc until it's ready.
return
Expand All @@ -452,15 +454,19 @@ func (gb *gcpBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubC
gb.cc.RemoveSubConn(oldSc)
}

grpclog.Infof("grpcgcp.gcpBalancer: handle SubConn state change: %p, %v", sc, s)
if gb.log.V(FINE) {
gb.log.Infof("handle SubConn state change: %p, %v", sc, s)
}

oldS, ok := gb.scStates[sc]
if !ok {
grpclog.Infof(
"grpcgcp.gcpBalancer: got state changes for an unknown/replaced SubConn: %p, %v",
sc,
s,
)
if gb.log.V(FINE) {
gb.log.Infof(
"got state changes for an unknown/replaced SubConn: %p, %v",
sc,
s,
)
}
return
}
gb.scStates[sc] = s
Expand Down Expand Up @@ -523,7 +529,7 @@ func (gb *gcpBalancer) refresh(ref *subConnRef) {
balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled},
)
if err != nil {
grpclog.Errorf("grpcgcp.gcpBalancer: failed to create a replacement SubConn with NewSubConn: %v", err)
gb.log.Errorf("failed to create a replacement SubConn with NewSubConn: %v", err)
return
}
gb.refreshingScRefs[sc] = ref
Expand Down
98 changes: 98 additions & 0 deletions grpcgcp/gcp_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package grpcgcp

import (
"strings"

"google.golang.org/grpc/grpclog"
)

const (
FINE = 90
FINEST = 99
)

var compLogger = grpclog.Component("grpcgcp")

type gcpLogger struct {
logger grpclog.LoggerV2
prefix string
}

// Make sure gcpLogger implements grpclog.LoggerV2.
var _ grpclog.LoggerV2 = (*gcpLogger)(nil)

func NewGCPLogger(logger grpclog.LoggerV2, prefix string) *gcpLogger {
p := prefix
if !strings.HasSuffix(p, " ") {
p = p + " "
}
return &gcpLogger{
logger: logger,
prefix: p,
}
}

// Error implements grpclog.LoggerV2.
func (l *gcpLogger) Error(args ...interface{}) {
l.logger.Error(append([]interface{}{l.prefix}, args)...)
}

// Errorf implements grpclog.LoggerV2.
func (l *gcpLogger) Errorf(format string, args ...interface{}) {
l.logger.Errorf(l.prefix+format, args...)
}

// Errorln implements grpclog.LoggerV2.
func (l *gcpLogger) Errorln(args ...interface{}) {
l.logger.Errorln(append([]interface{}{l.prefix}, args)...)
}

// Fatal implements grpclog.LoggerV2.
func (l *gcpLogger) Fatal(args ...interface{}) {
l.logger.Fatal(append([]interface{}{l.prefix}, args)...)
}

// Fatalf implements grpclog.LoggerV2.
func (l *gcpLogger) Fatalf(format string, args ...interface{}) {
l.logger.Fatalf(l.prefix+format, args...)
}

// Fatalln implements grpclog.LoggerV2.
func (l *gcpLogger) Fatalln(args ...interface{}) {
l.Fatalln(append([]interface{}{l.prefix}, args)...)
}

// Info implements grpclog.LoggerV2.
func (l *gcpLogger) Info(args ...interface{}) {
l.logger.Info(append([]interface{}{l.prefix}, args)...)
}

// Infof implements grpclog.LoggerV2.
func (l *gcpLogger) Infof(format string, args ...interface{}) {
l.logger.Infof(l.prefix+format, args...)
}

// Infoln implements grpclog.LoggerV2.
func (l *gcpLogger) Infoln(args ...interface{}) {
l.logger.Infoln(append([]interface{}{l.prefix}, args)...)
}

// V implements grpclog.LoggerV2.
func (l *gcpLogger) V(level int) bool {
return l.logger.V(level)
}

// Warning implements grpclog.LoggerV2.
func (l *gcpLogger) Warning(args ...interface{}) {
l.logger.Warning(append([]interface{}{l.prefix}, args)...)
}

// Warningf implements grpclog.LoggerV2.
func (l *gcpLogger) Warningf(format string, args ...interface{}) {
l.logger.Warningf(l.prefix+format, args...)
}

// Warningln implements grpclog.LoggerV2.
func (l *gcpLogger) Warningln(args ...interface{}) {
l.logger.Warningln(append([]interface{}{l.prefix}, args)...)
}
27 changes: 24 additions & 3 deletions grpcgcp/gcp_multiendpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint"
"google.golang.org/grpc"
Expand All @@ -39,7 +40,7 @@ var (
}
)

var log = grpclog.Component("GCPMultiEndpoint")
var gmeCounter uint32

type contextMEKey int

Expand Down Expand Up @@ -123,6 +124,7 @@ type GCPMultiEndpoint struct {
pools map[string]*monitoredConn
opts []grpc.DialOption
gcpConfig *pb.ApiConfig
log grpclog.LoggerV2

grpc.ClientConnInterface
}
Expand Down Expand Up @@ -153,7 +155,10 @@ func (gme *GCPMultiEndpoint) Close() error {
mc.stopMonitoring()
if err := mc.conn.Close(); err != nil {
errs = append(errs, err)
log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
}
if gme.log.V(FINE) {
gme.log.Infof("closed channel pool for %q endpoint.", e)
}
}
return errs.Combine()
Expand Down Expand Up @@ -187,6 +192,7 @@ func NewGcpMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOptio
defaultName: meOpts.Default,
opts: o,
gcpConfig: meOpts.GRPCgcpConfig,
log: NewGCPLogger(compLogger, fmt.Sprintf("[GCPMultiEndpoint #%d]", atomic.AddUint32(&gmeCounter, 1))),
}
if err := gme.UpdateMultiEndpoints(meOpts); err != nil {
return nil, err
Expand Down Expand Up @@ -233,6 +239,9 @@ func (sm *monitoredConn) monitor(ctx context.Context) {
currentState := sm.conn.GetState()
for sm.conn.WaitForStateChange(ctx, currentState) {
currentState = sm.conn.GetState()
if sm.gme.log.V(FINE) {
sm.gme.log.Infof("%q endpoint state changed to %v", sm.endpoint, currentState)
}
// Inform all multiendpoints.
for _, me := range sm.gme.mes {
me.SetEndpointAvailability(sm.endpoint, currentState == connectivity.Ready)
Expand Down Expand Up @@ -283,6 +292,9 @@ func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOption
if err != nil {
return err
}
if gme.log.V(FINE) {
gme.log.Infof("created new channel pool for %q endpoint.", e)
}
gme.pools[e] = newMonitoredConn(e, conn, gme)
}
}
Expand All @@ -296,6 +308,9 @@ func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOption
}

// Add new MultiEndpoint.
if gme.log.V(FINE) {
gme.log.Infof("creating new %q multiendpoint.", name)
}
me, err := multiendpoint.NewMultiEndpoint(meo)
if err != nil {
return err
Expand All @@ -308,14 +323,20 @@ func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOption
for name := range gme.mes {
if _, ok := meOpts.MultiEndpoints[name]; !ok {
delete(gme.mes, name)
if gme.log.V(FINE) {
gme.log.Infof("removed obsolete %q multiendpoint.", name)
}
}
}

// Remove obsolete pools.
for e, mc := range gme.pools {
if _, ok := validPools[e]; !ok {
if err := mc.conn.Close(); err != nil {
log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
}
if gme.log.V(FINE) {
gme.log.Infof("closed channel pool for %q endpoint.", e)
}
mc.stopMonitoring()
delete(gme.pools, e)
Expand Down
19 changes: 18 additions & 1 deletion grpcgcp/gcp_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,34 @@ import (
"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
)

// Deadline exceeded gRPC error caused by client-side context reached deadline.
var deErr = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())

func newGCPPicker(readySCRefs []*subConnRef, gb *gcpBalancer) balancer.Picker {
return &gcpPicker{
gp := &gcpPicker{
gb: gb,
scRefs: readySCRefs,
}
gp.log = NewGCPLogger(gb.log, fmt.Sprintf("[gcpPicker %p]", gp))
return gp
}

type gcpPicker struct {
gb *gcpBalancer
mu sync.Mutex
scRefs []*subConnRef
log grpclog.LoggerV2
}

func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if len(p.scRefs) <= 0 {
if p.log.V(FINEST) {
p.log.Info("returning balancer.ErrNoSubConnAvailable as no subconns are available.")
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

Expand Down Expand Up @@ -77,6 +84,9 @@ func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, err
}
if scRef == nil {
if p.log.V(FINEST) {
p.log.Info("returning balancer.ErrNoSubConnAvailable as no SubConn was picked.")
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

Expand All @@ -101,6 +111,10 @@ func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
p.gb.unbindSubConn(boundKey)
}
}

if p.log.V(FINEST) {
p.log.Infof("picked SubConn: %p", scRef.subConn)
}
return balancer.PickResult{SubConn: scRef.subConn, Done: callback}, nil
}

Expand Down Expand Up @@ -143,6 +157,9 @@ func (p *gcpPicker) getAndIncrementSubConnRef(boundKey string, cmd grpc_gcp.Affi
var err error
if cmd == grpc_gcp.AffinityConfig_BIND && p.gb.cfg.GetChannelPool().GetBindPickStrategy() == grpc_gcp.ChannelPoolConfig_ROUND_ROBIN {
scRef = p.gb.getSubConnRoundRobin()
if p.log.V(FINEST) {
p.log.Infof("picking SubConn for round-robin bind: %p", scRef.subConn)
}
} else {
scRef, err = p.getSubConnRef(boundKey)
}
Expand Down
Loading

0 comments on commit 2271df0

Please sign in to comment.