diff --git a/.changelog/3215.txt b/.changelog/3215.txt
new file mode 100644
index 0000000000..c753e1ff79
--- /dev/null
+++ b/.changelog/3215.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+consul-telemetry-collector: fix deployments to non-default namespaces when global.enableConsulNamespaces
+```
\ No newline at end of file
diff --git a/acceptance/tests/cloud/basic_test.go b/acceptance/tests/cloud/basic_test.go
index 8278309ff3..c48390ede7 100644
--- a/acceptance/tests/cloud/basic_test.go
+++ b/acceptance/tests/cloud/basic_test.go
@@ -4,22 +4,19 @@ package cloud
 
 import (
-	"crypto/tls"
-	"encoding/json"
-	"errors"
 	"fmt"
-	"io"
-	"net/http"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
 	terratestk8s "github.com/gruntwork-io/terratest/modules/k8s"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/consul"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
+	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/serf/testutil/retry"
 	"github.com/stretchr/testify/require"
 )
@@ -52,182 +49,260 @@ var (
 	scadaAddressSecretName     = "scadaaddress-sec-name"
 	scadaAddressSecretKey      = "scadaaddress-sec-key"
 	scadaAddressSecretKeyValue = "fake-server:443"
-)
-
-// The fake-server has a requestToken endpoint to retrieve the token.
-func requestToken(endpoint string) (string, error) {
-	tr := &http.Transport{
-		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
-	}
-
-	client := &http.Client{Transport: tr}
-	url := fmt.Sprintf("https://%s/token", endpoint)
-	req, err := http.NewRequest("GET", url, nil)
-	if err != nil {
-		fmt.Println("Error creating request:", err)
-		return "", errors.New("error creating request")
-	}
-
-	// Perform the request
-	resp, err := client.Do(req)
-	if err != nil {
-		fmt.Println("Error sending request:", err)
-		return "", errors.New("error making request")
-	}
-	defer resp.Body.Close()
-
-	// Read the response body
-	body, err := io.ReadAll(resp.Body)
-	if err != nil {
-		fmt.Println("Error reading response:", err)
-		return "", errors.New("error reading body")
-	}
-
-	var tokenResponse TokenResponse
-	err = json.Unmarshal(body, &tokenResponse)
-	if err != nil {
-		fmt.Println("Error parsing response:", err)
-		return "", errors.New("error parsing body")
-	}
-
-	return tokenResponse.Token, nil
-}
-
-func TestBasicCloud(t *testing.T) {
-	ctx := suite.Environment().DefaultContext(t)
-
-	kubectlOptions := ctx.KubectlOptions(t)
-	ns := kubectlOptions.Namespace
-	k8sClient := environment.KubernetesClientFromOptions(t, kubectlOptions)
+	bootstrapTokenSecretName = "bootstrap-token"
+	bootstrapTokenSecretKey  = "token"
+	bootstrapToken           = uuid.NewString()
+)
 
+func TestObservabilityCloud(t *testing.T) {
 	cfg := suite.Config()
 	if cfg.HCPResourceID != "" {
 		resourceSecretKeyValue = cfg.HCPResourceID
 	}
-	consul.CreateK8sSecret(t, k8sClient, cfg, ns, resourceSecretName, resourceSecretKey, resourceSecretKeyValue)
-	consul.CreateK8sSecret(t, k8sClient, cfg, ns, clientIDSecretName, clientIDSecretKey, clientIDSecretKeyValue)
-	consul.CreateK8sSecret(t, k8sClient, cfg, ns, clientSecretName, clientSecretKey, clientSecretKeyValue)
-	consul.CreateK8sSecret(t, k8sClient, cfg, ns, apiHostSecretName, apiHostSecretKey, apiHostSecretKeyValue)
-	consul.CreateK8sSecret(t, k8sClient, cfg, ns, authUrlSecretName, authUrlSecretKey, authUrlSecretKeyValue)
-	consul.CreateK8sSecret(t, k8sClient, cfg, ns, scadaAddressSecretName, scadaAddressSecretKey, scadaAddressSecretKeyValue)
-
-	k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/bases/cloud/hcp-mock")
-	podName, err := k8s.RunKubectlAndGetOutputE(t, ctx.KubectlOptions(t), "get", "pod", "-l", "app=fake-server", "-o", `jsonpath="{.items[0].metadata.name}"`)
-	podName = strings.ReplaceAll(podName, "\"", "")
-	if err != nil {
-		logger.Log(t, "error finding pod name")
-		return
-	}
-	logger.Log(t, "fake-server pod name:"+podName)
-	localPort := terratestk8s.GetAvailablePort(t)
-	tunnel := terratestk8s.NewTunnelWithLogger(
-		ctx.KubectlOptions(t),
-		terratestk8s.ResourceTypePod,
-		podName,
-		localPort,
-		443,
-		logger.TestLogger{})
-
-	// Retry creating the port forward since it can fail occasionally.
-	retry.RunWith(&retry.Counter{Wait: 5 * time.Second, Count: 60}, t, func(r *retry.R) {
-		// NOTE: It's okay to pass in `t` to ForwardPortE despite being in a retry
-		// because we're using ForwardPortE (not ForwardPort) so the `t` won't
-		// get used to fail the test, just for logging.
-		require.NoError(r, tunnel.ForwardPortE(t))
-	})
-
-	logger.Log(t, "fake-server addr:"+tunnel.Endpoint())
-	consulToken, err := requestToken(tunnel.Endpoint())
-	if err != nil {
-		logger.Log(t, "error finding consul token")
-		return
-	}
-	tunnel.Close()
-	logger.Log(t, "consul test token :"+consulToken)
-
-	releaseName := helpers.RandomName()
-
-	helmValues := map[string]string{
-		"global.cloud.enabled":               "true",
-		"global.cloud.resourceId.secretName": resourceSecretName,
-		"global.cloud.resourceId.secretKey":  resourceSecretKey,
-
-		"global.cloud.clientId.secretName": clientIDSecretName,
-		"global.cloud.clientId.secretKey":  clientIDSecretKey,
-
-		"global.cloud.clientSecret.secretName": clientSecretName,
-		"global.cloud.clientSecret.secretKey":  clientSecretKey,
-
-		"global.cloud.apiHost.secretName": apiHostSecretName,
-		"global.cloud.apiHost.secretKey":  apiHostSecretKey,
-
-		"global.cloud.authUrl.secretName": authUrlSecretName,
-		"global.cloud.authUrl.secretKey":  authUrlSecretKey,
-
-		"global.cloud.scadaAddress.secretName": scadaAddressSecretName,
-		"global.cloud.scadaAddress.secretKey":  scadaAddressSecretKey,
-		"connectInject.default":                "true",
-
-		// TODO: Follow up with this bug
-		"global.acls.manageSystemACLs":         "false",
-		"global.gossipEncryption.autoGenerate": "false",
-		"global.tls.enabled":                   "true",
-		"global.tls.enableAutoEncrypt":         "true",
-		// TODO: Take this out
-
-		"telemetryCollector.enabled":                   "true",
-		"telemetryCollector.image":                     cfg.ConsulCollectorImage,
-		"telemetryCollector.cloud.clientId.secretName": clientIDSecretName,
-		"telemetryCollector.cloud.clientId.secretKey":  clientIDSecretKey,
-
-		"telemetryCollector.cloud.clientSecret.secretName": clientSecretName,
-		"telemetryCollector.cloud.clientSecret.secretKey":  clientSecretKey,
-		// Either we set the global.trustedCAs (make sure it's indented exactly) or we
-		// set TLS to insecure
-
-		"telemetryCollector.extraEnvironmentVars.HCP_API_TLS":       "insecure",
-		"telemetryCollector.extraEnvironmentVars.HCP_AUTH_TLS":      "insecure",
-		"telemetryCollector.extraEnvironmentVars.HCP_SCADA_TLS":     "insecure",
-		"telemetryCollector.extraEnvironmentVars.OTLP_EXPORTER_TLS": "insecure",
-
-		"server.extraEnvironmentVars.HCP_API_TLS":   "insecure",
-		"server.extraEnvironmentVars.HCP_AUTH_TLS":  "insecure",
-		"server.extraEnvironmentVars.HCP_SCADA_TLS": "insecure",
-
-		// This is a pregenerated CA used for testing. It can be replaced at any time and isn't
-		// meant for anything other than testing
-		// "global.trustedCAs[0]": `-----BEGIN CERTIFICATE-----
-		// MIICrjCCAZYCCQD5LxMcnMY8rDANBgkqhkiG9w0BAQsFADAZMRcwFQYDVQQDDA5m
-		// YWtlLXNlcnZlci1jYTAeFw0yMzA1MTkxMjIwMzhaFw0zMzA1MTYxMjIwMzhaMBkx
-		// FzAVBgNVBAMMDmZha2Utc2VydmVyLWNhMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
-		// MIIBCgKCAQEAwhbiII7sMultedFzQVhVZz5Ti+9lWrpZb8y0ZR6NaNvoxDPX151t
-		// Adh5NegSeH/+351iDBGZHhmKECtBuk8FJgk88O7y8A7Yg+/lyeZd0SJTEeiYUe7d
-		// sSaBTYSmixyn6s15Y5MVp9gM7t2YXrocRkFxDtdhLMWf0zwzJEwDouFMMiFZw5II
-		// yDbI6UfwKyB8C8ln10+TcczbheaOMQ1jGn35YWAG/LEdutU6DO2Y/GZYQ41nyLF1
-		// klqh34USQPVQSQW7R7GiDxyhh1fGaDF6RAzH4RerzQSNvvTHmBXIGurB/Hnu1n3p
-		// CwWeatWMU5POy1es73S/EPM0NpWD5RabSwIDAQABMA0GCSqGSIb3DQEBCwUAA4IB
-		// AQBayoTltSW55PvKVp9cmqGOBMlkIMKPd6Ny4bCb/3UF+3bzQmIblh3O3kEt7WoY
-		// fA9vp+6cSRGVqgBfR2bi40RrerLNA79yywIZjfBMteNuRoul5VeD+mLyFCo4197r
-		// Atl2TEx2kl2V8rjCsEBcTqKqetVOMLYEZ2tbCeUt1A/K7OzaJfHgelEYcsVt68Q9
-		// /BLoo2UXfOpRrcsx7u7s5HPVbG3bx+1MvGJZ2C3i0B6agnkGDzEpoM4KZGxEefB9
-		// DOHIJfie9d9BQD52nZh3SGHz0b3vfJ430XrQmaNZ26fuIEyIYrpvyAhBXckj2iTD
-		// 1TXpqr/1D7EUbddktyhXTK9e
-		// -----END CERTIFICATE-----`,
-	}
-	if cfg.ConsulImage != "" {
-		helmValues["global.image"] = cfg.ConsulImage
-	}
-	if cfg.ConsulCollectorImage != "" {
-		helmValues["telemetryCollector.image"] = cfg.ConsulCollectorImage
-	}
-	consulCluster := consul.NewHelmCluster(t, helmValues, suite.Environment().DefaultContext(t), suite.Config(), releaseName)
-	consulCluster.Create(t)
+	cases := []struct {
+		name                      string
+		validateCloudInteractions bool
+		enableConsulNamespaces    bool
+		mirroringK8S              bool
+		adminPartitionsEnabled    bool
+		secure                    bool
+	}{
+		{
+			name:                      "default namespace and partition",
+			validateCloudInteractions: true,
+		},
+		{
+			name:   "default namespace and partition; secure",
+			secure: true,
+		},
+		{
+			name:                   "namespace mirroring; secure",
+			enableConsulNamespaces: true,
+			mirroringK8S:           true,
+			secure:                 true,
+		},
+		{
+			name:                   "admin partitions; secure",
+			enableConsulNamespaces: true,
+			mirroringK8S:           true,
+			adminPartitionsEnabled: true,
+			secure:                 true,
+		},
+	}
 
-	logger.Log(t, "creating static-server deployment")
-	k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/bases/static-server")
-	// time.Sleep(1 * time.Hour)
-	// TODO: add in test assertions here
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			ctx := suite.Environment().DefaultContext(t)
+
+			if c.enableConsulNamespaces && !cfg.EnableEnterprise {
+				t.Skip("skipping this test because -enable-enterprise is not set")
+			}
+
+			options := &terratestk8s.KubectlOptions{
+				ContextName: ctx.KubectlOptions(t).ContextName,
+				ConfigPath:  ctx.KubectlOptions(t).ConfigPath,
+				Namespace:   ctx.KubectlOptions(t).Namespace,
+			}
+			ns := options.Namespace
+
+			k8sClient := environment.KubernetesClientFromOptions(t, options)
+
+			// Create cloud and telemetryCollector secrets.
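+			// These are consumed by the fake-server stand-in for HCP, so aside from an optionally
+			// supplied HCPResourceID they only need to be well-formed, not real credentials.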
+			consul.CreateK8sSecret(t, k8sClient, cfg, ns, resourceSecretName, resourceSecretKey, resourceSecretKeyValue)
+			consul.CreateK8sSecret(t, k8sClient, cfg, ns, clientIDSecretName, clientIDSecretKey, clientIDSecretKeyValue)
+			consul.CreateK8sSecret(t, k8sClient, cfg, ns, clientSecretName, clientSecretKey, clientSecretKeyValue)
+			consul.CreateK8sSecret(t, k8sClient, cfg, ns, apiHostSecretName, apiHostSecretKey, apiHostSecretKeyValue)
+			consul.CreateK8sSecret(t, k8sClient, cfg, ns, authUrlSecretName, authUrlSecretKey, authUrlSecretKeyValue)
+			consul.CreateK8sSecret(t, k8sClient, cfg, ns, scadaAddressSecretName, scadaAddressSecretKey, scadaAddressSecretKeyValue)
+			consul.CreateK8sSecret(t, k8sClient, cfg, ns, bootstrapTokenSecretName, bootstrapTokenSecretKey, bootstrapToken)
+
+			k8s.DeployKustomize(t, options, cfg.NoCleanupOnFailure, cfg.NoCleanup, cfg.DebugDirectory, "../fixtures/bases/cloud/hcp-mock")
+			podName, err := k8s.RunKubectlAndGetOutputE(t, options, "get", "pod", "-l", "app=fake-server", "-o", `jsonpath="{.items[0].metadata.name}"`)
+			podName = strings.ReplaceAll(podName, "\"", "")
+			if err != nil {
+				logger.Log(t, "error finding pod name")
+				return
+			}
+			logger.Log(t, "fake-server pod name:"+podName)
+			localPort := terratestk8s.GetAvailablePort(t)
+			tunnel := terratestk8s.NewTunnelWithLogger(
+				options,
+				terratestk8s.ResourceTypePod,
+				podName,
+				localPort,
+				443,
+				logger.TestLogger{})
+			defer tunnel.Close()
+			// Retry creating the port forward since it can fail occasionally.
+			retry.RunWith(&retry.Counter{Wait: 5 * time.Second, Count: 60}, t, func(r *retry.R) {
+				// NOTE: It's okay to pass in `t` to ForwardPortE despite being in a retry
+				// because we're using ForwardPortE (not ForwardPort) so the `t` won't
+				// get used to fail the test, just for logging.
+				require.NoError(r, tunnel.ForwardPortE(t))
+			})
+
+			fsClient := newfakeServerClient(tunnel.Endpoint())
+			logger.Log(t, "fake-server addr:"+tunnel.Endpoint())
+			consulToken, err := fsClient.requestToken()
+			if err != nil {
+				logger.Log(t, "error finding consul token")
+				return
+			}
+
+			logger.Log(t, "consul test token :"+consulToken)
+
+			releaseName := helpers.RandomName()
+
+			helmValues := map[string]string{
+				"global.imagePullPolicy": "IfNotPresent",
+
+				"global.acls.manageSystemACLs":   fmt.Sprint(c.secure),
+				"global.tls.enabled":             fmt.Sprint(c.secure),
+				"global.adminPartitions.enabled": fmt.Sprint(c.adminPartitionsEnabled),
+
+				"global.enableConsulNamespaces":               fmt.Sprint(c.enableConsulNamespaces),
+				"connectInject.enabled":                       "true",
+				"connectInject.consulNamespaces.mirroringK8S": fmt.Sprint(c.mirroringK8S),
+
+				// TODO this doesn't appear to work because we just deploy to default using kubectl options from context.
+				// https://github.com/hashicorp/consul-k8s/blob/74097fe7b3023105ca755b45da9c72c716547f46/acceptance/framework/consul/helm_cluster.go#L107
+				// "connectInject.consulNamespaces.consulDestinationNamespace": c.destinationNamespace,
+
+				"global.cloud.enabled":               "true",
+				"global.cloud.resourceId.secretName": resourceSecretName,
+				"global.cloud.resourceId.secretKey":  resourceSecretKey,
+
+				"global.cloud.clientId.secretName": clientIDSecretName,
+				"global.cloud.clientId.secretKey":  clientIDSecretKey,
+
+				"global.cloud.clientSecret.secretName": clientSecretName,
+				"global.cloud.clientSecret.secretKey":  clientSecretKey,
+
+				"global.cloud.apiHost.secretName": apiHostSecretName,
+				"global.cloud.apiHost.secretKey":  apiHostSecretKey,
+
+				"global.cloud.authUrl.secretName": authUrlSecretName,
+				"global.cloud.authUrl.secretKey":  authUrlSecretKey,
+
+				"global.cloud.scadaAddress.secretName": scadaAddressSecretName,
+				"global.cloud.scadaAddress.secretKey":  scadaAddressSecretKey,
+				"connectInject.default":                "true",
+
+				"telemetryCollector.enabled":                   "true",
+				"telemetryCollector.image":                     cfg.ConsulCollectorImage,
+				"telemetryCollector.cloud.clientId.secretName": clientIDSecretName,
+				"telemetryCollector.cloud.clientId.secretKey":  clientIDSecretKey,
+
+				"telemetryCollector.cloud.clientSecret.secretName": clientSecretName,
+				"telemetryCollector.cloud.clientSecret.secretKey":  clientSecretKey,
+
+				"telemetryCollector.extraEnvironmentVars.HCP_API_TLS":       "insecure",
+				"telemetryCollector.extraEnvironmentVars.HCP_AUTH_TLS":      "insecure",
+				"telemetryCollector.extraEnvironmentVars.HCP_SCADA_TLS":     "insecure",
+				"telemetryCollector.extraEnvironmentVars.OTLP_EXPORTER_TLS": "insecure",
+
+				"server.extraEnvironmentVars.HCP_API_TLS":   "insecure",
+				"server.extraEnvironmentVars.HCP_AUTH_TLS":  "insecure",
+				"server.extraEnvironmentVars.HCP_SCADA_TLS": "insecure",
+			}
+			if cfg.ConsulImage != "" {
+				helmValues["global.image"] = cfg.ConsulImage
+			}
+			if c.secure {
+				helmValues["global.acls.bootstrapToken.secretName"] = bootstrapTokenSecretName
+				helmValues["global.acls.bootstrapToken.secretKey"] = bootstrapTokenSecretKey
+			}
+
+			consulCluster := consul.NewHelmCluster(t, helmValues, ctx, cfg, releaseName)
+			consulCluster.ACLToken = bootstrapToken
+			consulCluster.Create(t)
+
+			logger.Log(t, "creating static-server deployment")
+			k8s.DeployKustomize(t, options, cfg.NoCleanupOnFailure, cfg.NoCleanup, cfg.DebugDirectory, "../fixtures/bases/static-server")
+			t.Log("Finished deployment. Validating expected conditions now")
+
+			// Validate that the consul-telemetry-collector service was deployed to the expected namespace.
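+			// The catalog query below scopes to the test's own namespace; with mirroring enabled the
+			// collector's Consul namespace should match the k8s namespace it was deployed to.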
+			consulClient, _ := consulCluster.SetupConsulClient(t, c.secure)
+			instances, _, err := consulClient.Catalog().Service("consul-telemetry-collector", "", &api.QueryOptions{Namespace: ns})
+			require.NoError(t, err)
+			require.Len(t, instances, 1)
+			require.Equal(t, "passing", instances[0].Checks.AggregatedStatus())
+
+			for name, tc := range map[string]struct {
+				refresh     *modifyTelemetryConfigBody
+				refreshTime int64
+				recordsPath string
+				timeout     time.Duration
+				wait        time.Duration
+				validations *metricValidations
+			}{
+				"collectorExportsMetrics": {
+					recordsPath: recordsPathCollector,
+					// High timeout as Collector metrics are scraped every 1 minute (https://github.com/hashicorp/consul-telemetry-collector/blob/dfdbf51b91d502a18f3b143a94ab4d50cdff10b8/internal/otel/config/helpers/receivers/prometheus_receiver.go#L54)
+					timeout: 5 * time.Minute,
+					wait:    1 * time.Second,
+					validations: &metricValidations{
+						expectedLabelKeys:    []string{"service_name", "service_instance_id"},
+						expectedMetricName:   "otelcol_receiver_accepted_metric_points",
+						disallowedMetricName: "server.memory_heap_size",
+					},
+				},
+				"consulPeriodicRefreshUpdateConfig": {
+					refresh: &modifyTelemetryConfigBody{
+						Filters: []string{"consul.state"},
+						Labels:  map[string]string{"new_label": "testLabel"},
+					},
+					recordsPath: recordsPathConsul,
+					// High timeout as Consul server metrics are exported every 1 minute (https://github.com/hashicorp/consul/blob/9776c10efb4472f196b47f88bc0db58b1bfa12ef/agent/hcp/telemetry/otel_sink.go#L27)
+					timeout: 3 * time.Minute,
+					wait:    1 * time.Second,
+					validations: &metricValidations{
+						expectedLabelKeys:    []string{"node_id", "node_name", "new_label"},
+						expectedMetricName:   "consul.state.services",
+						disallowedMetricName: "consul.fsm",
+					},
+				},
+				"consulPeriodicRefreshDisabled": {
+					refresh: &modifyTelemetryConfigBody{
+						Filters:  []string{"consul.state"},
+						Labels:   map[string]string{"new_label": "testLabel"},
+						Disabled: true,
+					},
+					recordsPath: recordsPathConsul,
+					// High timeout as Consul server metrics are exported every 1 minute (https://github.com/hashicorp/consul/blob/9776c10efb4472f196b47f88bc0db58b1bfa12ef/agent/hcp/telemetry/otel_sink.go#L27)
+					timeout: 3 * time.Minute,
+					wait:    1 * time.Second,
+					validations: &metricValidations{
+						disabled: true,
+					},
+				},
+			} {
+				t.Run(name, func(t *testing.T) {
+					if !c.validateCloudInteractions {
+						t.Skip("skipping server metric and config validation")
+					}
+
+					// For a refresh test, we force a telemetry config update before validating metrics using fakeserver's /telemetry_config_modify endpoint.
+					if tc.refresh != nil {
+						refreshTime := time.Now()
+						err := fsClient.modifyTelemetryConfig(tc.refresh)
+						require.NoError(t, err)
+						// Add 10 seconds (2 * periodic refresh interval in fakeserver) to allow a periodic refresh from the Consul side to take place.
+						tc.refreshTime = refreshTime.Add(10 * time.Second).UnixNano()
+					}
+
+					// Validate that metrics are correct using fakeserver's /records endpoint to retrieve the metric exports that occurred from Consul/Collector to fakeserver.
+					// We use retry as we wait for Consul or the Collector to export metrics. This is the best we can do to avoid flakiness.
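+					// tc.refreshTime is zero unless a refresh ran; when set, it should restrict validation
+					// to records exported after the config change.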
+					retry.RunWith(&retry.Timer{Timeout: tc.timeout, Wait: tc.wait}, t, func(r *retry.R) {
+						records, err := fsClient.getRecordsForPath(tc.recordsPath, tc.refreshTime)
+						require.NoError(r, err)
+						validateMetrics(r, records, tc.validations, tc.refreshTime)
+					})
+				})
+			}
+		})
+	}
 }
diff --git a/charts/consul/templates/telemetry-collector-deployment.yaml b/charts/consul/templates/telemetry-collector-deployment.yaml
index 270d9ced48..d86c5b4bf5 100644
--- a/charts/consul/templates/telemetry-collector-deployment.yaml
+++ b/charts/consul/templates/telemetry-collector-deployment.yaml
@@ -35,6 +35,8 @@ spec:
         # This annotation tells the endpoints controller that this pod was injected even though it wasn't. The
         # endpoints controller would then sync the endpoint into Consul
         "consul.hashicorp.com/connect-inject-status": "injected"
+        # Signals to the endpoints controller that we should force Consul NS creation, since we bypass the mesh webhook.
+        "consul.hashicorp.com/telemetry-collector": "true"
         # We aren't using tproxy and we don't have an original pod. This would be simpler if we made a path similar
         # to gateways
         "consul.hashicorp.com/connect-service-port": "metricsserver"
@@ -94,36 +96,51 @@ spec:
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
+          - name: CONSUL_NODE_NAME
+            value: $(NODE_NAME)-virtual
+          {{- include "consul.consulK8sConsulServerEnvVars" . | nindent 10 }}
+          # acl login info
          {{- if .Values.global.acls.manageSystemACLs }}
          - name: CONSUL_LOGIN_AUTH_METHOD
            value: {{ template "consul.fullname" . }}-k8s-auth-method
+          - name: CONSUL_LOGIN_DATACENTER
+            value: {{ .Values.global.datacenter }}
          - name: CONSUL_LOGIN_META
            value: "component=consul-telemetry-collector,pod=$(NAMESPACE)/$(POD_NAME)"
          {{- end }}
-          - name: CONSUL_NODE_NAME
-            value: $(NODE_NAME)-virtual
-          {{- include "consul.consulK8sConsulServerEnvVars" . | nindent 10 }}
+          # service and login namespace
+          # this is attempting to replicate the behavior of webhooks in calculating namespace
+          # https://github.com/hashicorp/consul-k8s/blob/b84339050bb2c4b62b60cec96275f74952b0ac9d/control-plane/connect-inject/webhook/consul_dataplane_sidecar.go#L200
          {{- if .Values.global.enableConsulNamespaces }}
+          {{- if .Values.connectInject.consulNamespaces.mirroringK8S }}
          - name: CONSUL_NAMESPACE
-            value: {{ .Values.syncCatalog.consulNamespaces.consulDestinationNamespace }}
-          {{- if .Values.syncCatalog.consulNamespaces.mirroringK8S }}
+            value: {{ .Values.connectInject.consulNamespaces.mirroringK8SPrefix }}{{ .Release.Namespace }}
+          {{- else }}
+          - name: CONSUL_NAMESPACE
+            value: {{ .Values.connectInject.consulNamespaces.consulDestinationNamespace }}
+          {{- end }}
+          {{- if .Values.global.acls.manageSystemACLs }}
+          {{- if .Values.connectInject.consulNamespaces.mirroringK8S }}
          - name: CONSUL_LOGIN_NAMESPACE
-            value: "default"
+            value: default
          {{- else }}
          - name: CONSUL_LOGIN_NAMESPACE
-            value: {{ .Values.syncCatalog.consulNamespaces.consulDestinationNamespace }}
+            value: {{ .Values.connectInject.consulNamespaces.consulDestinationNamespace }}
+          {{- end }}
          {{- end }}
          {{- end }}
          command:
          - /bin/sh
          - -ec
          - |-
-              consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-                -log-level={{ default .Values.global.logLevel .Values.telemetryCollector.logLevel }} \
+              consul-k8s-control-plane connect-init \
                -log-json={{ .Values.global.logJSON }} \
+                -log-level={{ default .Values.global.logLevel .Values.telemetryCollector.logLevel }} \
+                -pod-name=${POD_NAME} \
+                -pod-namespace=${POD_NAMESPACE} \
+                -proxy-id-file="/consul/connect-inject/proxyid" \
                -service-account-name="consul-telemetry-collector" \
-                -service-name="" \
-                -proxy-id-file="/consul/connect-inject/proxyid"
+                -service-name=""
 
          image: {{ .Values.global.imageK8S }}
          imagePullPolicy: IfNotPresent
@@ -304,24 +321,30 @@ spec:
        - -credential-type=login
        - -login-bearer-token-path=/var/run/secrets/kubernetes.io/serviceaccount/token
        - -login-auth-method={{ template "consul.fullname" . }}-k8s-auth-method
+        {{- end }}
+        # service and login namespace
        {{- if .Values.global.enableConsulNamespaces }}
-        {{- if .Values.syncCatalog.consulNamespaces.mirroringK8S }}
-        - -login-namespace="default"
+        {{- if .Values.connectInject.consulNamespaces.mirroringK8S }}
+        - -service-namespace={{ .Values.connectInject.consulNamespaces.mirroringK8SPrefix }}{{ .Release.Namespace }}
        {{- else }}
-        - -login-namespace={{ .Values.syncCatalog.consulNamespaces.consulDestinationNamespace }}
-        {{- end }}
-        {{- end }}
-        {{- if .Values.global.adminPartitions.enabled }}
-        - foo
-        - -login-partition={{ .Values.global.adminPartitions.name }}
+        - -service-namespace={{ .Values.connectInject.consulNamespaces.consulDestinationNamespace }}
        {{- end }}
+        {{- if .Values.global.acls.manageSystemACLs }}
+        {{- if .Values.connectInject.consulNamespaces.mirroringK8S }}
+        - -login-namespace=default
+        {{- else }}
+        - -login-namespace={{ .Values.connectInject.consulNamespaces.consulDestinationNamespace }}
+        {{- end }}
        {{- end }}
-        {{- if .Values.global.enableConsulNamespaces }}
-        - -service-namespace={{ .Values.syncCatalog.consulNamespaces.consulDestinationNamespace }}
        {{- end }}
+        # service and login partition
        {{- if .Values.global.adminPartitions.enabled }}
        - -service-partition={{ .Values.global.adminPartitions.name }}
+        {{- if .Values.global.acls.manageSystemACLs }}
+        - -login-partition={{ .Values.global.adminPartitions.name }}
+        {{- end }}
        {{- end }}
+        # telemetry
        {{- if .Values.global.metrics.enabled }}
        - -telemetry-prom-scrape-path=/metrics
        {{- end }}
diff --git a/charts/consul/test/unit/telemetry-collector-deployment.bats b/charts/consul/test/unit/telemetry-collector-deployment.bats
index 45c512ee6c..57d6b84b27 100755
--- a/charts/consul/test/unit/telemetry-collector-deployment.bats
+++ b/charts/consul/test/unit/telemetry-collector-deployment.bats
@@ -1313,4 +1313,47 @@ MIICFjCCAZsCCQCdwLtdjbzlYzAKBggqhkjOPQQDAjB0MQswCQYDVQQGEwJDQTEL' \
 
  local actual=$(echo "$cmd" | yq 'any(contains("-log-level=debug"))' | tee /dev/stderr)
  [ "${actual}" = "true" ]
-}
\ No newline at end of file
+}
+
+#--------------------------------------------------------------------
+# Namespaces
+
+@test "telemetryCollector/Deployment: namespace flags when mirroringK8S" {
+  cd `chart_dir`
+  local object=$(helm template \
+      -s templates/telemetry-collector-deployment.yaml \
+      --set 'telemetryCollector.enabled=true' \
+      --set 'telemetryCollector.image=bar' \
+      --set 'global.enableConsulNamespaces=true' \
+      --set 'global.acls.manageSystemACLs=true' \
+      --set 'connectInject.consulNamespaces.mirroringK8S=true' \
+      --namespace 'test-namespace' \
+      . | tee /dev/stderr |
+      yq -r '.spec.template.spec' | tee /dev/stderr)
+
+  local actual=$(echo $object | jq -r '.containers[1].args | any(contains("-login-namespace=default"))' | tee /dev/stderr)
+  [ "${actual}" = 'true' ]
+
+  local actual=$(echo $object | jq -r '.containers[1].args | any(contains("-service-namespace=test-namespace"))' | tee /dev/stderr)
+  [ "${actual}" = 'true' ]
+}
+
+@test "telemetryCollector/Deployment: namespace flags when not mirroringK8S" {
+  cd `chart_dir`
+  local object=$(helm template \
+      -s templates/telemetry-collector-deployment.yaml \
+      --set 'telemetryCollector.enabled=true' \
+      --set 'telemetryCollector.image=bar' \
+      --set 'global.enableConsulNamespaces=true' \
+      --set 'global.acls.manageSystemACLs=true' \
+      --set 'connectInject.consulNamespaces.mirroringK8S=false' \
+      --set 'connectInject.consulNamespaces.consulDestinationNamespace=fakenamespace' \
+      . | tee /dev/stderr |
+      yq -r '.spec.template.spec.containers' | tee /dev/stderr)
+
+  local actual=$(echo $object | jq -r '.[1].args | any(contains("-login-namespace=fakenamespace"))' | tee /dev/stderr)
+  [ "${actual}" = 'true' ]
+
+  local actual=$(echo $object | jq -r '.[1].args | any(contains("-service-namespace=fakenamespace"))' | tee /dev/stderr)
+  [ "${actual}" = 'true' ]
+}
diff --git a/control-plane/connect-inject/constants/annotations_and_labels.go b/control-plane/connect-inject/constants/annotations_and_labels.go
index 8767de33b3..686191ca11 100644
--- a/control-plane/connect-inject/constants/annotations_and_labels.go
+++ b/control-plane/connect-inject/constants/annotations_and_labels.go
@@ -189,6 +189,11 @@ const (
 	// by the peering controllers.
 	LabelPeeringToken = "consul.hashicorp.com/peering-token"
 
+	// LabelTelemetryCollector is a label signaling that the pod is associated with the deployment of a Consul Telemetry
+	// Collector. If this is set, during connect-inject the endpoints controller ensures the deployed namespace exists in Consul and creates it if it does not.
+	// This is only meant to be used by Deployment/consul-telemetry-collector.
+	LabelTelemetryCollector = "consul.hashicorp.com/telemetry-collector"
+
 	// Injected is used as the annotation value for keyInjectStatus and annotationInjected.
 	Injected = "injected"
diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go
index 9603b6bd3d..03f75278cb 100644
--- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go
+++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go
@@ -209,6 +209,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 				continue
 			}
 
+			if isTelemetryCollector(pod) {
+				if err = r.ensureNamespaceExists(apiClient, pod); err != nil {
+					r.Log.Error(err, "failed to ensure a namespace exists for Consul Telemetry Collector")
+					errs = multierror.Append(errs, err)
+				}
+			}
+
 			if hasBeenInjected(pod) {
 				endpointPods.Add(address.TargetRef.Name)
 				if isConsulDataplaneSupported(pod) {
@@ -236,6 +243,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 					continue
 				}
 			}
+
 			if isGateway(pod) {
 				endpointPods.Add(address.TargetRef.Name)
 				if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
@@ -421,6 +429,7 @@ func (r *Controller) createServiceRegistrations(pod corev1.Pod, serviceEndpoints
 	tags := consulTags(pod)
 
 	consulNS := r.consulNamespace(pod.Namespace)
+
 	service := &api.AgentService{
 		ID:      svcID,
 		Service: svcName,
@@ -1310,6 +1319,28 @@ func isGateway(pod corev1.Pod) bool {
 	return ok && anno != ""
 }
 
+// isTelemetryCollector checks whether a pod is part of a deployment for a Consul Telemetry Collector. If so,
+// and this is the first pod deployed to a namespace, we need to create the namespace in Consul. Otherwise the
+// deployment may fail during service registration because it is deployed to a namespace that does not exist.
+func isTelemetryCollector(pod corev1.Pod) bool {
+	anno, ok := pod.Annotations[constants.LabelTelemetryCollector]
+	return ok && anno != ""
+}
+
+// ensureNamespaceExists creates a Consul namespace for a pod in the event it does not exist.
+// At the time of writing, we use this for the Consul Telemetry Collector, which may be the first
+// pod deployed to a namespace. If it is, its connect-inject will fail for lack of a namespace.
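+// namespaces.EnsureExists only creates the namespace (attaching the cross-namespace ACL policy)
+// when it is missing, so calling this on every reconcile is safe.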
+func (r *Controller) ensureNamespaceExists(apiClient *api.Client, pod corev1.Pod) error {
+	if r.EnableConsulNamespaces {
+		consulNS := r.consulNamespace(pod.Namespace)
+		if _, err := namespaces.EnsureExists(apiClient, consulNS, r.CrossNSACLPolicy); err != nil {
+			r.Log.Error(err, "failed to ensure Consul namespace exists", "ns", pod.Namespace, "consul ns", consulNS)
+			return err
+		}
+	}
+	return nil
+}
+
 // mapAddresses combines all addresses to a mapping of address to its health status.
 func mapAddresses(addresses corev1.EndpointSubset) map[corev1.EndpointAddress]string {
 	m := make(map[corev1.EndpointAddress]string)