Skip to content

Commit

Permalink
Fix data race (istio#16742)
Browse files Browse the repository at this point in the history
* remove

* fix race

* rename

* fix comment

* ignore err, which can not happen
  • Loading branch information
hzxuzhonghu authored and istio-testing committed Sep 2, 2019
1 parent d1fd91b commit cda495f
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 59 deletions.
4 changes: 3 additions & 1 deletion pilot/pkg/model/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ func (node *Proxy) SetServiceInstances(env *Environment) error {
return nil
}

func (node *Proxy) SetWorkloadLabels(env *Environment) error {
// SetWorkloadLabels will reset the proxy.WorkloadLabels if `force` = true,
// otherwise only set it when it is nil.
func (node *Proxy) SetWorkloadLabels(env *Environment, force bool) error {
// The WorkloadLabels is already parsed from Node metadata["LABELS"]
// Or updated in DiscoveryServer.WorkloadUpdate.
if node.WorkloadLabels != nil {
Expand Down
4 changes: 2 additions & 2 deletions pilot/pkg/networking/core/v1alpha3/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (configgen *ConfigGeneratorImpl) buildGatewayListeners(
}

func (configgen *ConfigGeneratorImpl) buildGatewayHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext,
_ []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {
routeName string) *xdsapi.RouteConfiguration {

services := push.Services(node)

Expand Down Expand Up @@ -252,7 +252,7 @@ func (configgen *ConfigGeneratorImpl) buildGatewayHTTPRouteConfig(env *model.Env
continue
}

routes, err := istio_route.BuildHTTPRoutesForVirtualService(node, push, virtualService, nameToServiceMap, port, nil, map[string]bool{gatewayName: true})
routes, err := istio_route.BuildHTTPRoutesForVirtualService(node, push, virtualService, nameToServiceMap, port, map[string]bool{gatewayName: true})
if err != nil {
log.Debugf("%s omitting routes for service %v due to error: %v", node.ID, virtualService, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/networking/core/v1alpha3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func TestGatewayHTTPRouteConfig(t *testing.T) {
configgen := NewConfigGenerator([]plugin.Plugin{p})
env := buildEnv(t, tt.gateways, tt.virtualServices)
proxy13Gateway.SetGatewaysForProxy(env.PushContext)
route := configgen.buildGatewayHTTPRouteConfig(&env, &proxy13Gateway, env.PushContext, proxyInstances, tt.routeName)
route := configgen.buildGatewayHTTPRouteConfig(&env, &proxy13Gateway, env.PushContext, tt.routeName)
if route == nil {
t.Fatal("got an empty route configuration")
}
Expand Down
10 changes: 4 additions & 6 deletions pilot/pkg/networking/core/v1alpha3/httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,16 @@ import (
// BuildHTTPRoutes produces a list of routes for the proxy
func (configgen *ConfigGeneratorImpl) BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext,
routeName string) *xdsapi.RouteConfiguration {
// TODO: Move all this out
proxyInstances := node.ServiceInstances
var rc *xdsapi.RouteConfiguration
switch node.Type {
case model.SidecarProxy:
rc = configgen.buildSidecarOutboundHTTPRouteConfig(env, node, push, proxyInstances, routeName)
rc = configgen.buildSidecarOutboundHTTPRouteConfig(env, node, push, routeName)
if rc != nil {
rc = envoyfilter.ApplyRouteConfigurationPatches(networking.EnvoyFilter_SIDECAR_OUTBOUND, node, push, rc)
}
return rc
case model.Router:
rc = configgen.buildGatewayHTTPRouteConfig(env, node, push, proxyInstances, routeName)
rc = configgen.buildGatewayHTTPRouteConfig(env, node, push, routeName)
if rc != nil {
rc = envoyfilter.ApplyRouteConfigurationPatches(networking.EnvoyFilter_GATEWAY, node, push, rc)
}
Expand Down Expand Up @@ -101,7 +99,7 @@ func (configgen *ConfigGeneratorImpl) buildSidecarInboundHTTPRouteConfig(env *mo
// buildSidecarOutboundHTTPRouteConfig builds an outbound HTTP Route for sidecar.
// Based on port, will determine all virtual hosts that listen on the port.
func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext,
_ []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {
routeName string) *xdsapi.RouteConfiguration {

listenerPort := 0
var err error
Expand Down Expand Up @@ -159,7 +157,7 @@ func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *m

// Get list of virtual services bound to the mesh gateway
virtualHostWrappers := istio_route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, nameToServiceMap,
node.WorkloadLabels, virtualServices, listenerPort)
virtualServices, listenerPort)
vHostPortMap := make(map[int][]*route.VirtualHost)
uniques := make(map[string]struct{})
for _, virtualHostWrapper := range virtualHostWrappers {
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/networking/core/v1alpha3/httproute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func testSidecarRDSVHosts(t *testing.T, services []*model.Service,
_ = os.Setenv("PILOT_ENABLE_FALLTHROUGH_ROUTE", "1")
}

route := configgen.buildSidecarOutboundHTTPRouteConfig(&env, &proxy, env.PushContext, proxyInstances, routeName)
route := configgen.buildSidecarOutboundHTTPRouteConfig(&env, &proxy, env.PushContext, routeName)
if route == nil {
t.Fatalf("got nil route for %s", routeName)
}
Expand Down
17 changes: 7 additions & 10 deletions pilot/pkg/networking/core/v1alpha3/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ func BuildSidecarVirtualHostsFromConfigAndRegistry(
node *model.Proxy,
push *model.PushContext,
serviceRegistry map[host.Name]*model.Service,
proxyLabels labels.Collection,
virtualServices []model.Config, listenPort int) []VirtualHostWrapper {
virtualServices []model.Config,
listenPort int) []VirtualHostWrapper {

out := make([]VirtualHostWrapper, 0)

// translate all virtual service configs into virtual hosts
for _, virtualService := range virtualServices {
wrappers := buildSidecarVirtualHostsForVirtualService(node, push, virtualService, serviceRegistry, proxyLabels, listenPort)
wrappers := buildSidecarVirtualHostsForVirtualService(node, push, virtualService, serviceRegistry, listenPort)
if len(wrappers) == 0 {
// If none of the routes matched by source (i.e. proxyLabels), then discard this entire virtual service
continue
Expand Down Expand Up @@ -168,7 +168,6 @@ func buildSidecarVirtualHostsForVirtualService(
push *model.PushContext,
virtualService model.Config,
serviceRegistry map[host.Name]*model.Service,
proxyLabels labels.Collection,
listenPort int) []VirtualHostWrapper {
hosts, servicesInVirtualService := separateVSHostsAndServices(virtualService, serviceRegistry)

Expand Down Expand Up @@ -198,7 +197,7 @@ func buildSidecarVirtualHostsForVirtualService(
meshGateway := map[string]bool{constants.IstioMeshGateway: true}
out := make([]VirtualHostWrapper, 0, len(serviceByPort))
for port, portServices := range serviceByPort {
routes, err := BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, listenPort, proxyLabels, meshGateway)
routes, err := BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, listenPort, meshGateway)
if err != nil || len(routes) == 0 {
continue
}
Expand Down Expand Up @@ -253,7 +252,6 @@ func BuildHTTPRoutesForVirtualService(
virtualService model.Config,
serviceRegistry map[host.Name]*model.Service,
listenPort int,
proxyLabels labels.Collection,
gatewayNames map[string]bool) ([]*route.Route, error) {

vs, ok := virtualService.Spec.(*networking.VirtualService)
Expand All @@ -265,13 +263,13 @@ func BuildHTTPRoutesForVirtualService(
allroutes:
for _, http := range vs.Http {
if len(http.Match) == 0 {
if r := translateRoute(push, node, http, nil, listenPort, virtualService, serviceRegistry, proxyLabels, gatewayNames); r != nil {
if r := translateRoute(push, node, http, nil, listenPort, virtualService, serviceRegistry, gatewayNames); r != nil {
out = append(out, r)
}
break allroutes // we have a rule with catch all match prefix: /. Other rules are of no use
} else {
for _, match := range http.Match {
if r := translateRoute(push, node, http, match, listenPort, virtualService, serviceRegistry, proxyLabels, gatewayNames); r != nil {
if r := translateRoute(push, node, http, match, listenPort, virtualService, serviceRegistry, gatewayNames); r != nil {
out = append(out, r)
rType, _ := getEnvoyRouteTypeAndVal(r)
if rType == envoyCatchAll {
Expand Down Expand Up @@ -315,14 +313,13 @@ func translateRoute(push *model.PushContext, node *model.Proxy, in *networking.H
match *networking.HTTPMatchRequest, port int,
virtualService model.Config,
serviceRegistry map[host.Name]*model.Service,
proxyLabels labels.Collection,
gatewayNames map[string]bool) *route.Route {

// When building routes, its okay if the target cluster cannot be
// resolved Traffic to such clusters will blackhole.

// Match by source labels/gateway names inside the match condition
if !sourceMatchHTTP(match, proxyLabels, gatewayNames) {
if !sourceMatchHTTP(match, node.WorkloadLabels, gatewayNames) {
return nil
}

Expand Down
19 changes: 9 additions & 10 deletions pilot/pkg/networking/core/v1alpha3/route/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/networking/core/v1alpha3/route"
"istio.io/istio/pkg/config/host"
"istio.io/istio/pkg/config/labels"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/protocol"
"istio.io/istio/pkg/config/schemas"
Expand Down Expand Up @@ -62,7 +61,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
t.Run("for virtual service", func(t *testing.T) {
g := gomega.NewGomegaWithT(t)

routes, err := route.BuildHTTPRoutesForVirtualService(node, nil, virtualServicePlain, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
routes, err := route.BuildHTTPRoutesForVirtualService(node, nil, virtualServicePlain, serviceRegistry, 8080, gatewayNames)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(len(routes)).To(gomega.Equal(1))
})
Expand Down Expand Up @@ -104,7 +103,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
},
})

routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, gatewayNames)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(len(routes)).To(gomega.Equal(1))

Expand Down Expand Up @@ -151,7 +150,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
},
})

routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, gatewayNames)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(len(routes)).To(gomega.Equal(1))

Expand Down Expand Up @@ -194,7 +193,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
Spec: portLevelDestinationRuleWithSubsetPolicy,
}})

routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, gatewayNames)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(len(routes)).To(gomega.Equal(1))

Expand Down Expand Up @@ -242,7 +241,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
push.SetDestinationRules([]model.Config{
cnfg})

routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualService, serviceRegistry, 8080, gatewayNames)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(len(routes)).To(gomega.Equal(1))

Expand Down Expand Up @@ -278,7 +277,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
}})

gatewayNames := map[string]bool{"some-gateway": true}
routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, labels.Collection{}, gatewayNames)
routes, err := route.BuildHTTPRoutesForVirtualService(node, push, virtualServicePlain, serviceRegistry, 8080, gatewayNames)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(len(routes)).To(gomega.Equal(1))

Expand All @@ -297,7 +296,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
g := gomega.NewGomegaWithT(t)

routes, err := route.BuildHTTPRoutesForVirtualService(node, nil, virtualServiceWithRedirect,
serviceRegistry, 8080, labels.Collection{}, gatewayNames)
serviceRegistry, 8080, gatewayNames)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(len(routes)).To(gomega.Equal(1))

Expand All @@ -322,7 +321,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
},
Spec: networkingDestinationRule,
}})
vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, labels.Collection{}, []model.Config{}, 8080)
vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, []model.Config{}, 8080)
g.Expect(vhosts[0].Routes[0].Action.(*envoyroute.Route_Route).Route.HashPolicy).NotTo(gomega.BeNil())
})
t.Run("for no virtualservice but has destinationrule with portLevel consistentHash loadbalancer", func(t *testing.T) {
Expand All @@ -343,7 +342,7 @@ func TestBuildHTTPRoutes(t *testing.T) {
Spec: networkingDestinationRuleWithPortLevelTrafficPolicy,
}})

vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, labels.Collection{}, []model.Config{}, 8080)
vhosts := route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, serviceRegistry, []model.Config{}, 8080)

hashPolicy := &envoyroute.RouteAction_HashPolicy{
PolicySpecifier: &envoyroute.RouteAction_HashPolicy_Cookie_{
Expand Down
33 changes: 24 additions & 9 deletions pilot/pkg/proxy/envoy/v2/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type XdsConnection struct {
// same info can be sent to all clients, without recomputing.
pushChannel chan *XdsEvent

// Sending on this channel results in a reset on proxy attributes.
// Generally it comes before a XdsEvent.
updateChannel chan *UpdateEvent

// TODO: migrate other fields as needed from model.Proxy and replace it

//HttpConnectionManagers map[string]*http_conn.HttpConnectionManager
Expand Down Expand Up @@ -187,15 +191,22 @@ type XdsEvent struct {
done func()
}

// UpdateEvent represents a update request for the proxy.
// This will trigger proxy specific attributes reset before each push.
type UpdateEvent struct {
workloadLabel bool
}

func newXdsConnection(peerAddr string, stream DiscoveryStream) *XdsConnection {
return &XdsConnection{
pushChannel: make(chan *XdsEvent),
PeerAddr: peerAddr,
Clusters: []string{},
Connect: time.Now(),
stream: stream,
LDSListeners: []*xdsapi.Listener{},
RouteConfigs: map[string]*xdsapi.RouteConfiguration{},
pushChannel: make(chan *XdsEvent),
updateChannel: make(chan *UpdateEvent, 1),
PeerAddr: peerAddr,
Clusters: []string{},
Connect: time.Now(),
stream: stream,
LDSListeners: []*xdsapi.Listener{},
RouteConfigs: map[string]*xdsapi.RouteConfiguration{},
}
}

Expand Down Expand Up @@ -444,6 +455,10 @@ func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscove
} else {
con.mu.Unlock()
}
case updateEv := <-con.updateChannel:
if updateEv.workloadLabel && con.modelNode != nil {
_ = con.modelNode.SetWorkloadLabels(s.Env, true)
}
case pushEv := <-con.pushChannel:
// It is called when config changes.
// This is not optimized yet - we should detect what changed based on event and only
Expand Down Expand Up @@ -500,7 +515,7 @@ func (s *DiscoveryServer) initConnectionNode(discReq *xdsapi.DiscoveryRequest, c
nt.Locality = discReq.Node.Locality
}

if err := nt.SetWorkloadLabels(s.Env); err != nil {
if err := nt.SetWorkloadLabels(s.Env, false); err != nil {
return err
}

Expand Down Expand Up @@ -544,7 +559,7 @@ func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) e
return nil
}

if err := con.modelNode.SetWorkloadLabels(s.Env); err != nil {
if err := con.modelNode.SetWorkloadLabels(s.Env, false); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/proxy/envoy/v2/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type DiscoveryServer struct {

// WorkloadsById keeps track of information about a workload, based on direct notifications
// from registry. This acts as a cache and allows detecting changes.
// clusterID => workloadID => Workload
// clusterID => workload ip => Workload
WorkloadsByID map[string]map[string]*Workload

pushChannel chan *model.PushRequest
Expand Down
Loading

0 comments on commit cda495f

Please sign in to comment.