From 7fdd48d577b756fa7dccecb64e0b105391c14875 Mon Sep 17 00:00:00 2001
From: Enrico Candino
Date: Fri, 27 Dec 2024 11:41:40 +0100
Subject: [PATCH] Implementation of `GetStatsSummary` and `GetMetricsResource`
 for Virtual Kubelet (#163)

* implemented GetStatsSummary and GetMetricsResource for Virtual Kubelet

* fixed ClusterRole for node proxy

* limit the clusterrole with get and list

* remove unused Metrics client interface
---
 charts/k3k/templates/rbac.yaml                 |  25 ++-
 go.mod                                         |   4 +-
 k3k-kubelet/kubelet.go                         |   4 +-
 .../collectors/kubelet_resource_metrics.go     | 196 ++++++++++++++++++
 k3k-kubelet/provider/provider.go               | 101 ++++++++-
 pkg/controller/cluster/agent/shared.go         |  40 ++--
 pkg/controller/cluster/cluster.go              |  68 +++++-
 7 files changed, 407 insertions(+), 31 deletions(-)
 create mode 100644 k3k-kubelet/provider/collectors/kubelet_resource_metrics.go

diff --git a/charts/k3k/templates/rbac.yaml b/charts/k3k/templates/rbac.yaml
index df583af..6dcca90 100644
--- a/charts/k3k/templates/rbac.yaml
+++ b/charts/k3k/templates/rbac.yaml
@@ -11,4 +11,27 @@ roleRef:
 subjects:
 - kind: ServiceAccount
   name: {{ include "k3k.serviceAccountName" . }}
-  namespace: {{ .Values.namespace }}
\ No newline at end of file
+  namespace: {{ .Values.namespace }}
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: {{ include "k3k.fullname" . }}-node-proxy
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - "nodes"
+  - "nodes/proxy"
+  verbs:
+  - "get"
+  - "list"
+---
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: {{ include "k3k.fullname" . }}-node-proxy
+roleRef:
+  kind: ClusterRole
+  name: {{ include "k3k.fullname" . }}-node-proxy
+  apiGroup: rbac.authorization.k8s.io
diff --git a/go.mod b/go.mod
index 6871a58..95ec518 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/go-logr/zapr v1.3.0
 	github.com/onsi/ginkgo/v2 v2.20.1
 	github.com/onsi/gomega v1.36.0
+	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_model v0.6.1
 	github.com/rancher/dynamiclistener v1.27.5
 	github.com/sirupsen/logrus v1.9.3
@@ -29,6 +30,7 @@ require (
 	k8s.io/apimachinery v0.29.11
 	k8s.io/apiserver v0.29.11
 	k8s.io/client-go v0.29.11
+	k8s.io/component-base v0.29.11
 	k8s.io/metrics v0.29.11
 	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
 	sigs.k8s.io/controller-runtime v0.17.5
@@ -80,7 +82,6 @@ require (
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
-	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_golang v1.19.1 // indirect
 	github.com/prometheus/common v0.55.0 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
@@ -119,7 +120,6 @@ require (
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	k8s.io/apiextensions-apiserver v0.29.2 // indirect
-	k8s.io/component-base v0.29.11 // indirect
 	k8s.io/klog/v2 v2.130.1 // indirect
 	k8s.io/kms v0.31.0 // indirect
 	k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
diff --git a/k3k-kubelet/kubelet.go b/k3k-kubelet/kubelet.go
index f3dbac2..a5b0c48 100644
--- a/k3k-kubelet/kubelet.go
+++ b/k3k-kubelet/kubelet.go
@@ -214,10 +214,10 @@ func (k *kubelet) newProviderFunc(namespace, name, hostname, agentIP, serverIP,
 		if err != nil {
 			return nil, nil, errors.New("unable to make nodeutil provider: " + err.Error())
 		}
-		nodeProvider := provider.Node{}
 
 		provider.ConfigureNode(pc.Node, hostname, k.port, agentIP)
-		return utilProvider, &nodeProvider, nil
+
+		return utilProvider, &provider.Node{}, nil
 	}
 }
diff --git a/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go b/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go
new file mode 100644
index 0000000..e67822f
--- /dev/null
+++ b/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go
@@ -0,0 +1,196 @@
+/*
+Copyright (c) Microsoft Corporation.
+Licensed under the Apache 2.0 license.
+
+See https://github.com/virtual-kubelet/azure-aci/tree/master/pkg/metrics/collectors
+*/
+
+package collectors
+
+import (
+	"time"
+
+	stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
+	compbasemetrics "k8s.io/component-base/metrics"
+)
+
+// defining metrics
+var (
+	nodeCPUUsageDesc = compbasemetrics.NewDesc("node_cpu_usage_seconds_total",
+		"Cumulative cpu time consumed by the node in core-seconds",
+		nil,
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+
+	nodeMemoryUsageDesc = compbasemetrics.NewDesc("node_memory_working_set_bytes",
+		"Current working set of the node in bytes",
+		nil,
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+
+	containerCPUUsageDesc = compbasemetrics.NewDesc("container_cpu_usage_seconds_total",
+		"Cumulative cpu time consumed by the container in core-seconds",
+		[]string{"container", "pod", "namespace"},
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+
+	containerMemoryUsageDesc = compbasemetrics.NewDesc("container_memory_working_set_bytes",
+		"Current working set of the container in bytes",
+		[]string{"container", "pod", "namespace"},
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+
+	podCPUUsageDesc = compbasemetrics.NewDesc("pod_cpu_usage_seconds_total",
+		"Cumulative cpu time consumed by the pod in core-seconds",
+		[]string{"pod", "namespace"},
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+
+	podMemoryUsageDesc = compbasemetrics.NewDesc("pod_memory_working_set_bytes",
+		"Current working set of the pod in bytes",
+		[]string{"pod", "namespace"},
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+
+	resourceScrapeResultDesc = compbasemetrics.NewDesc("scrape_error",
+		"1 if there was an error while getting container metrics, 0 otherwise",
+		nil,
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+
+	containerStartTimeDesc = compbasemetrics.NewDesc("container_start_time_seconds",
+		"Start time of the container since unix epoch in seconds",
+		[]string{"container", "pod", "namespace"},
+		nil,
+		compbasemetrics.ALPHA,
+		"")
+)
+
+// NewResourceMetricsCollector returns a metrics.StableCollector which exports resource metrics
+func NewKubeletResourceMetricsCollector(podStats *stats.Summary) compbasemetrics.StableCollector {
+	return &resourceMetricsCollector{
+		providerPodStats: podStats,
+	}
+}
+
+type resourceMetricsCollector struct {
+	compbasemetrics.BaseStableCollector
+
+	providerPodStats *stats.Summary
+}
+
+// Check if resourceMetricsCollector implements necessary interface
+var _ compbasemetrics.StableCollector = &resourceMetricsCollector{}
+
+// DescribeWithStability implements compbasemetrics.StableCollector
+func (rc *resourceMetricsCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
+	ch <- nodeCPUUsageDesc
+	ch <- nodeMemoryUsageDesc
+	ch <- containerStartTimeDesc
+	ch <- containerCPUUsageDesc
+	ch <- containerMemoryUsageDesc
+	ch <- podCPUUsageDesc
+	ch <- podMemoryUsageDesc
+	ch <- resourceScrapeResultDesc
+}
+
+// CollectWithStability implements compbasemetrics.StableCollector
+// Since new containers are frequently created and removed, using the Gauge would
+// leak metric collectors for containers or pods that no longer exist. Instead, implement
+// custom collector in a way that only collects metrics for active containers.
+func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
+	var errorCount float64
+	defer func() {
+		ch <- compbasemetrics.NewLazyConstMetric(resourceScrapeResultDesc, compbasemetrics.GaugeValue, errorCount)
+	}()
+
+	statsSummary := *rc.providerPodStats
+	rc.collectNodeCPUMetrics(ch, statsSummary.Node)
+	rc.collectNodeMemoryMetrics(ch, statsSummary.Node)
+
+	for _, pod := range statsSummary.Pods {
+		for _, container := range pod.Containers {
+			rc.collectContainerStartTime(ch, pod, container)
+			rc.collectContainerCPUMetrics(ch, pod, container)
+			rc.collectContainerMemoryMetrics(ch, pod, container)
+		}
+		rc.collectPodCPUMetrics(ch, pod)
+		rc.collectPodMemoryMetrics(ch, pod)
+	}
+}
+
+// implement collector methods and validate that correct data is used
+
+func (rc *resourceMetricsCollector) collectNodeCPUMetrics(ch chan<- compbasemetrics.Metric, s stats.NodeStats) {
+	if s.CPU == nil || s.CPU.UsageCoreNanoSeconds == nil {
+		return
+	}
+
+	ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
+		compbasemetrics.NewLazyConstMetric(nodeCPUUsageDesc, compbasemetrics.CounterValue, float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second)))
+}
+
+func (rc *resourceMetricsCollector) collectNodeMemoryMetrics(ch chan<- compbasemetrics.Metric, s stats.NodeStats) {
+	if s.Memory == nil || s.Memory.WorkingSetBytes == nil {
+		return
+	}
+
+	ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
+		compbasemetrics.NewLazyConstMetric(nodeMemoryUsageDesc, compbasemetrics.GaugeValue, float64(*s.Memory.WorkingSetBytes)))
+}
+
+func (rc *resourceMetricsCollector) collectContainerStartTime(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) {
+	if s.StartTime.Unix() <= 0 {
+		return
+	}
+
+	ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.StartTime.Time,
+		compbasemetrics.NewLazyConstMetric(containerStartTimeDesc, compbasemetrics.GaugeValue, float64(s.StartTime.UnixNano())/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
+}
+
+func (rc *resourceMetricsCollector) collectContainerCPUMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) {
+	if s.CPU == nil || s.CPU.UsageCoreNanoSeconds == nil {
+		return
+	}
+
+	ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
+		compbasemetrics.NewLazyConstMetric(containerCPUUsageDesc, compbasemetrics.CounterValue,
+			float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
+}
+
+func (rc *resourceMetricsCollector) collectContainerMemoryMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) {
+	if s.Memory == nil || s.Memory.WorkingSetBytes == nil {
+		return
+	}
+
+	ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
+		compbasemetrics.NewLazyConstMetric(containerMemoryUsageDesc, compbasemetrics.GaugeValue,
+			float64(*s.Memory.WorkingSetBytes), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
+}
+
+func (rc *resourceMetricsCollector) collectPodCPUMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats) {
+	if pod.CPU == nil || pod.CPU.UsageCoreNanoSeconds == nil {
+		return
+	}
+
+	ch <- compbasemetrics.NewLazyMetricWithTimestamp(pod.CPU.Time.Time,
+		compbasemetrics.NewLazyConstMetric(podCPUUsageDesc, compbasemetrics.CounterValue,
+			float64(*pod.CPU.UsageCoreNanoSeconds)/float64(time.Second), pod.PodRef.Name, pod.PodRef.Namespace))
+}
+
+func (rc *resourceMetricsCollector) collectPodMemoryMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats) {
+	if pod.Memory == nil || pod.Memory.WorkingSetBytes == nil {
+		return
+	}
+
+	ch <- compbasemetrics.NewLazyMetricWithTimestamp(pod.Memory.Time.Time,
+		compbasemetrics.NewLazyConstMetric(podMemoryUsageDesc, compbasemetrics.GaugeValue,
+			float64(*pod.Memory.WorkingSetBytes), pod.PodRef.Name, pod.PodRef.Namespace))
+}
diff --git a/k3k-kubelet/provider/provider.go b/k3k-kubelet/provider/provider.go
index 73ec7e2..982afc1 100644
--- a/k3k-kubelet/provider/provider.go
+++ b/k3k-kubelet/provider/provider.go
@@ -2,20 +2,24 @@ package provider
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"net/http"
 	"strconv"
 	"strings"
 
+	"github.com/pkg/errors"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/rancher/k3k/k3k-kubelet/controller"
+	"github.com/rancher/k3k/k3k-kubelet/provider/collectors"
 	"github.com/rancher/k3k/k3k-kubelet/translate"
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
 	k3klog "github.com/rancher/k3k/pkg/log"
 	"github.com/virtual-kubelet/virtual-kubelet/node/api"
 	"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
 	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -24,11 +28,12 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/kubernetes/scheme"
 	cv1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/portforward"
 	"k8s.io/client-go/tools/remotecommand"
 	"k8s.io/client-go/transport/spdy"
 
-	metricset "k8s.io/metrics/pkg/client/clientset/versioned"
+	compbasemetrics "k8s.io/component-base/metrics"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 )
@@ -42,7 +47,6 @@ type Provider struct {
 	VirtualClient    client.Client
 	ClientConfig     rest.Config
 	CoreClient       cv1.CoreV1Interface
-	MetricsClient    metricset.Interface
 	ClusterNamespace string
 	ClusterName      string
 	serverIP         string
@@ -55,10 +59,12 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3
 	if err != nil {
 		return nil, err
 	}
+
 	translater := translate.ToHostTranslater{
 		ClusterName:      name,
 		ClusterNamespace: namespace,
 	}
+
 	p := Provider{
 		Handler: controller.ControllerHandler{
 			Mgr: virtualMgr,
@@ -177,13 +183,96 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, co
 }
 
 // GetStatsSummary gets the stats for the node, including running pods
-func (p *Provider) GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) {
-	return nil, fmt.Errorf("not implemented")
+func (p *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary, error) {
+	p.logger.Debug("GetStatsSummary")
+
+	nodeList := &v1.NodeList{}
+	if err := p.CoreClient.RESTClient().Get().Resource("nodes").Do(ctx).Into(nodeList); err != nil {
+		return nil, fmt.Errorf("unable to get nodes of cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err)
+	}
+
+	// fetch the stats from all the nodes
+	var nodeStats statsv1alpha1.NodeStats
+	var allPodsStats []statsv1alpha1.PodStats
+
+	for _, n := range nodeList.Items {
+		res, err := p.CoreClient.RESTClient().
+			Get().
+			Resource("nodes").
+			Name(n.Name).
+			SubResource("proxy").
+			Suffix("stats/summary").
+			DoRaw(ctx)
+		if err != nil {
+			return nil, fmt.Errorf(
+				"unable to get stats of node '%s', from cluster %s in namespace %s: %w",
+				n.Name, p.ClusterName, p.ClusterNamespace, err,
+			)
+		}
+
+		stats := &statsv1alpha1.Summary{}
+		if err := json.Unmarshal(res, stats); err != nil {
+			return nil, err
+		}
+
+		// TODO: we should probably calculate somehow the node stats from the different nodes of the host
+		// or reflect different nodes from the virtual kubelet.
+		// For the moment let's just pick one random node stats.
+		nodeStats = stats.Node
+		allPodsStats = append(allPodsStats, stats.Pods...)
+	}
+
+	pods, err := p.GetPods(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	podsNameMap := make(map[string]*v1.Pod)
+	for _, pod := range pods {
+		hostPodName := p.Translater.TranslateName(pod.Namespace, pod.Name)
+		podsNameMap[hostPodName] = pod
+	}
+
+	filteredStats := &statsv1alpha1.Summary{
+		Node: nodeStats,
+		Pods: make([]statsv1alpha1.PodStats, 0),
+	}
+
+	for _, podStat := range allPodsStats {
+		// skip pods that are not in the cluster namespace
+		if podStat.PodRef.Namespace != p.ClusterNamespace {
+			continue
+		}
+
+		// rewrite the PodReference to match the data of the virtual cluster
+		if pod, found := podsNameMap[podStat.PodRef.Name]; found {
+			podStat.PodRef = statsv1alpha1.PodReference{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+				UID:       string(pod.UID),
+			}
+			filteredStats.Pods = append(filteredStats.Pods, podStat)
+		}
+	}
+
+	return filteredStats, nil
 }
 
 // GetMetricsResource gets the metrics for the node, including running pods
-func (p *Provider) GetMetricsResource(context.Context) ([]*dto.MetricFamily, error) {
-	return nil, fmt.Errorf("not implemented")
+func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
+	statsSummary, err := p.GetStatsSummary(ctx)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error fetching MetricsResource")
+	}
+
+	registry := compbasemetrics.NewKubeRegistry()
+	registry.CustomMustRegister(collectors.NewKubeletResourceMetricsCollector(statsSummary))
+
+	metricFamily, err := registry.Gather()
+	if err != nil {
+		return nil, errors.Wrapf(err, "error gathering metrics from collector")
+	}
+	return metricFamily, nil
 }
 
 // PortForward forwards a local port to a port on the pod
diff --git a/pkg/controller/cluster/agent/shared.go b/pkg/controller/cluster/agent/shared.go
index 138e254..366dc64 100644
--- a/pkg/controller/cluster/agent/shared.go
+++ b/pkg/controller/cluster/agent/shared.go
@@ -16,7 +16,7 @@ import (
 
 const (
 	sharedKubeletConfigPath = "/opt/rancher/k3k/config.yaml"
-	sharedNodeAgentName     = "kubelet"
+	SharedNodeAgentName     = "kubelet"
 	SharedNodeMode          = "shared"
 )
 
@@ -65,43 +65,50 @@ token: %s`,
 }
 
 func (s *SharedAgent) Resources() []ctrlruntimeclient.Object {
-	return []ctrlruntimeclient.Object{s.serviceAccount(), s.role(), s.roleBinding(), s.service(), s.deployment(), s.dnsService()}
+	return []ctrlruntimeclient.Object{
+		s.serviceAccount(),
+		s.role(),
+		s.roleBinding(),
+		s.service(),
+		s.deployment(),
+		s.dnsService(),
+	}
 }
 
 func (s *SharedAgent) deployment() *apps.Deployment {
-	selector := metav1.LabelSelector{
+	selector := &metav1.LabelSelector{
 		MatchLabels: map[string]string{
 			"cluster": s.cluster.Name,
 			"type":    "agent",
 			"mode":    "shared",
 		},
 	}
-	name := s.Name()
+
 	return &apps.Deployment{
 		TypeMeta: metav1.TypeMeta{
 			Kind:       "Deployment",
 			APIVersion: "apps/v1",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
+			Name:      s.Name(),
 			Namespace: s.cluster.Namespace,
 			Labels:    selector.MatchLabels,
 		},
 		Spec: apps.DeploymentSpec{
-			Selector: &selector,
+			Selector: selector,
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: selector.MatchLabels,
 				},
-				Spec: s.podSpec(s.sharedAgentImage, name, &selector),
+				Spec: s.podSpec(selector),
 			},
 		},
 	}
 }
 
-func (s *SharedAgent) podSpec(image, name string, affinitySelector *metav1.LabelSelector) v1.PodSpec {
-	args := []string{"--config", sharedKubeletConfigPath}
+func (s *SharedAgent) podSpec(affinitySelector *metav1.LabelSelector) v1.PodSpec {
 	var limit v1.ResourceList
+
 	return v1.PodSpec{
 		Affinity: &v1.Affinity{
 			PodAntiAffinity: &v1.PodAntiAffinity{
@@ -132,13 +139,16 @@ func (s *SharedAgent) podSpec(image, name string, affinitySelector *metav1.Label
 		},
 		Containers: []v1.Container{
 			{
-				Name:            name,
-				Image:           image,
+				Name:            s.Name(),
+				Image:           s.sharedAgentImage,
 				ImagePullPolicy: v1.PullAlways,
 				Resources: v1.ResourceRequirements{
 					Limits: limit,
 				},
-				Args: args,
+				Args: []string{
+					"--config",
+					sharedKubeletConfigPath,
+				},
 				VolumeMounts: []v1.VolumeMount{
 					{
 						Name:      "config",
@@ -243,14 +253,14 @@ func (s *SharedAgent) role() *rbacv1.Role {
 		},
 		Rules: []rbacv1.PolicyRule{
 			{
-				Verbs:     []string{"*"},
 				APIGroups: []string{""},
 				Resources: []string{"pods", "pods/log", "pods/exec", "secrets", "configmaps", "services"},
+				Verbs:     []string{"*"},
 			},
 			{
-				Verbs:     []string{"get", "watch", "list"},
 				APIGroups: []string{"k3k.io"},
 				Resources: []string{"clusters"},
+				Verbs:     []string{"get", "watch", "list"},
 			},
 		},
 	}
@@ -282,7 +292,7 @@ func (s *SharedAgent) roleBinding() *rbacv1.RoleBinding {
 }
 
 func (s *SharedAgent) Name() string {
-	return controller.SafeConcatNameWithPrefix(s.cluster.Name, sharedNodeAgentName)
+	return controller.SafeConcatNameWithPrefix(s.cluster.Name, SharedNodeAgentName)
 }
 
 func (s *SharedAgent) DNSName() string {
diff --git a/pkg/controller/cluster/cluster.go b/pkg/controller/cluster/cluster.go
index 6c6f1e3..1d3acbc 100644
--- a/pkg/controller/cluster/cluster.go
+++ b/pkg/controller/cluster/cluster.go
@@ -8,19 +8,21 @@ import (
 	"time"
 
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	"github.com/rancher/k3k/pkg/controller"
 	"github.com/rancher/k3k/pkg/controller/cluster/agent"
 	"github.com/rancher/k3k/pkg/controller/cluster/server"
 	"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
 	"github.com/rancher/k3k/pkg/log"
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/controller"
+	ctrlruntimecontroller "sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -59,7 +61,7 @@ func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage string, logg
 	}
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&v1alpha1.Cluster{}).
-		WithOptions(controller.Options{
+		WithOptions(ctrlruntimecontroller.Options{
 			MaxConcurrentReconciles: maxConcurrentReconciles,
 		}).
 		Complete(&reconciler)
@@ -100,6 +102,11 @@ func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request
 			}
 		}
 	}
+
+	if err := c.unbindNodeProxyClusterRole(ctx, &cluster); err != nil {
+		return reconcile.Result{}, err
+	}
+
 	if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
 		// remove finalizer from the cluster and update it.
 		controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName)
@@ -130,9 +137,6 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1
 			cluster.Status.Persistence.StorageRequestSize = defaultStoragePersistentSize
 		}
 	}
-	if err := c.Client.Update(ctx, cluster); err != nil {
-		return err
-	}
 
 	cluster.Status.ClusterCIDR = cluster.Spec.ClusterCIDR
 	if cluster.Status.ClusterCIDR == "" {
@@ -189,6 +193,10 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1
 		}
 	}
 
+	if err := c.bindNodeProxyClusterRole(ctx, cluster); err != nil {
+		return err
+	}
+
 	return c.Client.Update(ctx, cluster)
 }
 
@@ -279,6 +287,56 @@ func (c *ClusterReconciler) server(ctx context.Context, cluster *v1alpha1.Cluste
 	return nil
 }
 
+func (c *ClusterReconciler) bindNodeProxyClusterRole(ctx context.Context, cluster *v1alpha1.Cluster) error {
+	clusterRoleBinding := &rbacv1.ClusterRoleBinding{}
+	if err := c.Client.Get(ctx, types.NamespacedName{Name: "k3k-node-proxy"}, clusterRoleBinding); err != nil {
+		return fmt.Errorf("failed to get or find k3k-node-proxy ClusterRoleBinding: %w", err)
+	}
+
+	subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName)
+
+	found := false
+	for _, subject := range clusterRoleBinding.Subjects {
+		if subject.Name == subjectName && subject.Namespace == cluster.Namespace {
+			found = true
+		}
+	}
+
+	if !found {
+		clusterRoleBinding.Subjects = append(clusterRoleBinding.Subjects, rbacv1.Subject{
+			Kind:      "ServiceAccount",
+			Name:      subjectName,
+			Namespace: cluster.Namespace,
+		})
+	}
+
+	return c.Client.Update(ctx, clusterRoleBinding)
+}
+
+func (c *ClusterReconciler) unbindNodeProxyClusterRole(ctx context.Context, cluster *v1alpha1.Cluster) error {
+	clusterRoleBinding := &rbacv1.ClusterRoleBinding{}
+	if err := c.Client.Get(ctx, types.NamespacedName{Name: "k3k-node-proxy"}, clusterRoleBinding); err != nil {
+		return fmt.Errorf("failed to get or find k3k-node-proxy ClusterRoleBinding: %w", err)
+	}
+
+	subjectName := controller.SafeConcatNameWithPrefix(cluster.Name, agent.SharedNodeAgentName)
+
+	var cleanedSubjects []rbacv1.Subject
+	for _, subject := range clusterRoleBinding.Subjects {
+		if subject.Name != subjectName || subject.Namespace != cluster.Namespace {
+			cleanedSubjects = append(cleanedSubjects, subject)
+		}
+	}
+
+	// if no subject was removed, all good
+	if reflect.DeepEqual(clusterRoleBinding.Subjects, cleanedSubjects) {
+		return nil
+	}
+
+	clusterRoleBinding.Subjects = cleanedSubjects
+	return c.Client.Update(ctx, clusterRoleBinding)
+}
+
 func (c *ClusterReconciler) agent(ctx context.Context, cluster *v1alpha1.Cluster, serviceIP, token string) error {
 	agent := agent.New(cluster, serviceIP, c.SharedAgentImage, token)
 	agentsConfig, err := agent.Config()