Skip to content

Commit

Permalink
Merge pull request #64 from GoogleCloudPlatform/fix-nil
Browse files Browse the repository at this point in the history
fix: immediately closing GcpMultiEndpoint
  • Loading branch information
nimf authored Jan 11, 2024
2 parents 36b8ba6 + a69f851 commit 0a9a597
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
28 changes: 17 additions & 11 deletions grpcgcp/gcp_multiendpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,11 @@ func (gme *GCPMultiEndpoint) pickConn(ctx context.Context) *grpc.ClientConn {

func (gme *GCPMultiEndpoint) Close() error {
var errs multiError
for _, mc := range gme.pools {
for e, mc := range gme.pools {
mc.stopMonitoring()
if err := mc.conn.Close(); err != nil {
errs = append(errs, err)
log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
}
}
return errs.Combine()
Expand Down Expand Up @@ -216,9 +217,19 @@ type monitoredConn struct {
cancel context.CancelFunc
}

func (sm *monitoredConn) monitor() {
var ctx context.Context
ctx, sm.cancel = context.WithCancel(context.Background())
func newMonitoredConn(endpoint string, conn *grpc.ClientConn, gme *GCPMultiEndpoint) (mc *monitoredConn) {
ctx, cancel := context.WithCancel(context.Background())
mc = &monitoredConn{
endpoint: endpoint,
conn: conn,
gme: gme,
cancel: cancel,
}
go mc.monitor(ctx)
return
}

func (sm *monitoredConn) monitor(ctx context.Context) {
currentState := sm.conn.GetState()
for sm.conn.WaitForStateChange(ctx, currentState) {
currentState = sm.conn.GetState()
Expand Down Expand Up @@ -272,12 +283,7 @@ func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOption
if err != nil {
return err
}
gme.pools[e] = &monitoredConn{
endpoint: e,
conn: conn,
gme: gme,
}
go gme.pools[e].monitor()
gme.pools[e] = newMonitoredConn(e, conn, gme)
}
}

Expand Down Expand Up @@ -309,7 +315,7 @@ func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOption
for e, mc := range gme.pools {
if _, ok := validPools[e]; !ok {
if err := mc.conn.Close(); err != nil {
// TODO: log error.
log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
}
mc.stopMonitoring()
delete(gme.pools, e)
Expand Down
37 changes: 37 additions & 0 deletions grpcgcp/test_grpc/gcp_multiendpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,3 +664,40 @@ func TestGcpMultiEndpointWithDelays(t *testing.T) {
// Give some time to connect and switching delay. Follower must be used.
tc.SayHelloWorksWithin(context.Background(), fEndpoint, waitTO+switchingDelay)
}

func TestGcpMultiEndpointInstantShutdown(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("Panic: %v", r)
}
}()

defaultME := "default"

apiCfg := &configpb.ApiConfig{
ChannelPool: &configpb.ChannelPoolConfig{
MinSize: 3,
MaxSize: 3,
},
}

conn, err := grpcgcp.NewGcpMultiEndpoint(
&grpcgcp.GCPMultiEndpointOptions{
GRPCgcpConfig: apiCfg,
MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
defaultME: {
Endpoints: []string{"localhost:50051"},
},
},
Default: defaultME,
},
grpc.WithInsecure(),
)

if err != nil {
t.Fatalf("NewMultiEndpointConn returns unexpected error: %v", err)
}

// Closing GcpMultiEndpoint immediately should not cause panic.
conn.Close()
}

0 comments on commit 0a9a597

Please sign in to comment.