diff --git a/.changelog/3874.txt b/.changelog/3874.txt new file mode 100644 index 0000000000..c6928774ef --- /dev/null +++ b/.changelog/3874.txt @@ -0,0 +1,3 @@ +```release-note:sync-catalog +Add Endpoint health state to registered consul service +``` diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 09e7e17200..9e48305a30 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -46,6 +46,7 @@ const ( // consulKubernetesCheckName is the name of health check in Consul for Kubernetes readiness status. consulKubernetesCheckName = "Kubernetes Readiness Check" kubernetesSuccessReasonMsg = "Kubernetes health checks passing" + kubernetesFailureReasonMsg = "Kubernetes health checks failing" ) type NodePortSyncType string @@ -693,7 +694,6 @@ func (t *ServiceResource) generateRegistrations(key string) { } } } - } } } @@ -800,11 +800,17 @@ func (t *ServiceResource) registerServiceInstance( Name: consulKubernetesCheckName, Namespace: baseService.Namespace, Type: consulKubernetesCheckType, - Status: consulapi.HealthPassing, ServiceID: serviceID(r.Service.Service, addr), - Output: kubernetesSuccessReasonMsg, } + // Consider endpoint health state for registered consul service + if endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready { + r.Check.Status = consulapi.HealthPassing + r.Check.Output = kubernetesSuccessReasonMsg + } else { + r.Check.Status = consulapi.HealthCritical + r.Check.Output = kubernetesFailureReasonMsg + } t.consulMap[key] = append(t.consulMap[key], &r) } } diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 08b08ced2f..4951d1b107 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -26,6 +26,7 @@ import ( const nodeName1 = "ip-10-11-12-13.ec2.internal" const nodeName2 = "ip-10-11-12-14.ec2.internal" +const nodeName3 = "ip-10-11-12-15.ec2.internal" func init() { hclog.DefaultOptions.Level = hclog.Debug @@ -763,7 +764,7 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) { closer := controller.TestControllerRun(&serviceResource) defer closer() - node1, _ := createNodes(t, client) + node1, _, _ := createNodes(t, client) // Insert the endpoint slice _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( @@ -845,7 +846,7 @@ func TestServiceResource_nodePort(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.2.3.4", actual[0].Service.Address) require.Equal(r, 30000, actual[0].Service.Port) @@ -854,7 +855,13 @@ func TestServiceResource_nodePort(t *testing.T) { require.Equal(r, "2.3.4.5", actual[1].Service.Address) require.Equal(r, 30000, actual[1].Service.Port) require.Equal(r, "k8s-sync", actual[1].Node) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.4.5.6", actual[2].Service.Address) + require.Equal(r, 30000, actual[2].Service.Port) + require.Equal(r, "k8s-sync", actual[2].Node) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -885,7 +892,7 @@ func TestServiceResource_nodePortPrefix(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "prefixfoo", actual[0].Service.Service) require.Equal(r, "1.2.3.4", actual[0].Service.Address) require.Equal(r, 30000, actual[0].Service.Port) @@ -894,7 +901,13 @@ func TestServiceResource_nodePortPrefix(t *testing.T) { require.Equal(r, "2.3.4.5", actual[1].Service.Address) require.Equal(r, 30000, actual[1].Service.Port) require.Equal(r, "k8s-sync", actual[1].Node) + require.Equal(r, "prefixfoo", actual[2].Service.Service) + require.Equal(r, "3.4.5.6", actual[2].Service.Address) + require.Equal(r, 30000, actual[2].Service.Port) + require.Equal(r, "k8s-sync", actual[2].Node) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -911,7 +924,7 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { closer := controller.TestControllerRun(&serviceResource) defer closer() - node1, _ := createNodes(t, client) + node1, _, _ := createNodes(t, client) // Insert the endpoint slice _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( @@ -994,7 +1007,7 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.2.3.4", actual[0].Service.Address) require.Equal(r, 30001, actual[0].Service.Port) @@ -1003,7 +1016,13 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { require.Equal(r, "2.3.4.5", actual[1].Service.Address) require.Equal(r, 30001, actual[1].Service.Port) require.Equal(r, "k8s-sync", actual[1].Node) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.4.5.6", actual[2].Service.Address) + require.Equal(r, 30001, actual[2].Service.Port) + require.Equal(r, "k8s-sync", actual[2].Node) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1038,7 +1057,7 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.2.3.4", actual[0].Service.Address) require.Equal(r, 30000, actual[0].Service.Port) @@ -1047,7 +1066,13 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { require.Equal(r, "2.3.4.5", actual[1].Service.Address) require.Equal(r, 30000, actual[1].Service.Port) require.Equal(r, "k8s-sync", actual[1].Node) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.4.5.6", actual[2].Service.Address) + require.Equal(r, 30000, actual[2].Service.Port) + require.Equal(r, "k8s-sync", actual[2].Node) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1078,16 +1103,22 @@ func TestServiceResource_nodePort_internalOnlySync(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "4.5.6.7", actual[0].Service.Address) require.Equal(r, 30000, actual[0].Service.Port) require.Equal(r, "k8s-sync", actual[0].Node) require.Equal(r, "foo", actual[1].Service.Service) - require.Equal(r, "3.4.5.6", actual[1].Service.Address) + require.Equal(r, "5.6.7.8", actual[1].Service.Address) require.Equal(r, 30000, actual[1].Service.Port) require.Equal(r, "k8s-sync", actual[1].Node) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "6.7.8.9", actual[2].Service.Address) + require.Equal(r, 30000, actual[2].Service.Port) + require.Equal(r, "k8s-sync", actual[2].Node) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1104,7 +1135,7 @@ func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { closer := controller.TestControllerRun(&serviceResource) defer closer() - node1, _ := createNodes(t, client) + node1, _, _ := createNodes(t, client) node1.Status = corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -1126,7 +1157,7 @@ func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "4.5.6.7", actual[0].Service.Address) require.Equal(r, 30000, actual[0].Service.Port) @@ -1135,7 +1166,13 @@ func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { require.Equal(r, "2.3.4.5", actual[1].Service.Address) require.Equal(r, 30000, actual[1].Service.Port) require.Equal(r, "k8s-sync", actual[1].Node) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.4.5.6", actual[2].Service.Address) + require.Equal(r, 30000, actual[2].Service.Port) + require.Equal(r, "k8s-sync", actual[2].Node) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1166,16 +1203,22 @@ func TestServiceResource_clusterIP(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.1.1.1", actual[0].Service.Address) require.Equal(r, 8080, actual[0].Service.Port) + require.Equal(r, "us-west-2a", actual[0].Service.Meta["external-k8s-topology-zone"]) require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 8080, actual[1].Service.Port) - require.Equal(r, "us-west-2a", actual[0].Service.Meta["external-k8s-topology-zone"]) require.Equal(r, "us-west-2b", actual[1].Service.Meta["external-k8s-topology-zone"]) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.3.3.3", actual[2].Service.Address) + require.Equal(r, 8080, actual[2].Service.Port) + require.Equal(r, "us-west-2c", actual[2].Service.Meta["external-k8s-topology-zone"]) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1206,15 +1249,19 @@ func TestServiceResource_clusterIP_healthCheck(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, consulKubernetesCheckName, actual[0].Check.Name) require.Equal(r, consulapi.HealthPassing, actual[0].Check.Status) require.Equal(r, kubernetesSuccessReasonMsg, actual[0].Check.Output) require.Equal(r, consulKubernetesCheckType, actual[0].Check.Type) require.Equal(r, consulKubernetesCheckName, actual[1].Check.Name) - require.Equal(r, consulapi.HealthPassing, actual[1].Check.Status) - require.Equal(r, kubernetesSuccessReasonMsg, actual[1].Check.Output) + require.Equal(r, consulapi.HealthCritical, actual[1].Check.Status) + require.Equal(r, kubernetesFailureReasonMsg, actual[1].Check.Output) require.Equal(r, consulKubernetesCheckType, actual[1].Check.Type) + require.Equal(r, consulKubernetesCheckName, actual[2].Check.Name) + require.Equal(r, consulapi.HealthCritical, actual[2].Check.Status) + require.Equal(r, kubernetesFailureReasonMsg, actual[2].Check.Output) + require.Equal(r, consulKubernetesCheckType, actual[2].Check.Type) }) } @@ -1246,14 +1293,19 @@ func TestServiceResource_clusterIPPrefix(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "prefixfoo", actual[0].Service.Service) require.Equal(r, "1.1.1.1", actual[0].Service.Address) require.Equal(r, 8080, actual[0].Service.Port) require.Equal(r, "prefixfoo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 8080, actual[1].Service.Port) + require.Equal(r, "prefixfoo", actual[2].Service.Service) + require.Equal(r, "3.3.3.3", actual[2].Service.Address) + require.Equal(r, 8080, actual[2].Service.Port) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1286,14 +1338,19 @@ func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.1.1.1", actual[0].Service.Address) require.Equal(r, 2000, actual[0].Service.Port) require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 2000, actual[1].Service.Port) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.3.3.3", actual[2].Service.Address) + require.Equal(r, 2000, actual[2].Service.Port) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1326,14 +1383,19 @@ func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.1.1.1", actual[0].Service.Address) require.Equal(r, 4141, actual[0].Service.Port) require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 4141, actual[1].Service.Port) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.3.3.3", actual[2].Service.Address) + require.Equal(r, 4141, actual[2].Service.Port) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1368,14 +1430,19 @@ func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.1.1.1", actual[0].Service.Address) require.Equal(r, 8080, actual[0].Service.Port) require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 8080, actual[1].Service.Port) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.3.3.3", actual[2].Service.Address) + require.Equal(r, 8080, actual[2].Service.Port) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1439,14 +1506,19 @@ func TestServiceResource_clusterIPAllNamespaces(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.1.1.1", actual[0].Service.Address) require.Equal(r, 8080, actual[0].Service.Port) require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 8080, actual[1].Service.Port) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.3.3.3", actual[2].Service.Address) + require.Equal(r, 8080, actual[2].Service.Port) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1482,14 +1554,19 @@ func TestServiceResource_clusterIPTargetPortNamed(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foo", actual[0].Service.Service) require.Equal(r, "1.1.1.1", actual[0].Service.Address) require.Equal(r, 2000, actual[0].Service.Port) require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 2000, actual[1].Service.Port) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "3.3.3.3", actual[2].Service.Address) + require.Equal(r, 2000, actual[2].Service.Port) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + require.NotEqual(r, actual[0].Service.ID, actual[2].Service.ID) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) }) } @@ -1520,7 +1597,7 @@ func TestServiceResource_targetRefInMeta(t *testing.T) { syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, "foobar", actual[0].Service.Meta[ConsulK8SRefValue]) require.Equal(r, "pod", actual[0].Service.Meta[ConsulK8SRefKind]) require.Equal(r, nodeName1, actual[0].Service.Meta[ConsulK8SNodeName]) @@ -2013,7 +2090,7 @@ func TestServiceResource_addIngress(t *testing.T) { require.Equal(r, test.expectedAddress, actual[0].Service.Address) require.Equal(r, test.expectedPort, actual[0].Service.Port) } else { - require.Len(r, actual, 2) + require.Len(r, actual, 3) require.Equal(r, test.expectedAddress, actual[0].Service.Address) require.Equal(r, test.expectedPort, actual[0].Service.Port) } @@ -2085,8 +2162,8 @@ func clusterIPService(name, namespace string) *corev1.Service { } } -// createNodes calls the fake k8s client to create two Kubernetes nodes and returns them. -func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.Node) { +// createNodes calls the fake k8s client to create three Kubernetes nodes and returns them. +func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.Node, *corev1.Node) { // Insert the nodes node1 := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -2112,21 +2189,37 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.No Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ {Type: corev1.NodeExternalIP, Address: "2.3.4.5"}, - {Type: corev1.NodeInternalIP, Address: "3.4.5.6"}, - {Type: corev1.NodeInternalIP, Address: "6.7.8.9"}, + {Type: corev1.NodeInternalIP, Address: "5.6.7.8"}, + {Type: corev1.NodeInternalIP, Address: "8.9.10.11"}, }, }, } _, err = client.CoreV1().Nodes().Create(context.Background(), node2, metav1.CreateOptions{}) require.NoError(t, err) - return node1, node2 + node3 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName3, + }, + + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeExternalIP, Address: "3.4.5.6"}, + {Type: corev1.NodeInternalIP, Address: "6.7.8.9"}, + {Type: corev1.NodeInternalIP, Address: "9.10.11.12"}, + }, + }, + } + _, err = client.CoreV1().Nodes().Create(context.Background(), node3, metav1.CreateOptions{}) + require.NoError(t, err) + return node1, node2, node3 } -// createEndpointSlices calls the fake k8s client to create an endpoint slices with two endpoints on different nodes. +// createEndpointSlices calls the fake k8s client to create an endpoint slices with three endpoints on different nodes. func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName string, namespace string) { node1 := nodeName1 node2 := nodeName2 + node3 := nodeName3 targetRef := corev1.ObjectReference{Kind: "pod", Name: "foobar"} _, err := client.DiscoveryV1().EndpointSlices(namespace).Create( @@ -2152,13 +2245,23 @@ func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName strin { Addresses: []string{"2.2.2.2"}, Conditions: discoveryv1.EndpointConditions{ - Ready: ptr.To(true), + Ready: nil, Serving: ptr.To(true), Terminating: ptr.To(false), }, NodeName: &node2, Zone: ptr.To("us-west-2b"), }, + { + Addresses: []string{"3.3.3.3"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: ptr.To(false), + Serving: ptr.To(false), + Terminating: ptr.To(true), + }, + NodeName: &node3, + Zone: ptr.To("us-west-2c"), + }, }, Ports: []discoveryv1.EndpointPort{ {