Skip to content

Commit

Permalink
[NET-5314] Limit v2 Service port registration to L4 TCP ports (#2965)
Browse files Browse the repository at this point in the history
Limit v2 Service port registration to L4 TCP ports

Ignore non-TCP L4 ports in K8s services. This is expected behavior and
also prevents unintended duplication of Service port values registered
to Consul (which is not supported) when ports have multiplexed L4
traffic.
  • Loading branch information
zalimeni authored Sep 15, 2023
1 parent b1bc57e commit 60d09e2
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package endpointsv2

import (
"context"
"fmt"
"net"
"sort"
"strings"
Expand Down Expand Up @@ -202,23 +203,24 @@ func getOwnerPrefixFromPod(pod *corev1.Pod) string {

// registerService creates a Consul service registration from the provided Kuberetes service and endpoint information.
func (r *Controller) registerService(ctx context.Context, resourceClient pbresource.ResourceServiceClient, service corev1.Service, selector *pbcatalog.WorkloadSelector) error {
serviceResource := r.getServiceResource(
&pbcatalog.Service{
Workloads: selector,
Ports: getServicePorts(service),
VirtualIps: r.getServiceVIPs(service),
},
consulSvc := &pbcatalog.Service{
Workloads: selector,
Ports: getServicePorts(service),
VirtualIps: r.getServiceVIPs(service),
}
consulSvcResource := r.getServiceResource(
consulSvc,
service.Name, // Consul and Kubernetes service name will always match
r.getConsulNamespace(service.Namespace),
r.getConsulPartition(),
getServiceMeta(service),
)

r.Log.Info("registering service with Consul", getLogFieldsForResource(serviceResource.Id)...)
r.Log.Info("registering service with Consul", getLogFieldsForResource(consulSvcResource.Id)...)
//TODO: Maybe attempt to debounce redundant writes. For now, we blindly rewrite state on each reconcile.
_, err := resourceClient.Write(ctx, &pbresource.WriteRequest{Resource: serviceResource})
_, err := resourceClient.Write(ctx, &pbresource.WriteRequest{Resource: consulSvcResource})
if err != nil {
r.Log.Error(err, "failed to register service", getLogFieldsForResource(serviceResource.Id)...)
r.Log.Error(err, fmt.Sprintf("failed to register service: %+v", consulSvc), getLogFieldsForResource(consulSvcResource.Id)...)
return err
}

Expand Down Expand Up @@ -254,13 +256,21 @@ func getServicePorts(service corev1.Service) []*pbcatalog.ServicePort {
ports := make([]*pbcatalog.ServicePort, 0, len(service.Spec.Ports)+1)

for _, p := range service.Spec.Ports {
ports = append(ports, &pbcatalog.ServicePort{
VirtualPort: uint32(p.Port),
//TODO: If the value is a number, infer the correct name value based on
// the most prevalent endpoint subset for the port (best-effot, inspect a pod).
TargetPort: p.TargetPort.String(),
Protocol: common.GetPortProtocol(p.AppProtocol),
})
// Service mesh only supports TCP as the L4 Protocol (not to be confused w/ L7 AppProtocol).
//
// This check is necessary to deduplicate VirtualPort values when multiple declared ServicePort values exist
// for the same port, which is possible in K8s when e.g. multiplexing TCP and UDP traffic over a single port.
//
// If we otherwise see repeat port values in a K8s service, we pass along and allow Consul to fail validation.
if p.Protocol == corev1.ProtocolTCP {
ports = append(ports, &pbcatalog.ServicePort{
VirtualPort: uint32(p.Port),
//TODO: If the value is a number, infer the correct name value based on
// the most prevalent endpoint subset for the port (best-effot, inspect a pod).
TargetPort: p.TargetPort.String(),
Protocol: common.GetPortProtocol(p.AppProtocol),
})
}
}

//TODO: Error check reserved "mesh" target port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,23 @@ func TestReconcile_CreateService(t *testing.T) {
{
Name: "public",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.FromString("my-http-port"),
AppProtocol: &appProtocolHttp,
},
{
Name: "api",
Port: 9090,
Protocol: "TCP",
TargetPort: intstr.FromString("my-grpc-port"),
AppProtocol: &appProtocolGrpc,
},
{
Name: "other",
Port: 10001,
Protocol: "TCP",
TargetPort: intstr.FromString("10001"),
// no protocol specified
// no app protocol specified
},
},
},
Expand Down Expand Up @@ -169,18 +172,21 @@ func TestReconcile_CreateService(t *testing.T) {
Addresses: addressesForPods(pod1, pod2),
Ports: []corev1.EndpointPort{
{
Name: "my-http-port",
AppProtocol: &appProtocolHttp,
Name: "public",
Port: 2345,
Protocol: "TCP",
AppProtocol: &appProtocolHttp,
},
{
Name: "my-grpc-port",
AppProtocol: &appProtocolGrpc,
Name: "api",
Port: 6789,
Protocol: "TCP",
AppProtocol: &appProtocolGrpc,
},
{
Name: "10001",
Port: 10001,
Name: "other",
Port: 10001,
Protocol: "TCP",
},
},
},
Expand All @@ -197,20 +203,23 @@ func TestReconcile_CreateService(t *testing.T) {
{
Name: "public",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.FromString("my-http-port"),
AppProtocol: &appProtocolHttp,
},
{
Name: "api",
Port: 9090,
Protocol: "TCP",
TargetPort: intstr.FromString("my-grpc-port"),
AppProtocol: &appProtocolGrpc,
},
{
Name: "other",
Port: 10001,
Protocol: "TCP",
TargetPort: intstr.FromString("10001"),
// no protocol specified
// no app protocol specified
},
},
},
Expand Down Expand Up @@ -282,9 +291,10 @@ func TestReconcile_CreateService(t *testing.T) {
NotReadyAddresses: addressesForPods(pod2),
Ports: []corev1.EndpointPort{
{
Name: "my-http-port",
AppProtocol: &appProtocolHttp,
Name: "public",
Port: 2345,
Protocol: "TCP",
AppProtocol: &appProtocolHttp,
},
},
},
Expand All @@ -301,6 +311,7 @@ func TestReconcile_CreateService(t *testing.T) {
{
Name: "public",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.FromString("my-http-port"),
AppProtocol: &appProtocolHttp,
},
Expand Down Expand Up @@ -366,19 +377,21 @@ func TestReconcile_CreateService(t *testing.T) {
Addresses: addressesForPods(pod1),
Ports: []corev1.EndpointPort{
{
Name: "my-http-port",
AppProtocol: &appProtocolHttp,
Name: "public",
Port: 2345,
Protocol: "TCP",
AppProtocol: &appProtocolHttp,
},
},
},
{
Addresses: addressesForPods(pod2),
Ports: []corev1.EndpointPort{
{
Name: "my-grpc-port",
AppProtocol: &appProtocolGrpc,
Name: "api",
Port: 6789,
Protocol: "TCP",
AppProtocol: &appProtocolGrpc,
},
},
},
Expand All @@ -395,12 +408,14 @@ func TestReconcile_CreateService(t *testing.T) {
{
Name: "public",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.FromString("my-http-port"),
AppProtocol: &appProtocolHttp,
},
{
Name: "api",
Port: 9090,
Protocol: "TCP",
TargetPort: intstr.FromString("my-grpc-port"),
AppProtocol: &appProtocolGrpc,
},
Expand Down Expand Up @@ -454,6 +469,95 @@ func TestReconcile_CreateService(t *testing.T) {
},
},
},
{
name: "Only L4 TCP ports get a Consul Service port when L4 protocols are multiplexed",
svcName: "service-created",
k8sObjects: func() []runtime.Object {
pod1 := createServicePodOwnedBy(kindReplicaSet, "service-created-rs-abcde")
endpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Namespace: "default",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: addressesForPods(pod1),
Ports: []corev1.EndpointPort{
{
Name: "public-tcp",
Port: 2345,
Protocol: "TCP",
},
{
Name: "public-udp",
Port: 2345,
Protocol: "UDP",
},
},
},
},
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "172.18.0.1",
Ports: []corev1.ServicePort{
// Two L4 protocols on one exposed port
{
Name: "public-tcp",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.FromString("my-svc-port"),
},
{
Name: "public-udp",
Port: 8080,
Protocol: "UDP",
TargetPort: intstr.FromString("my-svc-port"),
},
},
},
}
return []runtime.Object{pod1, endpoints, service}
},
expectedResource: &pbresource.Resource{
Id: &pbresource.ID{
Name: "service-created",
Type: &pbresource.Type{
Group: "catalog",
GroupVersion: "v1alpha1",
Kind: "Service",
},
Tenancy: &pbresource.Tenancy{
Namespace: constants.DefaultConsulNS,
Partition: constants.DefaultConsulPartition,
},
},
Data: common.ToProtoAny(&pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{
VirtualPort: 8080,
TargetPort: "my-svc-port",
},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"service-created-rs-abcde"},
},
VirtualIps: []string{"172.18.0.1"},
}),
Metadata: map[string]string{
constants.MetaKeyKubeNS: constants.DefaultConsulNS,
metaKeyManagedBy: constants.ManagedByEndpointsValue,
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -484,8 +588,9 @@ func TestReconcile_UpdateService(t *testing.T) {
Ports: []corev1.EndpointPort{
{
Name: "my-http-port",
AppProtocol: &appProtocolHttp,
Port: 2345,
Protocol: "TCP",
AppProtocol: &appProtocolHttp,
},
},
},
Expand All @@ -502,6 +607,7 @@ func TestReconcile_UpdateService(t *testing.T) {
{
Name: "public",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.FromString("my-http-port"),
AppProtocol: &appProtocolHttp,
},
Expand Down Expand Up @@ -613,13 +719,15 @@ func TestReconcile_UpdateService(t *testing.T) {
Ports: []corev1.EndpointPort{
{
Name: "my-http-port",
AppProtocol: &appProtocolHttp,
Port: 2345,
Protocol: "TCP",
AppProtocol: &appProtocolHttp,
},
{
Name: "my-grpc-port",
AppProtocol: &appProtocolHttp,
Port: 6789,
Protocol: "TCP",
AppProtocol: &appProtocolHttp,
},
},
},
Expand All @@ -636,12 +744,14 @@ func TestReconcile_UpdateService(t *testing.T) {
{
Name: "public",
Port: 8080,
Protocol: "TCP",
TargetPort: intstr.FromString("new-http-port"),
AppProtocol: &appProtocolHttp2,
},
{
Name: "api",
Port: 9091,
Protocol: "TCP",
TargetPort: intstr.FromString("my-grpc-port"),
AppProtocol: &appProtocolGrpc,
},
Expand Down

0 comments on commit 60d09e2

Please sign in to comment.