From c3ed2f608cfa98850333e8986450aad7b1d6655a Mon Sep 17 00:00:00 2001 From: Enrico Candino Date: Thu, 19 Dec 2024 18:45:10 +0100 Subject: [PATCH] implemented GetStatsSummary and GetMetricsResource for Virtual Kubelet --- k3k-kubelet/kubelet.go | 4 +- .../collectors/kubelet_resource_metrics.go | 196 ++++++++++++++++++ k3k-kubelet/provider/provider.go | 101 ++++++++- pkg/controller/cluster/agent/shared.go | 85 ++++++-- 4 files changed, 366 insertions(+), 20 deletions(-) create mode 100644 k3k-kubelet/provider/collectors/kubelet_resource_metrics.go diff --git a/k3k-kubelet/kubelet.go b/k3k-kubelet/kubelet.go index f3dbac2..a5b0c48 100644 --- a/k3k-kubelet/kubelet.go +++ b/k3k-kubelet/kubelet.go @@ -214,10 +214,10 @@ func (k *kubelet) newProviderFunc(namespace, name, hostname, agentIP, serverIP, if err != nil { return nil, nil, errors.New("unable to make nodeutil provider: " + err.Error()) } - nodeProvider := provider.Node{} provider.ConfigureNode(pc.Node, hostname, k.port, agentIP) - return utilProvider, &nodeProvider, nil + + return utilProvider, &provider.Node{}, nil } } diff --git a/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go b/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go new file mode 100644 index 0000000..e67822f --- /dev/null +++ b/k3k-kubelet/provider/collectors/kubelet_resource_metrics.go @@ -0,0 +1,196 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the Apache 2.0 license. + +See https://github.com/virtual-kubelet/azure-aci/tree/master/pkg/metrics/collectors +*/ + +package collectors + +import ( + "time" + + stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" + compbasemetrics "k8s.io/component-base/metrics" +) + +// defining metrics +var ( + nodeCPUUsageDesc = compbasemetrics.NewDesc("node_cpu_usage_seconds_total", + "Cumulative cpu time consumed by the node in core-seconds", + nil, + nil, + compbasemetrics.ALPHA, + "") + + nodeMemoryUsageDesc = compbasemetrics.NewDesc("node_memory_working_set_bytes", + "Current working set of the node in bytes", + nil, + nil, + compbasemetrics.ALPHA, + "") + + containerCPUUsageDesc = compbasemetrics.NewDesc("container_cpu_usage_seconds_total", + "Cumulative cpu time consumed by the container in core-seconds", + []string{"container", "pod", "namespace"}, + nil, + compbasemetrics.ALPHA, + "") + + containerMemoryUsageDesc = compbasemetrics.NewDesc("container_memory_working_set_bytes", + "Current working set of the container in bytes", + []string{"container", "pod", "namespace"}, + nil, + compbasemetrics.ALPHA, + "") + + podCPUUsageDesc = compbasemetrics.NewDesc("pod_cpu_usage_seconds_total", + "Cumulative cpu time consumed by the pod in core-seconds", + []string{"pod", "namespace"}, + nil, + compbasemetrics.ALPHA, + "") + + podMemoryUsageDesc = compbasemetrics.NewDesc("pod_memory_working_set_bytes", + "Current working set of the pod in bytes", + []string{"pod", "namespace"}, + nil, + compbasemetrics.ALPHA, + "") + + resourceScrapeResultDesc = compbasemetrics.NewDesc("scrape_error", + "1 if there was an error while getting container metrics, 0 otherwise", + nil, + nil, + compbasemetrics.ALPHA, + "") + + containerStartTimeDesc = compbasemetrics.NewDesc("container_start_time_seconds", + "Start time of the container since unix epoch in seconds", + []string{"container", "pod", "namespace"}, + nil, + compbasemetrics.ALPHA, + "") +) + +// NewResourceMetricsCollector returns a metrics.StableCollector which exports resource metrics +func NewKubeletResourceMetricsCollector(podStats 
*stats.Summary) compbasemetrics.StableCollector { + return &resourceMetricsCollector{ + providerPodStats: podStats, + } +} + +type resourceMetricsCollector struct { + compbasemetrics.BaseStableCollector + + providerPodStats *stats.Summary +} + +// Check if resourceMetricsCollector implements necessary interface +var _ compbasemetrics.StableCollector = &resourceMetricsCollector{} + +// DescribeWithStability implements compbasemetrics.StableCollector +func (rc *resourceMetricsCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) { + ch <- nodeCPUUsageDesc + ch <- nodeMemoryUsageDesc + ch <- containerStartTimeDesc + ch <- containerCPUUsageDesc + ch <- containerMemoryUsageDesc + ch <- podCPUUsageDesc + ch <- podMemoryUsageDesc + ch <- resourceScrapeResultDesc +} + +// CollectWithStability implements compbasemetrics.StableCollector +// Since new containers are frequently created and removed, using the Gauge would +// leak metric collectors for containers or pods that no longer exist. Instead, implement +// custom collector in a way that only collects metrics for active containers. +func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) { + var errorCount float64 + defer func() { + ch <- compbasemetrics.NewLazyConstMetric(resourceScrapeResultDesc, compbasemetrics.GaugeValue, errorCount) + }() + + statsSummary := *rc.providerPodStats + rc.collectNodeCPUMetrics(ch, statsSummary.Node) + rc.collectNodeMemoryMetrics(ch, statsSummary.Node) + + for _, pod := range statsSummary.Pods { + for _, container := range pod.Containers { + rc.collectContainerStartTime(ch, pod, container) + rc.collectContainerCPUMetrics(ch, pod, container) + rc.collectContainerMemoryMetrics(ch, pod, container) + } + rc.collectPodCPUMetrics(ch, pod) + rc.collectPodMemoryMetrics(ch, pod) + } +} + +// implement collector methods and validate that correct data is used + +func (rc *resourceMetricsCollector) collectNodeCPUMetrics(ch chan<- compbasemetrics.Metric, s stats.NodeStats) { + if s.CPU == nil || s.CPU.UsageCoreNanoSeconds == nil { + return + } + + ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time, + compbasemetrics.NewLazyConstMetric(nodeCPUUsageDesc, compbasemetrics.CounterValue, float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second))) +} + +func (rc *resourceMetricsCollector) collectNodeMemoryMetrics(ch chan<- compbasemetrics.Metric, s stats.NodeStats) { + if s.Memory == nil || s.Memory.WorkingSetBytes == nil { + return + } + + ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time, + compbasemetrics.NewLazyConstMetric(nodeMemoryUsageDesc, compbasemetrics.GaugeValue, float64(*s.Memory.WorkingSetBytes))) +} + +func (rc *resourceMetricsCollector) collectContainerStartTime(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) { + if s.StartTime.Unix() <= 0 { + return + } + + ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.StartTime.Time, + compbasemetrics.NewLazyConstMetric(containerStartTimeDesc, compbasemetrics.GaugeValue, float64(s.StartTime.UnixNano())/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace)) +} + +func (rc *resourceMetricsCollector) collectContainerCPUMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) { + if s.CPU == nil || s.CPU.UsageCoreNanoSeconds == nil { + return + } + + ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time, + compbasemetrics.NewLazyConstMetric(containerCPUUsageDesc, compbasemetrics.CounterValue, + 
float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace)) +} + +func (rc *resourceMetricsCollector) collectContainerMemoryMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) { + if s.Memory == nil || s.Memory.WorkingSetBytes == nil { + return + } + + ch <- compbasemetrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time, + compbasemetrics.NewLazyConstMetric(containerMemoryUsageDesc, compbasemetrics.GaugeValue, + float64(*s.Memory.WorkingSetBytes), s.Name, pod.PodRef.Name, pod.PodRef.Namespace)) +} + +func (rc *resourceMetricsCollector) collectPodCPUMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats) { + if pod.CPU == nil || pod.CPU.UsageCoreNanoSeconds == nil { + return + } + + ch <- compbasemetrics.NewLazyMetricWithTimestamp(pod.CPU.Time.Time, + compbasemetrics.NewLazyConstMetric(podCPUUsageDesc, compbasemetrics.CounterValue, + float64(*pod.CPU.UsageCoreNanoSeconds)/float64(time.Second), pod.PodRef.Name, pod.PodRef.Namespace)) +} + +func (rc *resourceMetricsCollector) collectPodMemoryMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats) { + if pod.Memory == nil || pod.Memory.WorkingSetBytes == nil { + return + } + + ch <- compbasemetrics.NewLazyMetricWithTimestamp(pod.Memory.Time.Time, + compbasemetrics.NewLazyConstMetric(podMemoryUsageDesc, compbasemetrics.GaugeValue, + float64(*pod.Memory.WorkingSetBytes), pod.PodRef.Name, pod.PodRef.Namespace)) +} diff --git a/k3k-kubelet/provider/provider.go b/k3k-kubelet/provider/provider.go index 73ec7e2..5061bd0 100644 --- a/k3k-kubelet/provider/provider.go +++ b/k3k-kubelet/provider/provider.go @@ -2,20 +2,24 @@ package provider import ( "context" + "encoding/json" "fmt" "io" "net/http" "strconv" "strings" + "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" "github.com/rancher/k3k/k3k-kubelet/controller" + "github.com/rancher/k3k/k3k-kubelet/provider/collectors" "github.com/rancher/k3k/k3k-kubelet/translate" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" k3klog "github.com/rancher/k3k/pkg/log" "github.com/virtual-kubelet/virtual-kubelet/node/api" "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -24,10 +28,12 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" cv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/transport/spdy" + compbasemetrics "k8s.io/component-base/metrics" metricset "k8s.io/metrics/pkg/client/clientset/versioned" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -42,7 +48,7 @@ type Provider struct { VirtualClient client.Client ClientConfig rest.Config CoreClient cv1.CoreV1Interface - MetricsClient metricset.Interface + MetricsClient metricset.Interface // TODO: do we need this? 
ClusterNamespace string ClusterName string serverIP string @@ -55,10 +61,12 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3 if err != nil { return nil, err } + translater := translate.ToHostTranslater{ ClusterName: name, ClusterNamespace: namespace, } + p := Provider{ Handler: controller.ControllerHandler{ Mgr: virtualMgr, @@ -177,13 +185,96 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, co } // GetStatsSummary gets the stats for the node, including running pods -func (p *Provider) GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) { - return nil, fmt.Errorf("not implemented") +func (p *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary, error) { + p.logger.Debug("GetStatsSummary") + + nodeList := &v1.NodeList{} + if err := p.CoreClient.RESTClient().Get().Resource("nodes").Do(ctx).Into(nodeList); err != nil { + return nil, fmt.Errorf("unable to get nodes of cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err) + } + + // fetch the stats from all the nodes + var nodeStats statsv1alpha1.NodeStats + var allPodsStats []statsv1alpha1.PodStats + + for _, n := range nodeList.Items { + res, err := p.CoreClient.RESTClient(). + Get(). + Resource("nodes"). + Name(n.Name). + SubResource("proxy"). + Suffix("stats/summary"). + DoRaw(ctx) + if err != nil { + return nil, fmt.Errorf( + "unable to get stats of node '%s', from cluster %s in namespace %s: %w", + n.Name, p.ClusterName, p.ClusterNamespace, err, + ) + } + + stats := &statsv1alpha1.Summary{} + if err := json.Unmarshal(res, stats); err != nil { + return nil, err + } + + // TODO: we should probably calculate somehow the node stats from the different nodes of the host + // or reflect different nodes from the virtual kubelet. + // For the moment let's just pick one random node stats. + nodeStats = stats.Node + allPodsStats = append(allPodsStats, stats.Pods...) 
+ } + + pods, err := p.GetPods(ctx) + if err != nil { + return nil, err + } + + podsNameMap := make(map[string]*v1.Pod) + for _, pod := range pods { + hostPodName := p.Translater.TranslateName(pod.Namespace, pod.Name) + podsNameMap[hostPodName] = pod + } + + filteredStats := &statsv1alpha1.Summary{ + Node: nodeStats, + Pods: make([]statsv1alpha1.PodStats, 0), + } + + for _, podStat := range allPodsStats { + // skip pods that are not in the cluster namespace + if podStat.PodRef.Namespace != p.ClusterNamespace { + continue + } + + // rewrite the PodReference to match the data of the virtual cluster + if pod, found := podsNameMap[podStat.PodRef.Name]; found { + podStat.PodRef = statsv1alpha1.PodReference{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: string(pod.UID), + } + filteredStats.Pods = append(filteredStats.Pods, podStat) + } + } + + return filteredStats, nil } // GetMetricsResource gets the metrics for the node, including running pods -func (p *Provider) GetMetricsResource(context.Context) ([]*dto.MetricFamily, error) { - return nil, fmt.Errorf("not implemented") +func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) { + statsSummary, err := p.GetStatsSummary(ctx) + if err != nil { + return nil, errors.Wrapf(err, "error fetching MetricsResource") + } + + registry := compbasemetrics.NewKubeRegistry() + registry.CustomMustRegister(collectors.NewKubeletResourceMetricsCollector(statsSummary)) + + metricFamily, err := registry.Gather() + if err != nil { + return nil, errors.Wrapf(err, "error gathering metrics from collector") + } + return metricFamily, nil } // PortForward forwards a local port to a port on the pod diff --git a/pkg/controller/cluster/agent/shared.go b/pkg/controller/cluster/agent/shared.go index 138e254..58b1a21 100644 --- a/pkg/controller/cluster/agent/shared.go +++ b/pkg/controller/cluster/agent/shared.go @@ -65,43 +65,52 @@ token: %s`, } func (s *SharedAgent) Resources() []ctrlruntimeclient.Object { - return []ctrlruntimeclient.Object{s.serviceAccount(), s.role(), s.roleBinding(), s.service(), s.deployment(), s.dnsService()} + return []ctrlruntimeclient.Object{ + s.serviceAccount(), + s.role(), + s.roleBinding(), + s.clusterRole(), + s.clusterRoleBinding(), + s.service(), + s.deployment(), + s.dnsService(), + } } func (s *SharedAgent) deployment() *apps.Deployment { - selector := metav1.LabelSelector{ + selector := &metav1.LabelSelector{ MatchLabels: map[string]string{ "cluster": s.cluster.Name, "type": "agent", "mode": "shared", }, } - name := s.Name() + return &apps.Deployment{ TypeMeta: metav1.TypeMeta{ Kind: "Deployment", APIVersion: "apps/v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: s.Name(), Namespace: s.cluster.Namespace, Labels: selector.MatchLabels, }, Spec: apps.DeploymentSpec{ - Selector: &selector, + Selector: selector, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: selector.MatchLabels, }, - Spec: s.podSpec(s.sharedAgentImage, name, &selector), + Spec: s.podSpec(selector), }, }, } } -func (s *SharedAgent) podSpec(image, name string, affinitySelector *metav1.LabelSelector) v1.PodSpec { - args := []string{"--config", sharedKubeletConfigPath} +func (s *SharedAgent) podSpec(affinitySelector *metav1.LabelSelector) v1.PodSpec { var limit v1.ResourceList + return v1.PodSpec{ Affinity: &v1.Affinity{ PodAntiAffinity: &v1.PodAntiAffinity{ @@ -132,13 +141,16 @@ func (s *SharedAgent) podSpec(image, name string, affinitySelector *metav1.Label }, Containers: []v1.Container{ { - Name: name, - 
Image: image,
+				Name:            s.Name(),
+				Image:           s.sharedAgentImage,
 				ImagePullPolicy: v1.PullAlways,
 				Resources: v1.ResourceRequirements{
 					Limits: limit,
 				},
-				Args: args,
+				Args: []string{
+					"--config",
+					sharedKubeletConfigPath,
+				},
 				VolumeMounts: []v1.VolumeMount{
 					{
 						Name:      "config",
@@ -243,14 +255,14 @@ func (s *SharedAgent) role() *rbacv1.Role {
 		},
 		Rules: []rbacv1.PolicyRule{
 			{
-				Verbs:     []string{"*"},
 				APIGroups: []string{""},
 				Resources: []string{"pods", "pods/log", "pods/exec", "secrets", "configmaps", "services"},
+				Verbs:     []string{"*"},
 			},
 			{
-				Verbs:     []string{"get", "watch", "list"},
 				APIGroups: []string{"k3k.io"},
 				Resources: []string{"clusters"},
+				Verbs:     []string{"get", "watch", "list"},
 			},
 		},
 	}
@@ -281,6 +293,53 @@ func (s *SharedAgent) roleBinding() *rbacv1.RoleBinding {
 	}
 }
 
+func (s *SharedAgent) clusterRole() *rbacv1.ClusterRole {
+	return &rbacv1.ClusterRole{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "ClusterRole",
+			APIVersion: "rbac.authorization.k8s.io/v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      s.Name(),
+			Namespace: s.cluster.Namespace,
+		},
+		Rules: []rbacv1.PolicyRule{
+			{
+				APIGroups: []string{""},
+				// nodes and nodes/proxy are needed to gather the stats and metrics of the host nodes from the
+				// '/api/v1/nodes/<node-name>/proxy/stats/summary' and '/api/v1/nodes/<node-name>/proxy/metrics/resource'
+				// endpoints, used by the GetStatsSummary and GetMetricsResource provider implementations of the Virtual Kubelet
+				Resources: []string{"nodes", "nodes/proxy"},
+				Verbs:     []string{"*"},
+			},
+		},
+	}
+}
+
+func (s *SharedAgent) clusterRoleBinding() *rbacv1.ClusterRoleBinding {
+	return &rbacv1.ClusterRoleBinding{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "ClusterRoleBinding",
+			APIVersion: "rbac.authorization.k8s.io/v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: s.Name(),
+		},
+		RoleRef: rbacv1.RoleRef{
+			APIGroup: "rbac.authorization.k8s.io",
+			Kind:     "ClusterRole",
+			Name:     s.Name(),
+		},
+		Subjects: []rbacv1.Subject{
+			{
+				Kind:      "ServiceAccount",
+				Name:      s.Name(),
+				Namespace: s.cluster.Namespace,
+			},
+		},
+	}
+}
+
 func (s *SharedAgent) Name() string {
 	return controller.SafeConcatNameWithPrefix(s.cluster.Name, sharedNodeAgentName)
 }
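
The GetStatsSummary implementation above keeps only the NodeStats of whichever host node is processed last (see the TODO in provider.go). Below is a minimal sketch, not part of this patch, of how the per-node figures could be summed into a single virtual-node view; aggregateNodeStats is a hypothetical helper and only fills the fields read by the resource metrics collector.

package provider

import (
	"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
)

// aggregateNodeStats is a hypothetical helper (not part of this patch): it sums the CPU and
// memory usage reported by every host node into one NodeStats for the virtual node, instead
// of keeping whichever node happened to be processed last.
func aggregateNodeStats(nodes []statsv1alpha1.NodeStats) statsv1alpha1.NodeStats {
	var cpuNanoSeconds, workingSetBytes uint64
	agg := statsv1alpha1.NodeStats{}

	for _, n := range nodes {
		if n.CPU != nil && n.CPU.UsageCoreNanoSeconds != nil {
			cpuNanoSeconds += *n.CPU.UsageCoreNanoSeconds
			// keep the newest CPU timestamp seen across the host nodes
			if agg.CPU == nil || n.CPU.Time.After(agg.CPU.Time.Time) {
				agg.CPU = &statsv1alpha1.CPUStats{Time: n.CPU.Time}
			}
		}
		if n.Memory != nil && n.Memory.WorkingSetBytes != nil {
			workingSetBytes += *n.Memory.WorkingSetBytes
			if agg.Memory == nil || n.Memory.Time.After(agg.Memory.Time.Time) {
				agg.Memory = &statsv1alpha1.MemoryStats{Time: n.Memory.Time}
			}
		}
	}

	if agg.CPU != nil {
		agg.CPU.UsageCoreNanoSeconds = &cpuNanoSeconds
	}
	if agg.Memory != nil {
		agg.Memory.WorkingSetBytes = &workingSetBytes
	}

	return agg
}

Summing cumulative counters across a stable set of host nodes keeps them monotonic; if host nodes are added or removed the aggregate can jump, which is worth keeping in mind when resolving the TODO.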
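
GetMetricsResource only feeds the stats summary into the collector and gathers the registry, so the collector can be exercised in isolation with a synthetic summary. In the sketch below the pod name, namespace, UID and usage numbers are made up, and printResourceMetricFamilies is a hypothetical helper; it relies only on packages this patch already imports.

package provider

import (
	"fmt"

	"github.com/rancher/k3k/k3k-kubelet/provider/collectors"
	"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	compbasemetrics "k8s.io/component-base/metrics"
)

// printResourceMetricFamilies exercises the collector with a synthetic summary and prints
// the names of the gathered metric families.
func printResourceMetricFamilies() error {
	now := metav1.Now()
	cpu := uint64(2e9)               // 2 CPU-seconds, expressed in nanoseconds
	mem := uint64(512 * 1024 * 1024) // 512 MiB working set

	summary := &statsv1alpha1.Summary{
		Node: statsv1alpha1.NodeStats{
			CPU:    &statsv1alpha1.CPUStats{Time: now, UsageCoreNanoSeconds: &cpu},
			Memory: &statsv1alpha1.MemoryStats{Time: now, WorkingSetBytes: &mem},
		},
		Pods: []statsv1alpha1.PodStats{{
			PodRef: statsv1alpha1.PodReference{Name: "nginx", Namespace: "default", UID: "uid-1"},
			CPU:    &statsv1alpha1.CPUStats{Time: now, UsageCoreNanoSeconds: &cpu},
			Memory: &statsv1alpha1.MemoryStats{Time: now, WorkingSetBytes: &mem},
		}},
	}

	// same registration pattern as GetMetricsResource
	registry := compbasemetrics.NewKubeRegistry()
	registry.CustomMustRegister(collectors.NewKubeletResourceMetricsCollector(summary))

	families, err := registry.Gather()
	if err != nil {
		return err
	}
	for _, mf := range families {
		fmt.Println(mf.GetName())
	}
	return nil
}

Gathering should yield the node_*, pod_* and scrape_error families declared in kubelet_resource_metrics.go; the container_* families appear once ContainerStats are present in the summary.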