Skip to content

Commit

Permalink
Multi-network shared control plane workload labels update (istio#16501)
Browse files Browse the repository at this point in the history
* fix single control plane multi-network model workload labels update

* fix
  • Loading branch information
hzxuzhonghu authored and istio-testing committed Aug 31, 2019
1 parent 802bd75 commit 4560159
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pilot/pkg/model/push_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type XDSUpdater interface {
// In future it will include the 'network id' for pods in a different network, behind a zvpn gate.
// The IP is used because K8S Endpoints object associated with a Service only include the IP.
// We use Endpoints to track the membership to a service and readiness.
WorkloadUpdate(id string, labels map[string]string, annotations map[string]string)
WorkloadUpdate(shard, id string, labels map[string]string, annotations map[string]string)

// ConfigUpdate is called to notify the XDS server of config updates and request a push.
// The requests may be collapsed and throttled.
Expand Down
5 changes: 3 additions & 2 deletions pilot/pkg/proxy/envoy/v2/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ type DiscoveryServer struct {

// WorkloadsById keeps track of information about a workload, based on direct notifications
// from registry. This acts as a cache and allows detecting changes.
WorkloadsByID map[string]*Workload
// clusterID => workloadID => Workload
WorkloadsByID map[string]map[string]*Workload

pushChannel chan *model.PushRequest

Expand Down Expand Up @@ -160,7 +161,7 @@ func NewDiscoveryServer(
ConfigController: configCache,
KubeController: kubeController,
EndpointShardsByService: map[string]map[string]*EndpointShards{},
WorkloadsByID: map[string]*Workload{},
WorkloadsByID: map[string]map[string]*Workload{},
concurrentPushLimit: make(chan struct{}, features.PushThrottle),
pushChannel: make(chan *model.PushRequest, 10),
pushQueue: NewPushQueue(),
Expand Down
65 changes: 38 additions & 27 deletions pilot/pkg/proxy/envoy/v2/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,55 +430,43 @@ func (s *DiscoveryServer) edsIncremental(version string, push *model.PushContext
}

// WorkloadUpdate is called when workload labels/annotations are updated.
func (s *DiscoveryServer) WorkloadUpdate(id string, workloadLabels map[string]string, _ map[string]string) {
func (s *DiscoveryServer) WorkloadUpdate(shard, id string, workloadLabels map[string]string, _ map[string]string) {
inboundWorkloadUpdates.Increment()
s.mutex.Lock()
defer s.mutex.Unlock()
if workloadLabels == nil {
// No push needed - the Endpoints object will also be triggered.
delete(s.WorkloadsByID, id)
delete(s.WorkloadsByID[shard], id)
return
}
w, f := s.WorkloadsByID[id]
if !f {
s.WorkloadsByID[id] = &Workload{
Labels: workloadLabels,
}

adsClientsMutex.RLock()
for _, connection := range adsClients {
// if the workload has envoy proxy and connected to server,
// then do a full xDS push for this proxy;
// otherwise:
// case 1: the workload has no sidecar proxy, no need xDS push at all.
// case 2: the workload xDS connection has not been established,
// also no need to trigger a full push here.
if connection.modelNode.IPAddresses[0] == id {
// There is a possibility that the pod comes up later than endpoint.
// So no endpoints add/update events after this, we should request
// full push immediately to speed up sidecar startup.
s.pushQueue.Enqueue(connection, &model.PushRequest{Full: true, Push: s.globalPushContext(), Start: time.Now()})
break
}
}
adsClientsMutex.RUnlock()
_, f := s.WorkloadsByID[shard]
if !f {
s.WorkloadsByID[shard] = map[string]*Workload{id: {Labels: workloadLabels}}
s.sendPushRequestTo(id)
return
}

w, ok := s.WorkloadsByID[shard][id]
if !ok {
s.WorkloadsByID[shard][id] = &Workload{Labels: workloadLabels}
s.sendPushRequestTo(id)
return
}

if reflect.DeepEqual(w.Labels, workloadLabels) {
// No label change.
return
}

w.Labels = workloadLabels

// update workload labels, so that can improve perf of Proxy.SetWorkloadLabels
adsClientsMutex.RLock()
for _, connection := range adsClients {
// update node label
if connection.modelNode.IPAddresses[0] == id {
connection.modelNode.WorkloadLabels = labels.Collection{workloadLabels}
break
// set to nil, trigger re-set in SetWorkloadLabels
connection.modelNode.WorkloadLabels = nil
}
}
adsClientsMutex.RUnlock()
Expand All @@ -489,6 +477,29 @@ func (s *DiscoveryServer) WorkloadUpdate(id string, workloadLabels map[string]st

adsLog.Infof("Label change, full push %s ", id)
s.ConfigUpdate(&model.PushRequest{Full: true})

}

// sendPushRequestTo enqueues a full xDS push for every connected proxy whose
// first IP address matches the given workload IP.
//
// Note: in the single-control-plane multi-network model we cannot resolve the
// one specific xds connection for a workload, because network addresses may
// overlap across clusters. We therefore trigger pushes to ALL potential
// proxies (no early break on the first match).
func (s *DiscoveryServer) sendPushRequestTo(ip string) {
	adsClientsMutex.RLock()
	defer adsClientsMutex.RUnlock()
	for _, con := range adsClients {
		// Only proxies with an established xDS connection are considered:
		//  - a workload without a sidecar proxy needs no push at all;
		//  - a workload whose xDS connection is not yet established will
		//    pick up current state when it connects, so no push is needed.
		if con.modelNode.IPAddresses[0] != ip {
			continue
		}
		// The pod may come up later than its endpoint, in which case no
		// further endpoint add/update events will arrive; request a full
		// push immediately to speed up sidecar startup.
		s.pushQueue.Enqueue(con, &model.PushRequest{Full: true, Push: s.globalPushContext(), Start: time.Now()})
	}
}

// EDSUpdate computes destination address membership across all clusters and networks.
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/proxy/envoy/v2/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func edsUpdateInc(server *bootstrap.Server, adsc *adsc.ADSC, t *testing.T) {
testTCPEndpoints("127.0.0.4", adsc, t)

// Update the endpoint with different label - expect full
server.EnvoyXdsServer.WorkloadUpdate("127.0.0.4", map[string]string{"version": "v2"}, nil)
server.EnvoyXdsServer.WorkloadUpdate("", "127.0.0.4", map[string]string{"version": "v2"}, nil)

edsFullUpdateCheck(adsc, t)
testTCPEndpoints("127.0.0.4", adsc, t)
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/proxy/envoy/v2/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (sd *MemServiceDiscovery) ClearErrors() {

func (sd *MemServiceDiscovery) AddWorkload(ip string, labels labels.Instance) {
sd.ip2workloadLabels[ip] = &labels
sd.EDSUpdater.WorkloadUpdate(sd.ClusterID, map[string]string(labels), nil)
sd.EDSUpdater.WorkloadUpdate(sd.ClusterID, ip, map[string]string(labels), nil)
}

// AddHTTPService is a helper to add a service of type http, named 'http-main', with the
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/proxy/envoy/v2/xds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func initLocalPilotTestEnv(t *testing.T) (*bootstrap.Server, util.TearDownFunc)
server.EnvoyXdsServer.MemRegistry.SetEndpoints(edsIncSvc, "",
newEndpointWithAccount("127.0.0.1", "hello-sa", "v1"))
// Set the initial workload labels
server.EnvoyXdsServer.WorkloadUpdate("127.0.0.4", map[string]string{"version": "v1"}, nil)
server.EnvoyXdsServer.WorkloadUpdate("", "127.0.0.4", map[string]string{"version": "v1"}, nil)

// Update cache
server.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (fx *FakeXdsUpdater) SvcUpdate(shard, hostname string, ports map[string]uin
}
}

func (fx *FakeXdsUpdater) WorkloadUpdate(id string, labels map[string]string, annotations map[string]string) {
func (fx *FakeXdsUpdater) WorkloadUpdate(shard, id string, labels map[string]string, annotations map[string]string) {
select {
case fx.Events <- XdsEvent{Type: "workload", ID: id}:
default:
Expand Down
10 changes: 5 additions & 5 deletions pilot/pkg/serviceregistry/kube/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (pc *PodCache) event(obj interface{}, ev model.Event) error {
// add to cache if the pod is running or pending
pc.keys[ip] = key
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations)
pc.c.XDSUpdater.WorkloadUpdate(pc.c.ClusterID, ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations)
}
}
case model.EventUpdate:
Expand All @@ -94,7 +94,7 @@ func (pc *PodCache) event(obj interface{}, ev model.Event) error {
if pc.keys[ip] == key {
delete(pc.keys, ip)
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, nil, nil)
pc.c.XDSUpdater.WorkloadUpdate(pc.c.ClusterID, ip, nil, nil)
}
}
return nil
Expand All @@ -104,14 +104,14 @@ func (pc *PodCache) event(obj interface{}, ev model.Event) error {
// add to cache if the pod is running or pending
pc.keys[ip] = key
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations)
pc.c.XDSUpdater.WorkloadUpdate(pc.c.ClusterID, ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations)
}
default:
// delete if the pod switched to other states and is in the cache
if pc.keys[ip] == key {
delete(pc.keys, ip)
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, nil, nil)
pc.c.XDSUpdater.WorkloadUpdate(pc.c.ClusterID, ip, nil, nil)
}
}
}
Expand All @@ -120,7 +120,7 @@ func (pc *PodCache) event(obj interface{}, ev model.Event) error {
if pc.keys[ip] == key {
delete(pc.keys, ip)
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, nil, nil)
pc.c.XDSUpdater.WorkloadUpdate(pc.c.ClusterID, ip, nil, nil)
}
}
}
Expand Down

0 comments on commit 4560159

Please sign in to comment.