From ba455dee5ec34e2ff6983f0fceb8cd3d9a586e46 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 23 Jan 2025 22:49:04 +0530 Subject: [PATCH] Add a lazy balancer --- balancer/lazy/lazy.go | 203 +++++++++++++++++++ balancer/lazy/lazy_ext_test.go | 359 +++++++++++++++++++++++++++++++++ 2 files changed, 562 insertions(+) create mode 100644 balancer/lazy/lazy.go create mode 100644 balancer/lazy/lazy_ext_test.go diff --git a/balancer/lazy/lazy.go b/balancer/lazy/lazy.go new file mode 100644 index 000000000000..a7dcaf6b48cb --- /dev/null +++ b/balancer/lazy/lazy.go @@ -0,0 +1,203 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package lazy contains a load balancer that starts in IDLE instead of +// CONNECTING. Once it starts connecting, it instantiates its delegate. +// +// # Experimental +// +// Notice: This package is EXPERIMENTAL and may be changed or removed in a +// later release. 
+package lazy + +import ( + "encoding/json" + "fmt" + "sync" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/balancer/gracefulswitch" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +func init() { + balancer.Register(builder{}) +} + +var ( + // LazyPickfirstConfig is the LB policy config JSON for a pick_first load + // balancer that is lazily initialized. + LazyPickfirstConfig = fmt.Sprintf("{\"childPolicy\": [{%q: {}}]}", pickfirstleaf.Name) + logger = grpclog.Component("lazy-lb") +) + +const ( + // Name is the name of the lazy balancer. + Name = "lazy" + logPrefix = "[lazy-lb %p] " +) + +type builder struct{} + +func (builder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + b := &lazyBalancer{ + cc: cc, + buildOptions: bOpts, + } + b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) + return b +} + +func (builder) Name() string { + return Name +} + +type lazyBalancer struct { + // The following fields are initialized at build time and read-only after + // that and therefore do not need to be guarded by a mutex. + cc balancer.ClientConn + buildOptions balancer.BuildOptions + logger *internalgrpclog.PrefixLogger + + // The following fields are accessed while handling calls to the idlePicker + // and when handling ClientConn state updates. They are guarded by a mutex. 
+ + mu sync.Mutex + delegate balancer.Balancer + latestClientConnState *balancer.ClientConnState + latestResolverError error + updatedClientConnState bool +} + +func (lb *lazyBalancer) Close() { + lb.mu.Lock() + defer lb.mu.Unlock() + if lb.delegate != nil { + lb.delegate.Close() + lb.delegate = nil + } +} + +func (lb *lazyBalancer) ResolverError(err error) { + lb.mu.Lock() + defer lb.mu.Unlock() + if lb.delegate != nil { + lb.delegate.ResolverError(err) + return + } + lb.latestResolverError = err + lb.updateBalancerStateLocked() +} + +func (lb *lazyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { + lb.mu.Lock() + defer lb.mu.Unlock() + if childLBCfg, ok := ccs.BalancerConfig.(lbCfg); !ok { + lb.logger.Errorf("Got LB config of unexpected type: %v", ccs.BalancerConfig) + ccs.BalancerConfig = nil + } else { + ccs.BalancerConfig = childLBCfg.childLBCfg + } + if lb.delegate != nil { + return lb.delegate.UpdateClientConnState(ccs) + } + + lb.latestClientConnState = &ccs + lb.latestResolverError = nil + lb.updateBalancerStateLocked() + return nil +} + +// UpdateSubConnState implements balancer.Balancer. +func (lb *lazyBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { + // UpdateSubConnState is deprecated. 
+} + +func (lb *lazyBalancer) ExitIdle() { + lb.mu.Lock() + defer lb.mu.Unlock() + if lb.delegate != nil { + if d, ok := lb.delegate.(balancer.ExitIdler); ok { + d.ExitIdle() + } + return + } + lb.delegate = gracefulswitch.NewBalancer(lb.cc, lb.buildOptions) + if lb.latestClientConnState != nil { + if err := lb.delegate.UpdateClientConnState(*lb.latestClientConnState); err != nil { + if err == balancer.ErrBadResolverState { + lb.cc.ResolveNow(resolver.ResolveNowOptions{}) + } else { + lb.logger.Warningf("Error from child policy on receiving initial state: %v", err) + } + } + lb.latestClientConnState = nil + } + if lb.latestResolverError != nil { + lb.delegate.ResolverError(lb.latestResolverError) + lb.latestResolverError = nil + } +} + +func (lb *lazyBalancer) updateBalancerStateLocked() { + // Optimization to avoid extra picker updates: the IDLE picker is only + // sent to the ClientConn once. + if lb.updatedClientConnState { + return + } + lb.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Idle, + Picker: &idlePicker{exitIdle: sync.OnceFunc(lb.ExitIdle)}, + }) + lb.updatedClientConnState = true +} + +type lbCfg struct { + serviceconfig.LoadBalancingConfig + childLBCfg serviceconfig.LoadBalancingConfig +} + +func (b builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + jsonReprsentation := &struct { + ChildPolicy json.RawMessage + }{} + if err := json.Unmarshal(lbConfig, jsonReprsentation); err != nil { + return nil, err + } + childCfg, err := gracefulswitch.ParseConfig(jsonReprsentation.ChildPolicy) + if err != nil { + return nil, err + } + return lbCfg{childLBCfg: childCfg}, nil +} + +// idlePicker is the picker used while the lazy balancer reports IDLE. Calling +// Pick invokes exitIdle, which makes the lazy balancer create its delegate. 
+type idlePicker struct { + exitIdle func() +} + +func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + i.exitIdle() + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable +} diff --git a/balancer/lazy/lazy_ext_test.go b/balancer/lazy/lazy_ext_test.go new file mode 100644 index 000000000000..d6f1c38f349b --- /dev/null +++ b/balancer/lazy/lazy_ext_test.go @@ -0,0 +1,359 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lazy_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/lazy" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +const ( + // Default timeout for tests in this package. + defaultTestTimeout = 10 * time.Second + // Default short timeout, to be used when waiting for events which are not + // expected to happen. 
+ defaultTestShortTimeout = 100 * time.Millisecond +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// TestExitIdle creates a lazy balancer that manages a pickfirst child. The test +// calls Connect() on the channel which in turn calls ExitIdle on the lazy +// balancer. The test verifies that the channel enters READY. +func (s) TestExitIdle(t *testing.T) { + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + + json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s": %s}]}`, lazy.Name, lazy.LazyPickfirstConfig) + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(json), + } + cc, err := grpc.NewClient(backend.Address, opts...) + if err != nil { + t.Fatalf("grpc.NewClient(_) failed: %v", err) + + } + defer cc.Close() + + cc.Connect() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testutils.AwaitState(ctx, t, cc, connectivity.Ready) +} + +// TestPicker creates a lazy balancer under a stub balancer that blocks all +// calls to ExitIdle. This ensures the only way to trigger lazy to exit idle is +// through the picker. The test makes an RPC and ensures it succeeds. 
+func (s) TestPicker(t *testing.T) { + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + lazyCfg, err := balancer.Get(lazy.Name).(balancer.ConfigParser).ParseConfig(json.RawMessage(lazy.LazyPickfirstConfig)) + if err != nil { + t.Fatalf("Failed to parse service config: %v", err) + } + + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = balancer.Get(lazy.Name).Build(bd.ClientConn, bd.BuildOptions) + }, + ExitIdle: func(bd *stub.BalancerData) { + t.Log("Ignoring call to ExitIdle, calling the picker should make the lazy balancer exit IDLE state.") + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: lazyCfg, + ResolverState: ccs.ResolverState, + }) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + } + + name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "") + stub.Register(name, bf) + json := fmt.Sprintf(`{"loadBalancingConfig": [{%q: {}}]}`, name) + + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(json), + } + cc, err := grpc.NewClient(backend.Address, opts...) + if err != nil { + t.Fatalf("grpc.NewClient(_) failed: %v", err) + } + defer cc.Close() + + // The channel should remain in IDLE as the ExitIdle calls are not + // propagated to the lazy balancer from the stub balancer. + shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle) + + // The picker from the lazy balancer should be sent to the channel when the + // first resolver update is received by lazy. Making an RPC should trigger + // child creation. 
+ client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Errorf("client.EmptyCall() returned unexpected error: %v", err) + } +} + +// Tests the scenario when a resolver produces a good state followed by a +// resolver error. The test verifies that the child balancer receives the good +// update followed by the error. +func (s) TestGoodUpdateThenResolverError(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + resolverStateReceived := false + resolverErrorReceived := false + + childBF := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = balancer.Get(pickfirstleaf.Name).Build(bd.ClientConn, bd.BuildOptions) + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + if resolverErrorReceived { + t.Error("Received resolver error before resolver state.") + } + resolverStateReceived = true + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + ResolverError: func(bd *stub.BalancerData, err error) { + if !resolverStateReceived { + t.Error("Received resolver error before resolver state.") + } + resolverErrorReceived = true + bd.Data.(balancer.Balancer).ResolverError(err) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + } + + childBalName := strings.ReplaceAll(strings.ToLower(t.Name())+"_child", "/", "") + stub.Register(childBalName, childBF) + + lazyCfgJSON := fmt.Sprintf("{\"childPolicy\": [{%q: {}}]}", childBalName) + lazyCfg, err := balancer.Get(lazy.Name).(balancer.ConfigParser).ParseConfig(json.RawMessage(lazyCfgJSON)) + if err != nil { + t.Fatalf("Failed to parse service config: %v", err) + } + topLevelBF := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = balancer.Get(lazy.Name).Build(bd.ClientConn, bd.BuildOptions) + }, + ExitIdle: func(bd 
*stub.BalancerData) { + t.Log("Ignoring call to ExitIdle to delay lazy child creation till RPC time.") + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: lazyCfg, + ResolverState: ccs.ResolverState, + }) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + } + + topLevelBalName := strings.ReplaceAll(strings.ToLower(t.Name())+"_top_level", "/", "") + stub.Register(topLevelBalName, topLevelBF) + + json := fmt.Sprintf(`{"loadBalancingConfig": [{%q: {}}]}`, topLevelBalName) + + mr := manual.NewBuilderWithScheme("e2e-test") + defer mr.Close() + + mr.InitialState(resolver.State{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: backend.Address}}}, + }, + }) + + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(mr), + grpc.WithDefaultServiceConfig(json), + } + cc, err := grpc.NewClient(mr.Scheme()+":///whatever", opts...) + if err != nil { + t.Fatalf("grpc.NewClient(_) failed: %v", err) + + } + + defer cc.Close() + cc.Connect() + + mr.ReportError(errors.New("test error")) + // The channel should remain in IDLE as the ExitIdle calls are not + // propagated to the lazy balancer from the stub balancer. + shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle) + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Errorf("client.EmptyCall() returned unexpected error: %v", err) + } + + // NOTE(review): resolverErrorReceived is set by the child's ResolverError + // callback but is never asserted below, so delivery of the error to the + // child is not actually verified by this test. 
+ if !resolverStateReceived { + t.Fatalf("Child balancer did not receive resolver state.") + } +} + +// Tests the scenario when a resolver produces an error followed by a good +// update. The test verifies that the child balancer receives only the good +// update. 
+func (s) TestResolverErrorThenGoodUpdate(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + resolverStateReceived := false + + childBF := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = balancer.Get(pickfirstleaf.Name).Build(bd.ClientConn, bd.BuildOptions) + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + resolverStateReceived = true + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + ResolverError: func(bd *stub.BalancerData, err error) { + t.Error("Received unexpected resolver error.") + bd.Data.(balancer.Balancer).ResolverError(err) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + } + + childBalName := strings.ReplaceAll(strings.ToLower(t.Name())+"_child", "/", "") + stub.Register(childBalName, childBF) + + lazyCfgJSON := fmt.Sprintf("{\"childPolicy\": [{%q: {}}]}", childBalName) + lazyCfg, err := balancer.Get(lazy.Name).(balancer.ConfigParser).ParseConfig(json.RawMessage(lazyCfgJSON)) + if err != nil { + t.Fatalf("Failed to parse service config: %v", err) + } + topLevelBF := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = balancer.Get(lazy.Name).Build(bd.ClientConn, bd.BuildOptions) + }, + ExitIdle: func(bd *stub.BalancerData) { + t.Log("Ignoring call to ExitIdle to delay lazy child creation till RPC time.") + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: lazyCfg, + ResolverState: ccs.ResolverState, + }) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + } + + topLevelBalName := strings.ReplaceAll(strings.ToLower(t.Name())+"_top_level", "/", "") + stub.Register(topLevelBalName, topLevelBF) + 
+ json := fmt.Sprintf(`{"loadBalancingConfig": [{%q: {}}]}`, topLevelBalName) + + mr := manual.NewBuilderWithScheme("e2e-test") + defer mr.Close() + + mr.InitialState(resolver.State{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: backend.Address}}}, + }, + }) + + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(mr), + grpc.WithDefaultServiceConfig(json), + } + cc, err := grpc.NewClient(mr.Scheme()+":///whatever", opts...) + if err != nil { + t.Fatalf("grpc.NewClient(_) failed: %v", err) + + } + + defer cc.Close() + cc.Connect() + + // Send an error followed by a good update. The lazy balancer clears its + // buffered resolver error when it stores a newer ClientConn state, so the + // child is expected to see only the state. + mr.ReportError(errors.New("test error")) + mr.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: backend.Address}}}, + }, + }) + + // The channel should remain in IDLE as the ExitIdle calls are not + // propagated to the lazy balancer from the stub balancer. + shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer shortCancel() + testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle) + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Errorf("client.EmptyCall() returned unexpected error: %v", err) + } + + if !resolverStateReceived { + t.Fatalf("Child balancer did not receive resolver state.") + } +}