Skip to content

Commit

Permalink
Remove subconn wrappers map
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Jan 27, 2025
1 parent 9d4fa67 commit 0e60808
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 24 deletions.
28 changes: 7 additions & 21 deletions xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
addrs: make(map[string]*addressInfo),
scWrappers: make(map[balancer.SubConn]*subConnWrapper),
scUpdateCh: buffer.NewUnbounded(),
pickerUpdateCh: buffer.NewUnbounded(),
channelzParent: bOpts.ChannelzParent,
Expand Down Expand Up @@ -198,7 +197,6 @@ type outlierDetectionBalancer struct {
mu sync.Mutex
addrs map[string]*addressInfo
cfg *LBConfig
scWrappers map[balancer.SubConn]*subConnWrapper
timerStartTime time.Time
intervalTimer *time.Timer
inhibitPickerUpdates bool
Expand Down Expand Up @@ -337,19 +335,9 @@ func (b *outlierDetectionBalancer) ResolverError(err error) {
b.child.resolverError(err)
}

func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
func (b *outlierDetectionBalancer) updateSubConnState(scw *subConnWrapper, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
scw, ok := b.scWrappers[sc]
if !ok {
// Shouldn't happen if passed down a SubConnWrapper to child on SubConn
// creation.
b.logger.Errorf("UpdateSubConnState called with SubConn that has no corresponding SubConnWrapper")
return
}
if state.ConnectivityState == connectivity.Shutdown {
delete(b.scWrappers, scw.SubConn)
}
scw.setLatestConnectivityState(state.ConnectivityState)
b.scUpdateCh.Put(&scUpdate{
scw: scw,
Expand Down Expand Up @@ -459,25 +447,23 @@ func (b *outlierDetectionBalancer) UpdateState(s balancer.State) {
}

func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
var sc balancer.SubConn
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
sc, err := b.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
scw := &subConnWrapper{
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
listener: oldListener,
latestRawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Idle},
latestHealthState: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
healthListenerEnabled: len(addrs) == 1 && pickfirstleaf.IsManagedByPickfirst(addrs[0]),
}
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(scw, state) }
b.mu.Lock()
defer b.mu.Unlock()
b.scWrappers[sc] = scw
sc, err := b.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
scw.SubConn = sc
if len(addrs) != 1 {
return scw, nil
}
Expand Down
9 changes: 6 additions & 3 deletions xds/internal/balancer/outlierdetection/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {

// Since no addresses are ejected, a SubConn update should forward down
// to the child.
od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
od.updateSubConnState(scw1.(*subConnWrapper), balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
})

Expand Down Expand Up @@ -1124,6 +1124,9 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {
if err != nil {
t.Fatalf("picker.Pick failed with error: %v", err)
}
if got, want := pi.SubConn, scw3.(*subConnWrapper).SubConn; got != want {
t.Fatalf("Unexpected SubConn chosen by picker: got %v, want %v", got, want)
}
for c := 0; c < 5; c++ {
pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
}
Expand Down Expand Up @@ -1154,7 +1157,7 @@ func (s) TestEjectUnejectSuccessRate(t *testing.T) {
// that address should not be forwarded downward. These SubConn updates
// will be cached to update the child sometime in the future when the
// address gets unejected.
od.updateSubConnState(pi.SubConn, balancer.SubConnState{
od.updateSubConnState(scw3.(*subConnWrapper), balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
})
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
Expand Down Expand Up @@ -1566,7 +1569,7 @@ func (s) TestConcurrentOperations(t *testing.T) {

// Call balancer.Balancers synchronously in this goroutine, upholding the
// balancer.Balancer API guarantee.
od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
od.updateSubConnState(scw1.(*subConnWrapper), balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
})
od.ResolverError(errors.New("some error"))
Expand Down

0 comments on commit 0e60808

Please sign in to comment.