Skip to content

Commit

Permalink
Determine ServiceInstances from Proxy labels (istio#16483)
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn authored and istio-testing committed Aug 30, 2019
1 parent 73d3f87 commit 2e789b9
Show file tree
Hide file tree
Showing 74 changed files with 728 additions and 29 deletions.
11 changes: 11 additions & 0 deletions install/kubernetes/helm/istio/files/injection-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ containers:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ISTIO_META_POD_PORTS
value: |-
[
{{- range $index1, $c := .Spec.Containers }}
{{- range $index2, $p := $c.Ports }}
{{if or (ne $index1 0) (ne $index2 0)}},{{end}}{{ structToJSON $p }}
{{- end}}
{{- end}}
]
- name: ISTIO_META_CLUSTER_ID
value: "{{ valueOrDefault .Values.global.multicluster.clusterName `Kubernetes` }}"
- name: POD_NAMESPACE
valueFrom:
fieldRef:
Expand Down
4 changes: 4 additions & 0 deletions install/kubernetes/helm/istio/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ global:
# have a shared root CA for this model to work.
enabled: false

# Should be set to the name of the cluster this installation will run in. This is required for sidecar injection
# to properly label proxies
clusterName: ""

# A minimal set of requested resources to applied to all deployments so that
# Horizontal Pod Autoscaler will be able to function (if set).
# Each component can overwrite these default values by adding its own resources
Expand Down
14 changes: 9 additions & 5 deletions pilot/pkg/model/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,15 @@ func ParseMetadata(metadata *types.Struct) map[string]string {
switch s := v.GetKind().(type) {
case *types.Value_StringValue:
res[k] = s.StringValue
case *types.Value_StructValue:
// Some fields are not simple strings, they are structs. Dump these to json strings.
default:
// Some fields are not simple strings, dump these to json strings.
// TODO: convert metadata to a properly typed struct rather than map[string]string
j, err := (&jsonpb.Marshaler{}).MarshalToString(s.StructValue)
j, err := (&jsonpb.Marshaler{}).MarshalToString(v)
if err != nil {
log.Warnf("failed to unmarshal metadata field %v with value %v: %v", k, v, err)
continue
}
res[k] = j
default:
continue
}
}
if len(res) == 0 {
Expand Down Expand Up @@ -485,6 +483,9 @@ const (
// will be replaced with the gateway defined in the settings.
NodeMetadataNetwork = "NETWORK"

// NodeMetadataNetwork defines the cluster the node belongs to.
NodeMetadataClusterID = "CLUSTER_ID"

// NodeMetadataInterceptionMode is the name of the metadata variable that carries info about
// traffic interception mode at the proxy
NodeMetadataInterceptionMode = "INTERCEPTION_MODE"
Expand Down Expand Up @@ -543,6 +544,9 @@ const (
// If not set, no timeout is set.
NodeMetadataIdleTimeout = "IDLE_TIMEOUT"

// NodeMetadataPodPorts the ports on a pod. This is used to lookup named ports.
NodeMetadataPodPorts = "POD_PORTS"

// NodeMetadataCanonicalTelemetryService specifies the service name to use for all node telemetry.
NodeMetadataCanonicalTelemetryService = "CANONICAL_TELEMETRY_SERVICE"

Expand Down
10 changes: 10 additions & 0 deletions pilot/pkg/model/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ func TestParseMetadata(t *testing.T) {
"foo": "bar",
}}},
},
{
name: "Capture Pod Ports",
metadata: map[string]interface{}{
"POD_PORTS": `[{"name":"http","containerPort":8080,"protocol":"TCP"},{"name":"grpc","containerPort":8079,"protocol":"TCP"}]`,
},
out: model.Proxy{Type: "sidecar", IPAddresses: []string{"1.1.1.1"}, DNSDomain: "domain", ID: "id", IstioVersion: model.MaxIstioVersion,
Metadata: map[string]string{
"POD_PORTS": `[{"name":"http","containerPort":8080,"protocol":"TCP"},{"name":"grpc","containerPort":8079,"protocol":"TCP"}]`,
}},
},
}

nodeID := "sidecar~1.1.1.1~id~domain"
Expand Down
17 changes: 17 additions & 0 deletions pilot/pkg/networking/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,23 @@ func ConvertLocality(locality string) *core.Locality {
}
}

// ConvertLocality converts '/' separated locality string to Locality struct.
func LocalityToString(l *core.Locality) string {
if l == nil {
return ""
}
resp := l.Region
if l.Zone == "" {
return resp
}
resp += "/" + l.Zone
if l.SubZone == "" {
return resp
}
resp += "/" + l.SubZone
return resp
}

// IsLocalityEmpty checks if a locality is empty (checking region is good enough, based on how its initialized)
func IsLocalityEmpty(locality *core.Locality) bool {
if locality == nil || (len(locality.GetRegion()) == 0) {
Expand Down
42 changes: 27 additions & 15 deletions pilot/pkg/networking/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,44 +180,46 @@ func TestConvertLocality(t *testing.T) {
name string
locality string
want *core.Locality
reverse string
}{
{
"nil locality",
"",
nil,
name: "nil locality",
locality: "",
want: nil,
},
{
"locality with only region",
"region",
&core.Locality{
name: "locality with only region",
locality: "region",
want: &core.Locality{
Region: "region",
},
},
{
"locality with region and zone",
"region/zone",
&core.Locality{
name: "locality with region and zone",
locality: "region/zone",
want: &core.Locality{
Region: "region",
Zone: "zone",
},
},
{
"locality with region zone and subzone",
"region/zone/subzone",
&core.Locality{
name: "locality with region zone and subzone",
locality: "region/zone/subzone",
want: &core.Locality{
Region: "region",
Zone: "zone",
SubZone: "subzone",
},
},
{
"locality with region zone subzone and rack",
"region/zone/subzone/rack",
&core.Locality{
name: "locality with region zone subzone and rack",
locality: "region/zone/subzone/rack",
want: &core.Locality{
Region: "region",
Zone: "zone",
SubZone: "subzone",
},
reverse: "region/zone/subzone",
},
}

Expand All @@ -227,6 +229,16 @@ func TestConvertLocality(t *testing.T) {
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Expected locality %#v, but got %#v", tt.want, got)
}
// Verify we can reverse the conversion back to the original input
reverse := LocalityToString(got)
if tt.reverse != "" {
// Special case, reverse lookup is different than original input
if tt.reverse != reverse {
t.Errorf("Expected locality string %s, got %v", tt.reverse, reverse)
}
} else if tt.locality != reverse {
t.Errorf("Expected locality string %s, got %v", tt.locality, reverse)
}
})
}
}
Expand Down
116 changes: 116 additions & 0 deletions pilot/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"encoding/json"
"errors"
"fmt"
"net"
Expand All @@ -26,6 +27,10 @@ import (

"github.com/yl2chen/cidranger"
v1 "k8s.io/api/core/v1"

"istio.io/istio/pilot/pkg/networking/util"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -496,6 +501,16 @@ func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.Serv

proxyNamespace := ""
if len(proxy.IPAddresses) > 0 {
// Fetching the pod from Kubernetes is slow, and a pod may not even be present when this is called
// due to eventual consistency issues. However, we have a lot of information about the pod from the proxy
// metadata already. Because of this, we can still get most of the information we need.
// If we cannot accurately construct ServiceInstances from just the metadata, this will return an error and we can
// attempt to read the real pod.
instances, err := c.getProxyServiceInstancesFromMetadata(proxy)
if err == nil {
return instances, nil
}

// only need to fetch the corresponding pod through the first IP, although there are multiple IP scenarios,
// because multiple ips belong to the same pod
proxyIP := proxy.IPAddresses[0]
Expand Down Expand Up @@ -540,6 +555,7 @@ func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.Serv
// referring to the same IP/port, the one in endpointsForPodInSameNS will be used. (The other one
// in endpointsForPodInDifferentNS will thus be rejected by Pilot).
out = append(endpointsForPodInSameNS, endpointsForPodInDifferentNS...)

if len(out) == 0 {
if c.Env != nil {
c.Env.PushContext.Add(model.ProxyStatusNoService, proxy.ID, proxy, "")
Expand All @@ -554,6 +570,106 @@ func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.Serv
return out, nil
}

// getProxyServiceInstancesFromMetadata retrieves ServiceInstances using proxy Metadata rather than
// from the Pod. This allows retrieving Instances immediately, regardless of delays in Kubernetes.
// If the proxy doesn't have enough metadata, an error is returned
func (c *Controller) getProxyServiceInstancesFromMetadata(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
if len(proxy.WorkloadLabels) == 0 {
return nil, fmt.Errorf("no workload labels found")
}

if proxy.Metadata[model.NodeMetadataClusterID] != c.ClusterID {
return nil, fmt.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata[model.NodeMetadataClusterID], c.ClusterID)
}

// Create a pod with just the information needed to find the associated Services
dummyPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: proxy.ConfigNamespace,
Labels: proxy.WorkloadLabels[0],
},
}

// Find the Service associated with the pod.
svcLister := listerv1.NewServiceLister(c.services.informer.GetIndexer())
services, err := svcLister.GetPodServices(dummyPod)
if err != nil {
return nil, fmt.Errorf("error getting instances: %v", err)

}
if len(services) == 0 {
return nil, fmt.Errorf("no instances found: %v ", err)
}

// This is a reference to a port by name. We need to do a lookup from the ports map
// podPorts must first be unmarshalled, then we can do a lookup.
var podPorts []*v1.ContainerPort
// We only need the ports if they do a port reference by name, so we don't need to fail yet if
// port metadata is not provided
if _, f := proxy.Metadata[model.NodeMetadataPodPorts]; f {
if err := json.Unmarshal([]byte(proxy.Metadata[model.NodeMetadataPodPorts]), &podPorts); err != nil {
return nil, err
}
}

out := make([]*model.ServiceInstance, 0)
for _, svc := range services {
svcAccount := proxy.Metadata[model.NodeMetadataServiceAccount]
hostname := kube.ServiceHostname(svc.Name, svc.Namespace, c.domainSuffix)
c.RLock()
modelService, f := c.servicesMap[hostname]
c.RUnlock()
if !f {
return nil, fmt.Errorf("failed to find model service for %v", hostname)
}
for _, port := range svc.Spec.Ports {
svcPort, f := modelService.Ports.Get(port.Name)
if !f {
return nil, fmt.Errorf("failed to get svc port for %v", port.Name)
}
targetPort, err := findPortFromMetadata(port, podPorts)
if err != nil {
return nil, fmt.Errorf("failed to find target port for %v: %v", proxy.ID, err)
}
// Construct the ServiceInstance
out = append(out, &model.ServiceInstance{
Endpoint: model.NetworkEndpoint{
Address: proxy.IPAddresses[0],
Port: targetPort,
ServicePort: svcPort,
Network: c.endpointNetwork(proxy.IPAddresses[0]),
Locality: util.LocalityToString(proxy.Locality),
},
Service: modelService,
// Kubernetes service will only have a single instance of labels, and we return early if there are no labels.
Labels: proxy.WorkloadLabels[0],
ServiceAccount: svcAccount,
})
}
}
return out, nil
}

// findPortFromMetadata resolves the TargetPort of a Service Port, by reading the Pod spec.
func findPortFromMetadata(svcPort v1.ServicePort, podPorts []*v1.ContainerPort) (int, error) {
target := svcPort.TargetPort

switch target.Type {
case intstr.String:
name := target.StrVal
for _, port := range podPorts {
if port.Name == name && port.Protocol == svcPort.Protocol {
return int(port.ContainerPort), nil
}
}
case intstr.Int:
// For a direct reference we can just return the port number
return target.IntValue(), nil
}

return 0, fmt.Errorf("no matching port found for %+v", svcPort)
}

func (c *Controller) getProxyServiceInstancesByEndpoint(endpoints v1.Endpoints, proxy *model.Proxy) []*model.ServiceInstance {
out := make([]*model.ServiceInstance, 0)

Expand Down
37 changes: 37 additions & 0 deletions pilot/pkg/serviceregistry/kube/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -496,6 +497,42 @@ func TestGetProxyServiceInstances(t *testing.T) {
t.Errorf("GetProxyServiceInstances() wrong service instance returned => hostname %q, want %q",
services[0].Service.Hostname, hostname)
}

// Test that we can look up instances just by Proxy metadata
metaServices, err := controller.GetProxyServiceInstances(&model.Proxy{
Type: "sidecar",
IPAddresses: []string{"1.1.1.1"},
Locality: &core.Locality{Region: "r", Zone: "z"},
ConfigNamespace: "nsa",
Metadata: map[string]string{model.NodeMetadataServiceAccount: "account"},
WorkloadLabels: labels.Collection{labels.Instance{"app": "prod-app"}},
})
if err != nil {
t.Fatalf("got err getting service instances")
}

expected := &model.ServiceInstance{
Endpoint: model.NetworkEndpoint{Family: 0,
Address: "1.1.1.1",
ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
Locality: "r/z",
},
Service: &model.Service{
Hostname: "svc1.nsa.svc.company.com",
Address: "10.0.0.1",
Ports: []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
ServiceAccounts: []string{"[email protected]", "spiffe://cluster.local/ns/nsa/sa/acct4"},
Attributes: model.ServiceAttributes{Name: "svc1", Namespace: "nsa", UID: "istio://nsa/services/svc1"},
},
Labels: labels.Instance{"app": "prod-app"},
ServiceAccount: "account",
}
if len(metaServices) != 1 {
t.Fatalf("expected 1 instance, got %v", len(metaServices))
}
if !reflect.DeepEqual(expected, metaServices[0]) {
t.Fatalf("expected instance %v, got %v", expected, metaServices[0])
}
}

func TestGetProxyServiceInstancesWithMultiIPs(t *testing.T) {
Expand Down
Loading

0 comments on commit 2e789b9

Please sign in to comment.