Skip to content

Commit

Permalink
implemented GetStatsSummary and GetMetricsResource for Virtual Kubelet
Browse files Browse the repository at this point in the history
  • Loading branch information
enrichman committed Dec 19, 2024
1 parent 70a098d commit c3ed2f6
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 20 deletions.
4 changes: 2 additions & 2 deletions k3k-kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ func (k *kubelet) newProviderFunc(namespace, name, hostname, agentIP, serverIP,
if err != nil {
return nil, nil, errors.New("unable to make nodeutil provider: " + err.Error())
}
nodeProvider := provider.Node{}

provider.ConfigureNode(pc.Node, hostname, k.port, agentIP)
return utilProvider, &nodeProvider, nil

return utilProvider, &provider.Node{}, nil
}
}

Expand Down
196 changes: 196 additions & 0 deletions k3k-kubelet/provider/collectors/kubelet_resource_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the Apache 2.0 license.
See https://github.com/virtual-kubelet/azure-aci/tree/master/pkg/metrics/collectors
*/

package collectors

import (
"time"

stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
compbasemetrics "k8s.io/component-base/metrics"
)

// Prometheus descriptors for the resource metrics endpoint. These mirror the
// metric names exposed by the real kubelet's /metrics/resource endpoint so
// that standard scrapers (e.g. metrics-server) work unchanged.
var (
	// Node-level cumulative CPU, in core-seconds.
	nodeCPUUsageDesc = compbasemetrics.NewDesc("node_cpu_usage_seconds_total",
		"Cumulative cpu time consumed by the node in core-seconds",
		nil,
		nil,
		compbasemetrics.ALPHA,
		"")

	// Node-level memory working set, in bytes.
	nodeMemoryUsageDesc = compbasemetrics.NewDesc("node_memory_working_set_bytes",
		"Current working set of the node in bytes",
		nil,
		nil,
		compbasemetrics.ALPHA,
		"")

	// Per-container cumulative CPU, labeled by container, pod and namespace.
	containerCPUUsageDesc = compbasemetrics.NewDesc("container_cpu_usage_seconds_total",
		"Cumulative cpu time consumed by the container in core-seconds",
		[]string{"container", "pod", "namespace"},
		nil,
		compbasemetrics.ALPHA,
		"")

	// Per-container memory working set, labeled by container, pod and namespace.
	containerMemoryUsageDesc = compbasemetrics.NewDesc("container_memory_working_set_bytes",
		"Current working set of the container in bytes",
		[]string{"container", "pod", "namespace"},
		nil,
		compbasemetrics.ALPHA,
		"")

	// Per-pod cumulative CPU, labeled by pod and namespace.
	podCPUUsageDesc = compbasemetrics.NewDesc("pod_cpu_usage_seconds_total",
		"Cumulative cpu time consumed by the pod in core-seconds",
		[]string{"pod", "namespace"},
		nil,
		compbasemetrics.ALPHA,
		"")

	// Per-pod memory working set, labeled by pod and namespace.
	podMemoryUsageDesc = compbasemetrics.NewDesc("pod_memory_working_set_bytes",
		"Current working set of the pod in bytes",
		[]string{"pod", "namespace"},
		nil,
		compbasemetrics.ALPHA,
		"")

	// Scrape health indicator: emitted once per collection cycle.
	resourceScrapeResultDesc = compbasemetrics.NewDesc("scrape_error",
		"1 if there was an error while getting container metrics, 0 otherwise",
		nil,
		nil,
		compbasemetrics.ALPHA,
		"")

	// Per-container start time, seconds since the Unix epoch.
	containerStartTimeDesc = compbasemetrics.NewDesc("container_start_time_seconds",
		"Start time of the container since unix epoch in seconds",
		[]string{"container", "pod", "namespace"},
		nil,
		compbasemetrics.ALPHA,
		"")
)

// NewKubeletResourceMetricsCollector returns a compbasemetrics.StableCollector
// which exports resource metrics derived from the given pod stats summary.
// The summary is captured by reference; callers provide a fresh summary per scrape.
func NewKubeletResourceMetricsCollector(podStats *stats.Summary) compbasemetrics.StableCollector {
	return &resourceMetricsCollector{
		providerPodStats: podStats,
	}
}

// resourceMetricsCollector translates a stats.Summary snapshot into
// Prometheus metrics on each collection cycle.
type resourceMetricsCollector struct {
	compbasemetrics.BaseStableCollector

	// providerPodStats is the snapshot this collector reads from; it is not
	// mutated by the collector.
	providerPodStats *stats.Summary
}

// Compile-time check that resourceMetricsCollector satisfies StableCollector.
var _ compbasemetrics.StableCollector = &resourceMetricsCollector{}

// DescribeWithStability implements compbasemetrics.StableCollector by
// announcing every metric descriptor this collector can emit.
func (rc *resourceMetricsCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
	descriptors := []*compbasemetrics.Desc{
		nodeCPUUsageDesc,
		nodeMemoryUsageDesc,
		containerStartTimeDesc,
		containerCPUUsageDesc,
		containerMemoryUsageDesc,
		podCPUUsageDesc,
		podMemoryUsageDesc,
		resourceScrapeResultDesc,
	}

	for _, desc := range descriptors {
		ch <- desc
	}
}

// CollectWithStability implements compbasemetrics.StableCollector.
//
// Since new containers are frequently created and removed, using a Gauge would
// leak metric collectors for containers or pods that no longer exist. Instead,
// this custom collector only emits metrics for currently active containers.
//
// A scrape_error gauge (0 = ok, 1 = error) is always emitted, even on early return.
func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
	var errorCount float64
	defer func() {
		// Always report whether this scrape succeeded.
		ch <- compbasemetrics.NewLazyConstMetric(resourceScrapeResultDesc, compbasemetrics.GaugeValue, errorCount)
	}()

	// Guard against a collector built with a nil summary: report a scrape
	// error instead of panicking on the dereference below.
	if rc.providerPodStats == nil {
		errorCount = 1
		return
	}

	statsSummary := *rc.providerPodStats
	rc.collectNodeCPUMetrics(ch, statsSummary.Node)
	rc.collectNodeMemoryMetrics(ch, statsSummary.Node)

	for _, pod := range statsSummary.Pods {
		for _, container := range pod.Containers {
			rc.collectContainerStartTime(ch, pod, container)
			rc.collectContainerCPUMetrics(ch, pod, container)
			rc.collectContainerMemoryMetrics(ch, pod, container)
		}

		rc.collectPodCPUMetrics(ch, pod)
		rc.collectPodMemoryMetrics(ch, pod)
	}
}

// implement collector methods and validate that correct data is used

// collectNodeCPUMetrics emits the node's cumulative CPU usage in core-seconds,
// timestamped with the sample time from the stats summary. Missing CPU data is skipped.
func (rc *resourceMetricsCollector) collectNodeCPUMetrics(ch chan<- compbasemetrics.Metric, s stats.NodeStats) {
	cpu := s.CPU
	if cpu == nil || cpu.UsageCoreNanoSeconds == nil {
		return
	}

	coreSeconds := float64(*cpu.UsageCoreNanoSeconds) / float64(time.Second)
	metric := compbasemetrics.NewLazyConstMetric(nodeCPUUsageDesc, compbasemetrics.CounterValue, coreSeconds)
	ch <- compbasemetrics.NewLazyMetricWithTimestamp(cpu.Time.Time, metric)
}

// collectNodeMemoryMetrics emits the node's memory working set in bytes,
// timestamped with the sample time from the stats summary. Missing memory data is skipped.
func (rc *resourceMetricsCollector) collectNodeMemoryMetrics(ch chan<- compbasemetrics.Metric, s stats.NodeStats) {
	mem := s.Memory
	if mem == nil || mem.WorkingSetBytes == nil {
		return
	}

	metric := compbasemetrics.NewLazyConstMetric(nodeMemoryUsageDesc, compbasemetrics.GaugeValue, float64(*mem.WorkingSetBytes))
	ch <- compbasemetrics.NewLazyMetricWithTimestamp(mem.Time.Time, metric)
}

// collectContainerStartTime emits the container's start time as seconds since
// the Unix epoch, labeled with container/pod/namespace. Zero or negative start
// times (unset) are skipped.
func (rc *resourceMetricsCollector) collectContainerStartTime(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) {
	started := s.StartTime
	if started.Unix() <= 0 {
		return
	}

	epochSeconds := float64(started.UnixNano()) / float64(time.Second)
	metric := compbasemetrics.NewLazyConstMetric(containerStartTimeDesc, compbasemetrics.GaugeValue,
		epochSeconds, s.Name, pod.PodRef.Name, pod.PodRef.Namespace)
	ch <- compbasemetrics.NewLazyMetricWithTimestamp(started.Time, metric)
}

// collectContainerCPUMetrics emits the container's cumulative CPU usage in
// core-seconds, labeled with container/pod/namespace. Missing CPU data is skipped.
func (rc *resourceMetricsCollector) collectContainerCPUMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) {
	cpu := s.CPU
	if cpu == nil || cpu.UsageCoreNanoSeconds == nil {
		return
	}

	coreSeconds := float64(*cpu.UsageCoreNanoSeconds) / float64(time.Second)
	metric := compbasemetrics.NewLazyConstMetric(containerCPUUsageDesc, compbasemetrics.CounterValue,
		coreSeconds, s.Name, pod.PodRef.Name, pod.PodRef.Namespace)
	ch <- compbasemetrics.NewLazyMetricWithTimestamp(cpu.Time.Time, metric)
}

// collectContainerMemoryMetrics emits the container's memory working set in
// bytes, labeled with container/pod/namespace. Missing memory data is skipped.
func (rc *resourceMetricsCollector) collectContainerMemoryMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats, s stats.ContainerStats) {
	mem := s.Memory
	if mem == nil || mem.WorkingSetBytes == nil {
		return
	}

	metric := compbasemetrics.NewLazyConstMetric(containerMemoryUsageDesc, compbasemetrics.GaugeValue,
		float64(*mem.WorkingSetBytes), s.Name, pod.PodRef.Name, pod.PodRef.Namespace)
	ch <- compbasemetrics.NewLazyMetricWithTimestamp(mem.Time.Time, metric)
}

// collectPodCPUMetrics emits the pod's cumulative CPU usage in core-seconds,
// labeled with pod/namespace. Missing CPU data is skipped.
func (rc *resourceMetricsCollector) collectPodCPUMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats) {
	cpu := pod.CPU
	if cpu == nil || cpu.UsageCoreNanoSeconds == nil {
		return
	}

	coreSeconds := float64(*cpu.UsageCoreNanoSeconds) / float64(time.Second)
	metric := compbasemetrics.NewLazyConstMetric(podCPUUsageDesc, compbasemetrics.CounterValue,
		coreSeconds, pod.PodRef.Name, pod.PodRef.Namespace)
	ch <- compbasemetrics.NewLazyMetricWithTimestamp(cpu.Time.Time, metric)
}

// collectPodMemoryMetrics emits the pod's memory working set in bytes,
// labeled with pod/namespace. Missing memory data is skipped.
func (rc *resourceMetricsCollector) collectPodMemoryMetrics(ch chan<- compbasemetrics.Metric, pod stats.PodStats) {
	mem := pod.Memory
	if mem == nil || mem.WorkingSetBytes == nil {
		return
	}

	metric := compbasemetrics.NewLazyConstMetric(podMemoryUsageDesc, compbasemetrics.GaugeValue,
		float64(*mem.WorkingSetBytes), pod.PodRef.Name, pod.PodRef.Namespace)
	ch <- compbasemetrics.NewLazyMetricWithTimestamp(mem.Time.Time, metric)
}
101 changes: 96 additions & 5 deletions k3k-kubelet/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ package provider

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"

"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/rancher/k3k/k3k-kubelet/controller"
"github.com/rancher/k3k/k3k-kubelet/provider/collectors"
"github.com/rancher/k3k/k3k-kubelet/translate"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
k3klog "github.com/rancher/k3k/pkg/log"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -24,10 +28,12 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes/scheme"
cv1 "k8s.io/client-go/kubernetes/typed/core/v1"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
compbasemetrics "k8s.io/component-base/metrics"
metricset "k8s.io/metrics/pkg/client/clientset/versioned"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -42,7 +48,7 @@ type Provider struct {
VirtualClient client.Client
ClientConfig rest.Config
CoreClient cv1.CoreV1Interface
MetricsClient metricset.Interface
MetricsClient metricset.Interface // TODO: do we need this?
ClusterNamespace string
ClusterName string
serverIP string
Expand All @@ -55,10 +61,12 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3
if err != nil {
return nil, err
}

translater := translate.ToHostTranslater{
ClusterName: name,
ClusterNamespace: namespace,
}

p := Provider{
Handler: controller.ControllerHandler{
Mgr: virtualMgr,
Expand Down Expand Up @@ -177,13 +185,96 @@ func (p *Provider) AttachToContainer(ctx context.Context, namespace, podName, co
}

// GetStatsSummary gets the stats for the node, including running pods
func (p *Provider) GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error) {
return nil, fmt.Errorf("not implemented")
func (p *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary, error) {
p.logger.Debug("GetStatsSummary")

nodeList := &v1.NodeList{}
if err := p.CoreClient.RESTClient().Get().Resource("nodes").Do(ctx).Into(nodeList); err != nil {
return nil, fmt.Errorf("unable to get nodes of cluster %s in namespace %s: %w", p.ClusterName, p.ClusterNamespace, err)
}

// fetch the stats from all the nodes
var nodeStats statsv1alpha1.NodeStats
var allPodsStats []statsv1alpha1.PodStats

for _, n := range nodeList.Items {
res, err := p.CoreClient.RESTClient().
Get().
Resource("nodes").
Name(n.Name).
SubResource("proxy").
Suffix("stats/summary").
DoRaw(ctx)
if err != nil {
return nil, fmt.Errorf(
"unable to get stats of node '%s', from cluster %s in namespace %s: %w",
n.Name, p.ClusterName, p.ClusterNamespace, err,
)
}

stats := &statsv1alpha1.Summary{}
if err := json.Unmarshal(res, stats); err != nil {
return nil, err
}

// TODO: we should probably calculate somehow the node stats from the different nodes of the host
// or reflect different nodes from the virtual kubelet.
// For the moment let's just pick one random node stats.
nodeStats = stats.Node
allPodsStats = append(allPodsStats, stats.Pods...)
}

pods, err := p.GetPods(ctx)
if err != nil {
return nil, err
}

podsNameMap := make(map[string]*v1.Pod)
for _, pod := range pods {
hostPodName := p.Translater.TranslateName(pod.Namespace, pod.Name)
podsNameMap[hostPodName] = pod
}

filteredStats := &statsv1alpha1.Summary{
Node: nodeStats,
Pods: make([]statsv1alpha1.PodStats, 0),
}

for _, podStat := range allPodsStats {
// skip pods that are not in the cluster namespace
if podStat.PodRef.Namespace != p.ClusterNamespace {
continue
}

// rewrite the PodReference to match the data of the virtual cluster
if pod, found := podsNameMap[podStat.PodRef.Name]; found {
podStat.PodRef = statsv1alpha1.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
}
filteredStats.Pods = append(filteredStats.Pods, podStat)
}
}

return filteredStats, nil
}

// GetMetricsResource gets the metrics for the node, including running pods
func (p *Provider) GetMetricsResource(context.Context) ([]*dto.MetricFamily, error) {
return nil, fmt.Errorf("not implemented")
func (p *Provider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
statsSummary, err := p.GetStatsSummary(ctx)
if err != nil {
return nil, errors.Wrapf(err, "error fetching MetricsResource")
}

registry := compbasemetrics.NewKubeRegistry()
registry.CustomMustRegister(collectors.NewKubeletResourceMetricsCollector(statsSummary))

metricFamily, err := registry.Gather()
if err != nil {
return nil, errors.Wrapf(err, "error gathering metrics from collector")
}
return metricFamily, nil
}

// PortForward forwards a local port to a port on the pod
Expand Down
Loading

0 comments on commit c3ed2f6

Please sign in to comment.