From 04c5b1be09ba7bf936423c6c00423e9cbf270b9d Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 28 Aug 2019 23:45:20 +0800 Subject: [PATCH] Add external services into Mesh (#16543) * add external services into Mesh * fix lint * fix test * using matching lables for deployments * change IP as list * modify msg --- istioctl/cmd/add-to-mesh.go | 326 +++++++++++++++++++++++++++++-- istioctl/cmd/add-to-mesh_test.go | 87 +++++++-- 2 files changed, 382 insertions(+), 31 deletions(-) diff --git a/istioctl/cmd/add-to-mesh.go b/istioctl/cmd/add-to-mesh.go index 8f19306ae434..6003acb8ac47 100644 --- a/istioctl/cmd/add-to-mesh.go +++ b/istioctl/cmd/add-to-mesh.go @@ -18,6 +18,17 @@ import ( "fmt" "io" "io/ioutil" + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/kubernetes/pkg/apis/core" + + "istio.io/api/networking/v1alpha3" + "istio.io/istio/pilot/pkg/model" + "istio.io/istio/pkg/config/schemas" + "istio.io/istio/pkg/kube" "github.com/ghodss/yaml" "github.com/spf13/cobra" @@ -30,12 +41,30 @@ import ( "istio.io/pkg/log" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8s_labels "k8s.io/apimachinery/pkg/labels" meshconfig "istio.io/api/mesh/v1alpha1" + kube_registry "istio.io/istio/pilot/pkg/serviceregistry/kube" + istioProtocol "istio.io/istio/pkg/config/protocol" +) + +var ( + crdFactory = createDynamicInterface ) +// vmServiceOpts contains the options of a mesh expansion service running on VM. +type vmServiceOpts struct { + Name string + Namespace string + ServiceAccount string + IP []string + PortList model.PortList + Labels map[string]string + Annotations map[string]string +} + func addToMeshCmd() *cobra.Command { addToMeshCmd := &cobra.Command{ Use: "add-to-mesh", @@ -50,18 +79,7 @@ func addToMeshCmd() *cobra.Command { }, } addToMeshCmd.AddCommand(svcMeshifyCmd()) - addToMeshCmd.PersistentFlags().StringVar(&meshConfigFile, "meshConfigFile", "", - "mesh configuration filename. Takes precedence over --meshConfigMapName if set") - addToMeshCmd.PersistentFlags().StringVar(&injectConfigFile, "injectConfigFile", "", - "injection configuration filename. Cannot be used with --injectConfigMapName") - addToMeshCmd.PersistentFlags().StringVar(&valuesFile, "valuesFile", "", - "injection values configuration filename.") - - addToMeshCmd.PersistentFlags().StringVar(&meshConfigMapName, "meshConfigMapName", defaultMeshConfigMapName, - fmt.Sprintf("ConfigMap name for Istio mesh configuration, key should be %q", configMapKey)) - addToMeshCmd.PersistentFlags().StringVar(&injectConfigMapName, "injectConfigMapName", defaultInjectConfigMapName, - fmt.Sprintf("ConfigMap name for Istio sidecar injection, key should be %q.", injectConfigMapKey)) - + addToMeshCmd.AddCommand(externalSvcMeshifyCmd()) return addToMeshCmd } @@ -69,7 +87,7 @@ func svcMeshifyCmd() *cobra.Command { cmd := &cobra.Command{ Use: "service", Short: "Add Service to Istio service mesh", - Long: `istioctl experimental add-to-mesh restarts pods with the Istio sidecar. Use 'add-to-mesh' + Long: `istioctl experimental add-to-mesh service restarts pods with the Istio sidecar. Use 'add-to-mesh' to test deployments for compatibility with Istio. If your service does not function after using 'add-to-mesh' you must re-deploy it and troubleshoot it for Istio compatibility. See https://istio.io/docs/setup/kubernetes/additional-setup/requirements/ @@ -104,8 +122,63 @@ THIS COMMAND IS STILL UNDER ACTIVE DEVELOPMENT AND NOT READY FOR PRODUCTION USE. args[0], ns, meshConfig, writer) }, } + cmd.PersistentFlags().StringVar(&meshConfigFile, "meshConfigFile", "", + "mesh configuration filename. Takes precedence over --meshConfigMapName if set") + cmd.PersistentFlags().StringVar(&injectConfigFile, "injectConfigFile", "", + "injection configuration filename. Cannot be used with --injectConfigMapName") + cmd.PersistentFlags().StringVar(&valuesFile, "valuesFile", "", + "injection values configuration filename.") + + cmd.PersistentFlags().StringVar(&meshConfigMapName, "meshConfigMapName", defaultMeshConfigMapName, + fmt.Sprintf("ConfigMap name for Istio mesh configuration, key should be %q", configMapKey)) + cmd.PersistentFlags().StringVar(&injectConfigMapName, "injectConfigMapName", defaultInjectConfigMapName, + fmt.Sprintf("ConfigMap name for Istio sidecar injection, key should be %q.", injectConfigMapKey)) + + return cmd +} + +func externalSvcMeshifyCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "external-service ... [name1:]port1 [name2:]port2 ...", + Short: "Add external service(eg:services running on VM) to Istio service mesh", + Long: `istioctl experimental add-to-mesh external-service create a ServiceEntry and\ +a Service without selector for the specified external service in Istio service mesh. +The typical usage scenario is Mesh Expansion on VMs. +THIS COMMAND IS STILL UNDER ACTIVE DEVELOPMENT AND NOT READY FOR PRODUCTION USE. +`, + Example: `istioctl experimental add-to-mesh external-service vmhttp 172.12.23.125,172.12.23.126\ +http:9080 tcp:8888 -l app=test,version=v1 -a env=stage -s stageAdmin`, + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) < 3 { + return fmt.Errorf("provide service name, IP and Port List") + } + client, err := interfaceFactory(kubeconfig) + if err != nil { + return err + } + seClient, err := crdFactory(kubeconfig) + if err != nil { + return err + } + writer := cmd.OutOrStdout() + ns := handlers.HandleNamespace(namespace, defaultNamespace) + _, err = client.CoreV1().Services(ns).Get(args[0], metav1.GetOptions{ + IncludeUninitialized: true}) + if err != nil { + return addServiceOnVMToMesh(seClient, client, ns, args, labels, annotations, svcAcctAnn, writer) + } + return fmt.Errorf("service %q already exists, skip", args[0]) + }, + } + cmd.PersistentFlags().StringSliceVarP(&labels, "labels", "l", + nil, "List of labels to apply if creating a service/endpoint; e.g. -l env=prod,vers=2") + cmd.PersistentFlags().StringSliceVarP(&annotations, "annotations", "a", + nil, "List of string annotations to apply if creating a service/endpoint; e.g. -a foo=bar,x=y") + cmd.PersistentFlags().StringVarP(&svcAcctAnn, "serviceaccount", "s", + "default", "Service account to link to the service") return cmd } + func setupParameters(sidecarTemplate, valuesConfig *string) (*meshconfig.MeshConfig, error) { var meshConfig *meshconfig.MeshConfig var err error @@ -168,7 +241,14 @@ func injectSideCarIntoDeployment(client kubernetes.Interface, deps []appsv1.Depl continue } - if _, err = client.AppsV1().Deployments(svcNamespace).UpdateStatus(res); err != nil { + d := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: dep.Name, + Namespace: dep.Namespace, + UID: dep.UID, + }, + } + if _, err = client.AppsV1().Deployments(svcNamespace).UpdateStatus(d); err != nil { errs = multierr.Append(fmt.Errorf("failed to update deployment %s.%s for service %s.%s due to %v", dep.Name, dep.Namespace, svcName, svcNamespace, err), errs) continue @@ -188,15 +268,231 @@ func findDeploymentsForSvc(client kubernetes.Interface, ns, name string) ([]apps return nil, err } svcSelector := k8s_labels.SelectorFromSet(svc.Spec.Selector) + if svcSelector.Empty() { + return nil, nil + } deployments, err := client.AppsV1().Deployments(ns).List(metav1.ListOptions{}) if err != nil { return nil, err } for _, dep := range deployments.Items { - depLabels := k8s_labels.Set(dep.ObjectMeta.Labels) + depLabels := k8s_labels.Set(dep.Spec.Selector.MatchLabels) if svcSelector.Matches(depLabels) { deps = append(deps, dep) } } return deps, nil } + +func createDynamicInterface(kubeconfig string) (dynamic.Interface, error) { + restConfig, err := kube.BuildClientConfig(kubeconfig, configContext) + + if err != nil { + return nil, err + } + dynamicClient, err := dynamic.NewForConfig(restConfig) + if err != nil { + return nil, err + } + return dynamicClient, nil +} + +func convertPortList(ports []string) (model.PortList, error) { + portList := model.PortList{} + for _, p := range ports { + np, err := kube_registry.Str2NamedPort(p) + if err != nil { + return nil, fmt.Errorf("invalid port format %v", p) + } + protocol := istioProtocol.Parse(np.Name) + if protocol == istioProtocol.Unsupported { + return nil, fmt.Errorf("protocol %s is not supported by Istio", np.Name) + } + portList = append(portList, &model.Port{ + Port: int(np.Port), + Protocol: protocol, + Name: np.Name, + }) + } + return portList, nil +} + +// addServiceOnVMToMesh adds a service running on VM into Istio service mesh +func addServiceOnVMToMesh(dynamicClient dynamic.Interface, client kubernetes.Interface, ns string, + args, l, a []string, svcAcctAnn string, writer io.Writer) error { + svcName := args[0] + ips := strings.Split(args[1], ",") + portsListStr := args[2:] + ports, err := convertPortList(portsListStr) + if err != nil { + return err + } + labels := convertToMap(l) + annotations := convertToMap(a) + opts := &vmServiceOpts{ + Name: svcName, + Namespace: ns, + PortList: ports, + IP: ips, + ServiceAccount: svcAcctAnn, + Labels: labels, + Annotations: annotations, + } + + u := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "networking.istio.io/" + schemas.ServiceEntry.Version, + "kind": schemas.ServiceEntry.VariableName, + "metadata": map[string]interface{}{ + "namespace": opts.Namespace, + "name": resourceName(opts.Name), + }, + }, + } + annotations[core.ServiceAccountNameKey] = opts.ServiceAccount + s := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.Name, + Namespace: opts.Namespace, + Annotations: annotations, + Labels: labels, + }, + } + + // Pre-check Kubernetes service and service entry does not exist. + _, err = client.CoreV1().Services(ns).Get(opts.Name, metav1.GetOptions{ + IncludeUninitialized: true, + }) + if err == nil { + return fmt.Errorf("service %q already exists, skip", opts.Name) + } + serviceEntryGVR := schema.GroupVersionResource{ + Group: "networking.istio.io", + Version: schemas.ServiceEntry.Version, + Resource: "serviceentries", + } + _, err = dynamicClient.Resource(serviceEntryGVR).Namespace(ns).Get(resourceName(opts.Name), metav1.GetOptions{}) + if err == nil { + return fmt.Errorf("service entry %q already exists, skip", resourceName(opts.Name)) + } + + if err = generateServiceEntry(u, opts); err != nil { + return err + } + generateK8sService(s, opts) + if err = createServiceEntry(dynamicClient, ns, u, opts.Name, writer); err != nil { + return err + } + return createK8sService(client, ns, s, writer) +} + +func generateServiceEntry(u *unstructured.Unstructured, o *vmServiceOpts) error { + if o == nil { + return fmt.Errorf("empty vm service options") + } + ports := []*v1alpha3.Port{} + for _, p := range o.PortList { + ports = append(ports, &v1alpha3.Port{ + Number: uint32(p.Port), + Protocol: string(p.Protocol), + Name: p.Name, + }) + } + eps := []*v1alpha3.ServiceEntry_Endpoint{} + for _, ip := range o.IP { + eps = append(eps, &v1alpha3.ServiceEntry_Endpoint{ + Address: ip, + Labels: o.Labels, + }) + } + host := fmt.Sprintf("%v.%v.svc.cluster.local", o.Name, o.Namespace) + spec := &v1alpha3.ServiceEntry{ + Hosts: []string{host}, + Ports: ports, + Endpoints: eps, + Resolution: v1alpha3.ServiceEntry_STATIC, + } + u.Object["spec"] = spec + return nil +} + +func resourceName(hostShortName string) string { + return fmt.Sprintf("mesh-expansion-%v", hostShortName) +} + +func generateK8sService(s *corev1.Service, o *vmServiceOpts) { + ports := []corev1.ServicePort{} + for _, p := range o.PortList { + ports = append(ports, corev1.ServicePort{ + Name: strings.ToLower(p.Name), + Port: int32(p.Port), + }) + } + + spec := corev1.ServiceSpec{ + Ports: ports, + } + s.Spec = spec +} + +func convertToMap(s []string) map[string]string { + out := make(map[string]string, len(s)) + for _, l := range s { + k, v := splitEqual(l) + out[k] = v + } + return out +} + +// splitEqual splits key=value string into key,value. if no = is found +// the whole string is the key and value is empty. +func splitEqual(str string) (string, string) { + idx := strings.Index(str, "=") + var k string + var v string + if idx >= 0 { + k = str[:idx] + v = str[idx+1:] + } else { + k = str + } + return k, v +} + +// createK8sService creates k8s service object for external services in order for DNS query and cluster VIP. +func createK8sService(client kubernetes.Interface, ns string, svc *corev1.Service, writer io.Writer) error { + if svc == nil { + return fmt.Errorf("failed to create vm service") + } + if _, err := client.CoreV1().Services(ns).Create(svc); err != nil { + return fmt.Errorf("failed to create kuberenetes service %v", err) + } + if _, err := client.CoreV1().Services(ns).UpdateStatus(svc); err != nil { + return fmt.Errorf("failed to create kuberenetes service %v", err) + } + sName := strings.Join([]string{svc.Name, svc.Namespace}, ".") + fmt.Fprintf(writer, "Kubernetes Service %q has been created in the Istio service mesh"+ + " for the external service %q\n", sName, svc.Name) + return nil +} + +// createServiceEntry creates an Istio ServiceEntry object in order to register vm service. +func createServiceEntry(dynamicClient dynamic.Interface, ns string, + u *unstructured.Unstructured, name string, writer io.Writer) error { + if u == nil { + return fmt.Errorf("failed to create vm service") + } + serviceEntryGVR := schema.GroupVersionResource{ + Group: "networking.istio.io", + Version: schemas.ServiceEntry.Version, + Resource: "serviceentries", + } + _, err := dynamicClient.Resource(serviceEntryGVR).Namespace(ns).Create(u, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create service entry %v", err) + } + seName := strings.Join([]string{u.GetName(), u.GetNamespace()}, ".") + fmt.Fprintf(writer, "ServiceEntry %q has been created in the Istio service mesh"+ + " for the external service %q\n", seName, name) + return nil +} diff --git a/istioctl/cmd/add-to-mesh_test.go b/istioctl/cmd/add-to-mesh_test.go index 0a7aec910ae5..90399b3b36e4 100644 --- a/istioctl/cmd/add-to-mesh_test.go +++ b/istioctl/cmd/add-to-mesh_test.go @@ -17,18 +17,21 @@ package cmd import ( "bytes" "fmt" + "strings" + "testing" v1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" - "strings" - "testing" + "istio.io/istio/pkg/config/schemas" + //"istio.io/istio/pilot/pkg/config/kube/crd" appsv1 "k8s.io/api/apps/v1" coreV1 "k8s.io/api/core/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/dynamic/fake" ) type testcase struct { @@ -36,12 +39,13 @@ type testcase struct { expectedException bool args []string k8sConfigs []runtime.Object + dynamicConfigs []runtime.Object expectedOutput string } var ( - one = int32(1) - tck8sConfigs = []runtime.Object{ + one = int32(1) + cannedK8sConfigs = []runtime.Object{ &coreV1.ConfigMapList{Items: []coreV1.ConfigMap{}}, &appsv1.DeploymentList{Items: []appsv1.Deployment{ @@ -104,6 +108,18 @@ var ( }, }}, } + cannedDynamicConfigs = []runtime.Object{ + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "networking.istio.io/" + schemas.ServiceEntry.Version, + "kind": schemas.ServiceEntry.VariableName, + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "mesh-expansion-vmtest", + }, + }, + }, + } ) func TestAddToMesh(t *testing.T) { @@ -120,7 +136,7 @@ func TestAddToMesh(t *testing.T) { " --injectConfigFile testdata/inject-config.yaml"+ " --valuesFile testdata/inject-values.yaml", " "), expectedException: false, - k8sConfigs: tck8sConfigs, + k8sConfigs: cannedK8sConfigs, expectedOutput: "deployment details-v1.default updated successfully with Istio sidecar injected.\n" + "Next Step: Add related labels to the deployment to align with Istio's requirement: " + "https://istio.io/docs/setup/kubernetes/additional-setup/requirements/\n", @@ -131,7 +147,7 @@ func TestAddToMesh(t *testing.T) { " --injectConfigFile testdata/inject-config.yaml"+ " --valuesFile testdata/inject-values.yaml", " "), expectedException: true, - k8sConfigs: tck8sConfigs, + k8sConfigs: cannedK8sConfigs, expectedOutput: "Error: services \"test\" not found\n", }, { @@ -140,9 +156,49 @@ func TestAddToMesh(t *testing.T) { " --injectConfigFile testdata/inject-config.yaml"+ " --valuesFile testdata/inject-values.yaml", " "), expectedException: false, - k8sConfigs: tck8sConfigs, + k8sConfigs: cannedK8sConfigs, expectedOutput: "No deployments found for service dummyservice.default\n", }, + { + description: "Invalid command args - missing service name", + args: strings.Split("experimental add-to-mesh service", " "), + expectedException: true, + expectedOutput: "Error: expecting service name\n", + }, + { + description: "Invalid command args - missing service IP", + args: strings.Split("experimental add-to-mesh external-service test tcp:12345", " "), + expectedException: true, + expectedOutput: "Error: provide service name, IP and Port List\n", + }, + { + description: "Invalid command args - missing service Ports", + args: strings.Split("experimental add-to-mesh external-service test 172.186.15.123", " "), + expectedException: true, + expectedOutput: "Error: provide service name, IP and Port List\n", + }, + { + description: "Invalid command args - invalid port protocol", + args: strings.Split("experimental add-to-mesh external-service test 172.186.15.123 tcp1:12345", " "), + expectedException: true, + expectedOutput: "Error: protocol tcp1 is not supported by Istio\n", + }, + { + description: "service already exists", + args: strings.Split("experimental add-to-mesh external-service dummyservice 11.11.11.11 tcp:12345", " "), + expectedException: true, + k8sConfigs: cannedK8sConfigs, + dynamicConfigs: cannedDynamicConfigs, + expectedOutput: "Error: service \"dummyservice\" already exists, skip\n", + }, + { + description: "ServiceEntry already exists", + args: strings.Split("experimental add-to-mesh external-service vmtest 11.11.11.11 tcp:12345", " "), + expectedException: true, + k8sConfigs: cannedK8sConfigs, + dynamicConfigs: cannedDynamicConfigs, + expectedOutput: "Error: service entry \"mesh-expansion-vmtest\" already exists, skip\n", + }, } for i, c := range cases { @@ -155,13 +211,12 @@ func TestAddToMesh(t *testing.T) { func verifyAddToMeshOutput(t *testing.T, c testcase) { t.Helper() - interfaceFactory = mockInterfaceFactory(c.k8sConfigs) + interfaceFactory = mockInterfaceFactoryGenerator(c.k8sConfigs) + crdFactory = mockDynamicClientGenerator(c.dynamicConfigs) var out bytes.Buffer rootCmd := GetRootCmd(c.args) rootCmd.SetOutput(&out) - file = "" // Clear, because we re-use - fErr := rootCmd.Execute() output := out.String() @@ -181,11 +236,11 @@ func verifyAddToMeshOutput(t *testing.T, c testcase) { } } -func mockInterfaceFactory(k8sConfigs []runtime.Object) func(kubeconfig string) (kubernetes.Interface, error) { - outFactory := func(_ string) (kubernetes.Interface, error) { - client := fake.NewSimpleClientset(k8sConfigs...) +func mockDynamicClientGenerator(dynamicConfigs []runtime.Object) func(kubeconfig string) (dynamic.Interface, error) { + outFactory := func(_ string) (dynamic.Interface, error) { + types := runtime.NewScheme() + client := fake.NewSimpleDynamicClient(types, dynamicConfigs...) return client, nil } - return outFactory }