From cda495f5093cc917d80f25686c2a16dffcbe85dd Mon Sep 17 00:00:00 2001
From: Zhonghu Xu
Date: Mon, 2 Sep 2019 22:54:31 +0800
Subject: [PATCH] Fix data race (#16742)

* remove

* fix race

* rename

* fix comment

* ignore err, which can not happen
---
 pilot/pkg/model/context.go                    |  4 +-
 pilot/pkg/networking/core/v1alpha3/gateway.go |  4 +-
 .../networking/core/v1alpha3/gateway_test.go  |  2 +-
 .../pkg/networking/core/v1alpha3/httproute.go | 10 ++---
 .../core/v1alpha3/httproute_test.go           |  2 +-
 .../networking/core/v1alpha3/route/route.go   | 17 ++++----
 .../core/v1alpha3/route/route_test.go         | 19 +++++----
 pilot/pkg/proxy/envoy/v2/ads.go               | 33 ++++++++++-----
 pilot/pkg/proxy/envoy/v2/discovery.go         |  2 +-
 pilot/pkg/proxy/envoy/v2/eds.go               | 40 ++++++++++---------
 10 files changed, 74 insertions(+), 59 deletions(-)

diff --git a/pilot/pkg/model/context.go b/pilot/pkg/model/context.go
index b575dfe49258..c8ef47ec23cd 100644
--- a/pilot/pkg/model/context.go
+++ b/pilot/pkg/model/context.go
@@ -266,7 +266,9 @@ func (node *Proxy) SetServiceInstances(env *Environment) error {
 	return nil
 }
 
-func (node *Proxy) SetWorkloadLabels(env *Environment) error {
+// SetWorkloadLabels will reset the proxy.WorkloadLabels if `force` = true,
+// otherwise only set it when it is nil.
+func (node *Proxy) SetWorkloadLabels(env *Environment, force bool) error {
 	// The WorkloadLabels is already parsed from Node metadata["LABELS"]
 	// Or updated in DiscoveryServer.WorkloadUpdate.
 	if node.WorkloadLabels != nil {
diff --git a/pilot/pkg/networking/core/v1alpha3/gateway.go b/pilot/pkg/networking/core/v1alpha3/gateway.go
index 6623f6749990..8ace2b2a73b2 100644
--- a/pilot/pkg/networking/core/v1alpha3/gateway.go
+++ b/pilot/pkg/networking/core/v1alpha3/gateway.go
@@ -207,7 +207,7 @@ func (configgen *ConfigGeneratorImpl) buildGatewayListeners(
 }
 
 func (configgen *ConfigGeneratorImpl) buildGatewayHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext,
-	_ []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {
+	routeName string) *xdsapi.RouteConfiguration {
 
 	services := push.Services(node)
 
@@ -252,7 +252,7 @@ func (configgen *ConfigGeneratorImpl) buildGatewayHTTPRouteConfig(env *model.Env
 			continue
 		}
 
-		routes, err := istio_route.BuildHTTPRoutesForVirtualService(node, push, virtualService, nameToServiceMap, port, nil, map[string]bool{gatewayName: true})
+		routes, err := istio_route.BuildHTTPRoutesForVirtualService(node, push, virtualService, nameToServiceMap, port, map[string]bool{gatewayName: true})
 		if err != nil {
 			log.Debugf("%s omitting routes for service %v due to error: %v", node.ID, virtualService, err)
 			continue
diff --git a/pilot/pkg/networking/core/v1alpha3/gateway_test.go b/pilot/pkg/networking/core/v1alpha3/gateway_test.go
index 4d75eb56f944..b918366bc605 100644
--- a/pilot/pkg/networking/core/v1alpha3/gateway_test.go
+++ b/pilot/pkg/networking/core/v1alpha3/gateway_test.go
@@ -946,7 +946,7 @@ func TestGatewayHTTPRouteConfig(t *testing.T) {
 			configgen := NewConfigGenerator([]plugin.Plugin{p})
 			env := buildEnv(t, tt.gateways, tt.virtualServices)
 			proxy13Gateway.SetGatewaysForProxy(env.PushContext)
-			route := configgen.buildGatewayHTTPRouteConfig(&env, &proxy13Gateway, env.PushContext, proxyInstances, tt.routeName)
+			route := configgen.buildGatewayHTTPRouteConfig(&env, &proxy13Gateway, env.PushContext, tt.routeName)
 			if route == nil {
 				t.Fatal("got an empty route configuration")
 			}
diff --git a/pilot/pkg/networking/core/v1alpha3/httproute.go b/pilot/pkg/networking/core/v1alpha3/httproute.go
index 1f5d12a17e68..aa84fe2421f9 100644
--- a/pilot/pkg/networking/core/v1alpha3/httproute.go
+++ b/pilot/pkg/networking/core/v1alpha3/httproute.go
@@ -40,18 +40,16 @@ import (
 // BuildHTTPRoutes produces a list of routes for the proxy
 func (configgen *ConfigGeneratorImpl) BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext,
 	routeName string) *xdsapi.RouteConfiguration {
-	// TODO: Move all this out
-	proxyInstances := node.ServiceInstances
 	var rc *xdsapi.RouteConfiguration
 	switch node.Type {
 	case model.SidecarProxy:
-		rc = configgen.buildSidecarOutboundHTTPRouteConfig(env, node, push, proxyInstances, routeName)
+		rc = configgen.buildSidecarOutboundHTTPRouteConfig(env, node, push, routeName)
 		if rc != nil {
 			rc = envoyfilter.ApplyRouteConfigurationPatches(networking.EnvoyFilter_SIDECAR_OUTBOUND, node, push, rc)
 		}
 		return rc
 	case model.Router:
-		rc = configgen.buildGatewayHTTPRouteConfig(env, node, push, proxyInstances, routeName)
+		rc = configgen.buildGatewayHTTPRouteConfig(env, node, push, routeName)
 		if rc != nil {
 			rc = envoyfilter.ApplyRouteConfigurationPatches(networking.EnvoyFilter_GATEWAY, node, push, rc)
 		}
@@ -101,7 +99,7 @@ func (configgen *ConfigGeneratorImpl) buildSidecarInboundHTTPRouteConfig(env *mo
 // buildSidecarOutboundHTTPRouteConfig builds an outbound HTTP Route for sidecar.
 // Based on port, will determine all virtual hosts that listen on the port.
 func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext,
-	_ []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {
+	routeName string) *xdsapi.RouteConfiguration {
 
 	listenerPort := 0
 	var err error
@@ -159,7 +157,7 @@ func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *m
 	// Get list of virtual services bound to the mesh gateway
 	virtualHostWrappers := istio_route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, nameToServiceMap,
-		node.WorkloadLabels, virtualServices, listenerPort)
+		virtualServices, listenerPort)
 
 	vHostPortMap := make(map[int][]*route.VirtualHost)
 	uniques := make(map[string]struct{})
 	for _, virtualHostWrapper := range virtualHostWrappers {
diff --git a/pilot/pkg/networking/core/v1alpha3/httproute_test.go b/pilot/pkg/networking/core/v1alpha3/httproute_test.go
index b9daa5f6d8cb..079645d9a848 100644
--- a/pilot/pkg/networking/core/v1alpha3/httproute_test.go
+++ b/pilot/pkg/networking/core/v1alpha3/httproute_test.go
@@ -578,7 +578,7 @@ func testSidecarRDSVHosts(t *testing.T, services []*model.Service,
 		_ = os.Setenv("PILOT_ENABLE_FALLTHROUGH_ROUTE", "1")
 	}
 
-	route := configgen.buildSidecarOutboundHTTPRouteConfig(&env, &proxy, env.PushContext, proxyInstances, routeName)
+	route := configgen.buildSidecarOutboundHTTPRouteConfig(&env, &proxy, env.PushContext, routeName)
 	if route == nil {
 		t.Fatalf("got nil route for %s", routeName)
 	}
diff --git a/pilot/pkg/networking/core/v1alpha3/route/route.go b/pilot/pkg/networking/core/v1alpha3/route/route.go
index 38a201f9f82b..5f369f61471e 100644
--- a/pilot/pkg/networking/core/v1alpha3/route/route.go
+++ b/pilot/pkg/networking/core/v1alpha3/route/route.go
@@ -80,14 +80,14 @@ func BuildSidecarVirtualHostsFromConfigAndRegistry(
 	node *model.Proxy,
 	push *model.PushContext,
 	serviceRegistry map[host.Name]*model.Service,
-	proxyLabels labels.Collection,
-	virtualServices []model.Config, listenPort int) []VirtualHostWrapper {
+	virtualServices []model.Config,
+	listenPort int) []VirtualHostWrapper {
 	out := make([]VirtualHostWrapper, 0)
 
 	// translate all virtual service configs into virtual hosts
 	for _, virtualService := range virtualServices {
-		wrappers := buildSidecarVirtualHostsForVirtualService(node, push, virtualService, serviceRegistry, proxyLabels, listenPort)
+		wrappers := buildSidecarVirtualHostsForVirtualService(node, push, virtualService, serviceRegistry, listenPort)
 		if len(wrappers) == 0 {
 			// If none of the routes matched by source (i.e. proxyLabels), then discard this entire virtual service
 			continue
 		}
@@ -168,7 +168,6 @@ func buildSidecarVirtualHostsForVirtualService(
 	push *model.PushContext,
 	virtualService model.Config,
 	serviceRegistry map[host.Name]*model.Service,
-	proxyLabels labels.Collection,
 	listenPort int) []VirtualHostWrapper {
 
 	hosts, servicesInVirtualService := separateVSHostsAndServices(virtualService, serviceRegistry)
@@ -198,7 +197,7 @@ func buildSidecarVirtualHostsForVirtualService(
 	meshGateway := map[string]bool{constants.IstioMeshGateway: true}
 	out := make([]VirtualHostWrapper, 0, len(serviceByPort))
 	for port, portServices := range serviceByPort {
-		routes, err := BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, listenPort, proxyLabels, meshGateway)
+		routes, err := BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, listenPort, meshGateway)
 		if err != nil || len(routes) == 0 {
 			continue
 		}
@@ -253,7 +252,6 @@ func BuildHTTPRoutesForVirtualService(
 	virtualService model.Config,
 	serviceRegistry map[host.Name]*model.Service,
 	listenPort int,
-	proxyLabels labels.Collection,
 	gatewayNames map[string]bool) ([]*route.Route, error) {
 
 	vs, ok := virtualService.Spec.(*networking.VirtualService)
@@ -265,13 +263,13 @@ func BuildHTTPRoutesForVirtualService(
 allroutes:
 	for _, http := range vs.Http {
 		if len(http.Match) == 0 {
-			if r := translateRoute(push, node, http, nil, listenPort, virtualService, serviceRegistry, proxyLabels, gatewayNames); r != nil {
+			if r := translateRoute(push, node, http, nil, listenPort, virtualService, serviceRegistry, gatewayNames); r != nil {
 				out = append(out, r)
 			}
 			break allroutes // we have a rule with catch all match prefix: /. Other rules are of no use
 		} else {
 			for _, match := range http.Match {
-				if r := translateRoute(push, node, http, match, listenPort, virtualService, serviceRegistry, proxyLabels, gatewayNames); r != nil {
+				if r := translateRoute(push, node, http, match, listenPort, virtualService, serviceRegistry, gatewayNames); r != nil {
 					out = append(out, r)
 					rType, _ := getEnvoyRouteTypeAndVal(r)
 					if rType == envoyCatchAll {
@@ -315,14 +313,13 @@ func translateRoute(push *model.PushContext, node *model.Proxy, in *networking.H
 	match *networking.HTTPMatchRequest, port int,
 	virtualService model.Config,
 	serviceRegistry map[host.Name]*model.Service,
-	proxyLabels labels.Collection,
 	gatewayNames map[string]bool) *route.Route {
 
 	// When building routes, its okay if the target cluster cannot be
 	// resolved Traffic to such clusters will blackhole.
 
 	// Match by source labels/gateway names inside the match condition
-	if !sourceMatchHTTP(match, proxyLabels, gatewayNames) {
+	if !sourceMatchHTTP(match, node.WorkloadLabels, gatewayNames) {
 		return nil
 	}
 
diff --git a/pilot/pkg/networking/core/v1alpha3/route/route_test.go b/pilot/pkg/networking/core/v1alpha3/route/route_test.go
index d5685c913d94..5b44fc4abfd8 100644
--- a/pilot/pkg/networking/core/v1alpha3/route/route_test.go
+++ b/pilot/pkg/networking/core/v1alpha3/route/route_test.go
@@ -27,7 +27,6 @@ import (
 	"istio.io/istio/pilot/pkg/model"
 	"istio.io/istio/pilot/pkg/networking/core/v1alpha3/route"
 	"istio.io/istio/pkg/config/host"
-	"istio.io/istio/pkg/config/labels"
 	"istio.io/istio/pkg/config/mesh"
 	"istio.io/istio/pkg/config/protocol"
 	"istio.io/istio/pkg/config/schemas"
@@ -62,7 +61,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 	t.Run("for virtual service", func(t *testing.T) {
 		g := gomega.NewGomegaWithT(t)
 
-		routes, err := route.BuildHTTPRoutesForVirtualService(node, nil, virtualServicePlain, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
+		routes, err := route.BuildHTTPRoutesForVirtualService(node, nil, virtualServicePlain, serviceRegistry, 8080, gatewayNames)
 		g.Expect(err).NotTo(gomega.HaveOccurred())
 		g.Expect(len(routes)).To(gomega.Equal(1))
 	})
@@ -104,7 +103,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 			},
 		})
 
-		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
+		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, gatewayNames)
 		g.Expect(err).NotTo(gomega.HaveOccurred())
 		g.Expect(len(routes)).To(gomega.Equal(1))
 
@@ -151,7 +150,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 			},
 		})
 
-		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
+		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, gatewayNames)
 		g.Expect(err).NotTo(gomega.HaveOccurred())
 		g.Expect(len(routes)).To(gomega.Equal(1))
 
@@ -194,7 +193,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 				Spec: portLevelDestinationRuleWithSubsetPolicy,
 			}})
 
-		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
+		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, gatewayNames)
 		g.Expect(err).NotTo(gomega.HaveOccurred())
 		g.Expect(len(routes)).To(gomega.Equal(1))
 
@@ -242,7 +241,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 		push.SetDestinationRules([]model.Config{
 			cnfg})
 
-		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
+		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, gatewayNames)
 		g.Expect(err).NotTo(gomega.HaveOccurred())
 		g.Expect(len(routes)).To(gomega.Equal(1))
 
@@ -278,7 +277,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 			}})
 
 		gatewayNames := map[string]bool{"some-gateway": true}
-		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
+		routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, gatewayNames)
 		g.Expect(err).NotTo(gomega.HaveOccurred())
 		g.Expect(len(routes)).To(gomega.Equal(1))
 
@@ -297,7 +296,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 		g := gomega.NewGomegaWithT(t)
 
 		routes, err := route.BuildHTTPRoutesForVirtualService(node, nil, virtualServiceWithRedirect,
-			serviceRegistry, 8080, labels.Collection{}, gatewayNames)
+			serviceRegistry, 8080, gatewayNames)
 		g.Expect(err).NotTo(gomega.HaveOccurred())
 		g.Expect(len(routes)).To(gomega.Equal(1))
 
@@ -322,7 +321,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 			},
 			Spec: networkingDestinationRule,
 		}})
-		vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, labels.Collection{}, []model.Config{}, 8080)
+		vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, []model.Config{}, 8080)
 		g.Expect(vhosts[0].Routes[0].Action.(*envoyroute.Route_Route).Route.HashPolicy).NotTo(gomega.BeNil())
 	})
 	t.Run("for no virtualservice but has destinationrule with portLevel consistentHash loadbalancer", func(t *testing.T) {
@@ -343,7 +342,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
 			Spec: networkingDestinationRuleWithPortLevelTrafficPolicy,
 		}})
 
-		vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, labels.Collection{}, []model.Config{}, 8080)
+		vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, []model.Config{}, 8080)
 
 		hashPolicy := &envoyroute.RouteAction_HashPolicy{
 			PolicySpecifier: &envoyroute.RouteAction_HashPolicy_Cookie_{
diff --git a/pilot/pkg/proxy/envoy/v2/ads.go b/pilot/pkg/proxy/envoy/v2/ads.go
index ab4be44ae420..56f14e492102 100644
--- a/pilot/pkg/proxy/envoy/v2/ads.go
+++ b/pilot/pkg/proxy/envoy/v2/ads.go
@@ -84,6 +84,10 @@ type XdsConnection struct {
 	// same info can be sent to all clients, without recomputing.
 	pushChannel chan *XdsEvent
 
+	// Sending on this channel results in a reset on proxy attributes.
+	// Generally it comes before a XdsEvent.
+	updateChannel chan *UpdateEvent
+
 	// TODO: migrate other fields as needed from model.Proxy and replace it
 	//HttpConnectionManagers map[string]*http_conn.HttpConnectionManager
 
@@ -187,15 +191,22 @@ type XdsEvent struct {
 	done func()
 }
 
+// UpdateEvent represents a update request for the proxy.
+// This will trigger proxy specific attributes reset before each push.
+type UpdateEvent struct {
+	workloadLabel bool
+}
+
 func newXdsConnection(peerAddr string, stream DiscoveryStream) *XdsConnection {
 	return &XdsConnection{
-		pushChannel:  make(chan *XdsEvent),
-		PeerAddr:     peerAddr,
-		Clusters:     []string{},
-		Connect:      time.Now(),
-		stream:       stream,
-		LDSListeners: []*xdsapi.Listener{},
-		RouteConfigs: map[string]*xdsapi.RouteConfiguration{},
+		pushChannel:   make(chan *XdsEvent),
+		updateChannel: make(chan *UpdateEvent, 1),
+		PeerAddr:      peerAddr,
+		Clusters:      []string{},
+		Connect:       time.Now(),
+		stream:        stream,
+		LDSListeners:  []*xdsapi.Listener{},
+		RouteConfigs:  map[string]*xdsapi.RouteConfiguration{},
 	}
 }
 
@@ -444,6 +455,10 @@ func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscove
 			} else {
 				con.mu.Unlock()
 			}
+		case updateEv := <-con.updateChannel:
+			if updateEv.workloadLabel && con.modelNode != nil {
+				_ = con.modelNode.SetWorkloadLabels(s.Env, true)
+			}
 		case pushEv := <-con.pushChannel:
 			// It is called when config changes.
 			// This is not optimized yet - we should detect what changed based on event and only
@@ -500,7 +515,7 @@ func (s *DiscoveryServer) initConnectionNode(discReq *xdsapi.DiscoveryRequest, c
 		nt.Locality = discReq.Node.Locality
 	}
 
-	if err := nt.SetWorkloadLabels(s.Env); err != nil {
+	if err := nt.SetWorkloadLabels(s.Env, false); err != nil {
 		return err
 	}
 
@@ -544,7 +559,7 @@ func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) e
 		return nil
 	}
 
-	if err := con.modelNode.SetWorkloadLabels(s.Env); err != nil {
+	if err := con.modelNode.SetWorkloadLabels(s.Env, false); err != nil {
 		return err
 	}
 
diff --git a/pilot/pkg/proxy/envoy/v2/discovery.go b/pilot/pkg/proxy/envoy/v2/discovery.go
index 4d00677f46a4..0071baf2793a 100644
--- a/pilot/pkg/proxy/envoy/v2/discovery.go
+++ b/pilot/pkg/proxy/envoy/v2/discovery.go
@@ -109,7 +109,7 @@ type DiscoveryServer struct {
 	// WorkloadsById keeps track of information about a workload, based on direct notifications
 	// from registry. This acts as a cache and allows detecting changes.
-	// clusterID => workloadID => Workload
+	// clusterID => workload ip => Workload
 	WorkloadsByID map[string]map[string]*Workload
 
 	pushChannel chan *model.PushRequest
 
diff --git a/pilot/pkg/proxy/envoy/v2/eds.go b/pilot/pkg/proxy/envoy/v2/eds.go
index 5abe79998638..22ceed840573 100644
--- a/pilot/pkg/proxy/envoy/v2/eds.go
+++ b/pilot/pkg/proxy/envoy/v2/eds.go
@@ -430,27 +430,27 @@ func (s *DiscoveryServer) edsIncremental(version string, push *model.PushContext
 }
 
 // WorkloadUpdate is called when workload labels/annotations are updated.
-func (s *DiscoveryServer) WorkloadUpdate(shard, id string, workloadLabels map[string]string, _ map[string]string) {
+func (s *DiscoveryServer) WorkloadUpdate(clusterID, ip string, workloadLabels map[string]string, _ map[string]string) {
 	inboundWorkloadUpdates.Increment()
 
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
 	if workloadLabels == nil {
 		// No push needed - the Endpoints object will also be triggered.
-		delete(s.WorkloadsByID[shard], id)
+		delete(s.WorkloadsByID[clusterID], ip)
 		return
 	}
-	_, f := s.WorkloadsByID[shard]
+	_, f := s.WorkloadsByID[clusterID]
 	if !f {
-		s.WorkloadsByID[shard] = map[string]*Workload{id: {Labels: workloadLabels}}
-		s.sendPushRequestTo(id)
+		s.WorkloadsByID[clusterID] = map[string]*Workload{ip: {Labels: workloadLabels}}
+		s.sendPushRequestTo(ip)
 		return
 	}
-	w, ok := s.WorkloadsByID[shard][id]
+	w, ok := s.WorkloadsByID[clusterID][ip]
 	if !ok {
-		s.WorkloadsByID[shard][id] = &Workload{Labels: workloadLabels}
-		s.sendPushRequestTo(id)
+		s.WorkloadsByID[clusterID][ip] = &Workload{Labels: workloadLabels}
+		s.sendPushRequestTo(ip)
 		return
 	}
 
@@ -464,9 +464,13 @@ func (s *DiscoveryServer) WorkloadUpdate(shard, id string, workloadLabels map[st
 	adsClientsMutex.RLock()
 	for _, connection := range adsClients {
 		// update node label
-		if connection.modelNode.IPAddresses[0] == id {
-			// set to nil, trigger re-set in SetWorkloadLabels
-			connection.modelNode.WorkloadLabels = nil
+		if connection.modelNode.IPAddresses[0] == ip {
+			select {
+			// trigger re-set in SetWorkloadLabels
+			case connection.updateChannel <- &UpdateEvent{workloadLabel: true}:
+			default:
+				adsLog.Infof("A workload %s label update request is ongoing", ip)
+			}
 		}
 	}
 	adsClientsMutex.RUnlock()
@@ -475,7 +479,7 @@ func (s *DiscoveryServer) WorkloadUpdate(shard, id string, workloadLabels map[st
 
 	// TODO: we can do a push for the affected workload only, but we need to confirm
 	// no other workload can be affected. Safer option is to fallback to full push.
-	adsLog.Infof("Label change, full push %s ", id)
+	adsLog.Infof("Label change, full push %s", ip)
 	s.ConfigUpdate(&model.PushRequest{Full: true})
 }
 
@@ -507,15 +511,15 @@ func (s *DiscoveryServer) sendPushRequestTo(ip string) {
 // It replaces InstancesByPort in model - instead of iterating over all endpoints it uses
 // the hostname-keyed map. And it avoids the conversion from Endpoint to ServiceEntry to envoy
 // on each step: instead the conversion happens once, when an endpoint is first discovered.
-func (s *DiscoveryServer) EDSUpdate(shard, serviceName string, namespace string, istioEndpoints []*model.IstioEndpoint) error {
+func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string, istioEndpoints []*model.IstioEndpoint) error {
 	inboundEDSUpdates.Increment()
-	s.edsUpdate(shard, serviceName, namespace, istioEndpoints, false)
+	s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints, false)
 	return nil
 }
 
-// edsUpdate updates edsUpdates by shard, serviceName, IstioEndpoints,
+// edsUpdate updates edsUpdates by clusterID, serviceName, IstioEndpoints,
 // and requests a full/eds push.
-func (s *DiscoveryServer) edsUpdate(shard, serviceName string, namespace string,
+func (s *DiscoveryServer) edsUpdate(clusterID, serviceName string, namespace string,
 	istioEndpoints []*model.IstioEndpoint, internal bool) {
 	// edsShardUpdate replaces a subset (shard) of endpoints, as result of an incremental
 	// update. The endpoint updates may be grouped by K8S clusters, other service registries
@@ -530,7 +534,7 @@ func (s *DiscoveryServer) edsUpdate(shard, serviceName string, namespace string,
 	if len(istioEndpoints) == 0 {
 		if s.EndpointShardsByService[serviceName][namespace] != nil {
 			s.EndpointShardsByService[serviceName][namespace].mutex.Lock()
-			delete(s.EndpointShardsByService[serviceName][namespace].Shards, shard)
+			delete(s.EndpointShardsByService[serviceName][namespace].Shards, clusterID)
 			svcShards := len(s.EndpointShardsByService[serviceName][namespace].Shards)
 			s.EndpointShardsByService[serviceName][namespace].mutex.Unlock()
 			if svcShards == 0 {
@@ -582,7 +586,7 @@ func (s *DiscoveryServer) edsUpdate(shard, serviceName string, namespace string,
 		}
 	}
 	ep.mutex.Lock()
-	ep.Shards[shard] = istioEndpoints
+	ep.Shards[clusterID] = istioEndpoints
 	ep.mutex.Unlock()
 
 	// for internal update: this called by DiscoveryServer.Push --> updateServiceShards,