From 60d09e21f959c9faac642c386c67b52ca3ea158e Mon Sep 17 00:00:00 2001 From: Michael Zalimeni Date: Fri, 15 Sep 2023 16:26:17 -0400 Subject: [PATCH] [NET-5314] Limit v2 Service port registration to L4 TCP ports (#2965) Limit v2 Service port registration to L4 TCP ports Ignore non-TCP L4 ports in K8s services. This is expected behavior and also prevents unintended duplication of Service port values registered to Consul (which is not supported) when ports have multiplexed L4 traffic. --- .../endpointsv2/endpoints_controller.go | 42 +++-- .../endpointsv2/endpoints_controller_test.go | 144 +++++++++++++++--- 2 files changed, 153 insertions(+), 33 deletions(-) diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go index dca7dad950..739f1bade2 100644 --- a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller.go @@ -4,6 +4,7 @@ package endpointsv2 import ( "context" + "fmt" "net" "sort" "strings" @@ -202,23 +203,24 @@ func getOwnerPrefixFromPod(pod *corev1.Pod) string { // registerService creates a Consul service registration from the provided Kuberetes service and endpoint information. func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, selector *pbcatalog.WorkloadSelector) error { - serviceResource := r.getServiceResource( - &pbcatalog.Service{ - Workloads: selector, - Ports: getServicePorts(service), - VirtualIps: r.getServiceVIPs(service), - }, + consulSvc := &pbcatalog.Service{ + Workloads: selector, + Ports: getServicePorts(service), + VirtualIps: r.getServiceVIPs(service), + } + consulSvcResource := r.getServiceResource( + consulSvc, service.Name, // Consul and Kubernetes service name will always match r.getConsulNamespace(service.Namespace), r.getConsulPartition(), getServiceMeta(service), ) - r.Log.Info("registering service with Consul", getLogFieldsForResource(serviceResource.Id)...) + r.Log.Info("registering service with Consul", getLogFieldsForResource(consulSvcResource.Id)...) //TODO: Maybe attempt to debounce redundant writes. For now, we blindly rewrite state on each reconcile. - _, err := resourceClient.Write(ctx, &pbresource.WriteRequest{Resource: serviceResource}) + _, err := resourceClient.Write(ctx, &pbresource.WriteRequest{Resource: consulSvcResource}) if err != nil { - r.Log.Error(err, "failed to register service", getLogFieldsForResource(serviceResource.Id)...) + r.Log.Error(err, fmt.Sprintf("failed to register service: %+v", consulSvc), getLogFieldsForResource(consulSvcResource.Id)...) return err } @@ -254,13 +256,21 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort { ports := make([]*pbcatalog.ServicePort, 0, len(service.Spec.Ports)+1) for _, p := range service.Spec.Ports { - ports = append(ports, &pbcatalog.ServicePort{ - VirtualPort: uint32(p.Port), - //TODO: If the value is a number, infer the correct name value based on - // the most prevalent endpoint subset for the port (best-effot, inspect a pod). - TargetPort: p.TargetPort.String(), - Protocol: common.GetPortProtocol(p.AppProtocol), - }) + // Service mesh only supports TCP as the L4 Protocol (not to be confused w/ L7 AppProtocol). + // + // This check is necessary to deduplicate VirtualPort values when multiple declared ServicePort values exist + // for the same port, which is possible in K8s when e.g. multiplexing TCP and UDP traffic over a single port. + // + // If we otherwise see repeat port values in a K8s service, we pass along and allow Consul to fail validation. + if p.Protocol == corev1.ProtocolTCP { + ports = append(ports, &pbcatalog.ServicePort{ + VirtualPort: uint32(p.Port), + //TODO: If the value is a number, infer the correct name value based on + // the most prevalent endpoint subset for the port (best-effot, inspect a pod). + TargetPort: p.TargetPort.String(), + Protocol: common.GetPortProtocol(p.AppProtocol), + }) + } } //TODO: Error check reserved "mesh" target port diff --git a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go index 2458444008..caadfb8d9c 100644 --- a/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpointsv2/endpoints_controller_test.go @@ -89,20 +89,23 @@ func TestReconcile_CreateService(t *testing.T) { { Name: "public", Port: 8080, + Protocol: "TCP", TargetPort: intstr.FromString("my-http-port"), AppProtocol: &appProtocolHttp, }, { Name: "api", Port: 9090, + Protocol: "TCP", TargetPort: intstr.FromString("my-grpc-port"), AppProtocol: &appProtocolGrpc, }, { Name: "other", Port: 10001, + Protocol: "TCP", TargetPort: intstr.FromString("10001"), - // no protocol specified + // no app protocol specified }, }, }, @@ -169,18 +172,21 @@ func TestReconcile_CreateService(t *testing.T) { Addresses: addressesForPods(pod1, pod2), Ports: []corev1.EndpointPort{ { - Name: "my-http-port", - AppProtocol: &appProtocolHttp, + Name: "public", Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, }, { - Name: "my-grpc-port", - AppProtocol: &appProtocolGrpc, + Name: "api", Port: 6789, + Protocol: "TCP", + AppProtocol: &appProtocolGrpc, }, { - Name: "10001", - Port: 10001, + Name: "other", + Port: 10001, + Protocol: "TCP", }, }, }, @@ -197,20 +203,23 @@ func TestReconcile_CreateService(t *testing.T) { { Name: "public", Port: 8080, + Protocol: "TCP", TargetPort: intstr.FromString("my-http-port"), AppProtocol: &appProtocolHttp, }, { Name: "api", Port: 9090, + Protocol: "TCP", TargetPort: intstr.FromString("my-grpc-port"), AppProtocol: &appProtocolGrpc, }, { Name: "other", Port: 10001, + Protocol: "TCP", TargetPort: intstr.FromString("10001"), - // no protocol specified + // no app protocol specified }, }, }, @@ -282,9 +291,10 @@ func TestReconcile_CreateService(t *testing.T) { NotReadyAddresses: addressesForPods(pod2), Ports: []corev1.EndpointPort{ { - Name: "my-http-port", - AppProtocol: &appProtocolHttp, + Name: "public", Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, }, }, }, @@ -301,6 +311,7 @@ func TestReconcile_CreateService(t *testing.T) { { Name: "public", Port: 8080, + Protocol: "TCP", TargetPort: intstr.FromString("my-http-port"), AppProtocol: &appProtocolHttp, }, @@ -366,9 +377,10 @@ func TestReconcile_CreateService(t *testing.T) { Addresses: addressesForPods(pod1), Ports: []corev1.EndpointPort{ { - Name: "my-http-port", - AppProtocol: &appProtocolHttp, + Name: "public", Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, }, }, }, @@ -376,9 +388,10 @@ func TestReconcile_CreateService(t *testing.T) { Addresses: addressesForPods(pod2), Ports: []corev1.EndpointPort{ { - Name: "my-grpc-port", - AppProtocol: &appProtocolGrpc, + Name: "api", Port: 6789, + Protocol: "TCP", + AppProtocol: &appProtocolGrpc, }, }, }, @@ -395,12 +408,14 @@ func TestReconcile_CreateService(t *testing.T) { { Name: "public", Port: 8080, + Protocol: "TCP", TargetPort: intstr.FromString("my-http-port"), AppProtocol: &appProtocolHttp, }, { Name: "api", Port: 9090, + Protocol: "TCP", TargetPort: intstr.FromString("my-grpc-port"), AppProtocol: &appProtocolGrpc, }, @@ -454,6 +469,95 @@ func TestReconcile_CreateService(t *testing.T) { }, }, }, + { + name: "Only L4 TCP ports get a Consul Service port when L4 protocols are multiplexed", + svcName: "service-created", + k8sObjects: func() []runtime.Object { + pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde") + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addressesForPods(pod1), + Ports: []corev1.EndpointPort{ + { + Name: "public-tcp", + Port: 2345, + Protocol: "TCP", + }, + { + Name: "public-udp", + Port: 2345, + Protocol: "UDP", + }, + }, + }, + }, + } + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "172.18.0.1", + Ports: []corev1.ServicePort{ + // Two L4 protocols on one exposed port + { + Name: "public-tcp", + Port: 8080, + Protocol: "TCP", + TargetPort: intstr.FromString("my-svc-port"), + }, + { + Name: "public-udp", + Port: 8080, + Protocol: "UDP", + TargetPort: intstr.FromString("my-svc-port"), + }, + }, + }, + } + return []runtime.Object{pod1, endpoints, service} + }, + expectedResource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "service-created", + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Service", + }, + Tenancy: &pbresource.Tenancy{ + Namespace: constants.DefaultConsulNS, + Partition: constants.DefaultConsulPartition, + }, + }, + Data: common.ToProtoAny(&pbcatalog.Service{ + Ports: []*pbcatalog.ServicePort{ + { + VirtualPort: 8080, + TargetPort: "my-svc-port", + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"service-created-rs-abcde"}, + }, + VirtualIps: []string{"172.18.0.1"}, + }), + Metadata: map[string]string{ + constants.MetaKeyKubeNS: constants.DefaultConsulNS, + metaKeyManagedBy: constants.ManagedByEndpointsValue, + }, + }, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { @@ -484,8 +588,9 @@ func TestReconcile_UpdateService(t *testing.T) { Ports: []corev1.EndpointPort{ { Name: "my-http-port", - AppProtocol: &appProtocolHttp, Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, }, }, }, @@ -502,6 +607,7 @@ func TestReconcile_UpdateService(t *testing.T) { { Name: "public", Port: 8080, + Protocol: "TCP", TargetPort: intstr.FromString("my-http-port"), AppProtocol: &appProtocolHttp, }, @@ -613,13 +719,15 @@ func TestReconcile_UpdateService(t *testing.T) { Ports: []corev1.EndpointPort{ { Name: "my-http-port", - AppProtocol: &appProtocolHttp, Port: 2345, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, }, { Name: "my-grpc-port", - AppProtocol: &appProtocolHttp, Port: 6789, + Protocol: "TCP", + AppProtocol: &appProtocolHttp, }, }, }, @@ -636,12 +744,14 @@ func TestReconcile_UpdateService(t *testing.T) { { Name: "public", Port: 8080, + Protocol: "TCP", TargetPort: intstr.FromString("new-http-port"), AppProtocol: &appProtocolHttp2, }, { Name: "api", Port: 9091, + Protocol: "TCP", TargetPort: intstr.FromString("my-grpc-port"), AppProtocol: &appProtocolGrpc, },