Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ingvagabund committed Nov 18, 2024
1 parent 7d3ecbe commit 00a6962
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pkg/api/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ type PluginSet struct {
type MetricsCollector struct {
// Enabled turns on metrics collection from Kubernetes metrics.
// Later, the collection can be extended to other providers.
Enabled bool `json:"enabled"`
Enabled bool `json:"enabled,omitempty"`
}
2 changes: 1 addition & 1 deletion pkg/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func newDescheduler(rs *options.DeschedulerServer, deschedulerPolicy *api.Desche
if deschedulerPolicy.NodeSelector != nil {
nodeSelector = *deschedulerPolicy.NodeSelector
}
metricsCollector = metricscollector.NewMetricsCollector(rs.Client, rs.MetricsClient, nodeSelector)
metricsCollector = metricscollector.NewMetricsCollector(rs.Client.CoreV1().Nodes(), rs.MetricsClient, nodeSelector)
}

return &descheduler{
Expand Down
24 changes: 13 additions & 11 deletions pkg/descheduler/metricscollector/metricscollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog/v2"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
utilptr "k8s.io/utils/ptr"
Expand All @@ -38,7 +38,7 @@ const (
)

type MetricsCollector struct {
clientset kubernetes.Interface
nodeLister corev1.NodeInterface
metricsClientset metricsclient.Interface
nodeSelector string

Expand All @@ -49,9 +49,9 @@ type MetricsCollector struct {
hasSynced bool
}

func NewMetricsCollector(clientset kubernetes.Interface, metricsClientset metricsclient.Interface, nodeSelector string) *MetricsCollector {
func NewMetricsCollector(nodeLister corev1.NodeInterface, metricsClientset metricsclient.Interface, nodeSelector string) *MetricsCollector {
return &MetricsCollector{
clientset: clientset,
nodeLister: nodeLister,
metricsClientset: metricsClientset,
nodeSelector: nodeSelector,
nodes: make(map[string]map[v1.ResourceName]*resource.Quantity),
Expand All @@ -65,7 +65,7 @@ func (mc *MetricsCollector) Run(ctx context.Context) {
}

func weightedAverage(prevValue, value int64) int64 {
return int64(math.Floor(beta*float64(prevValue) + (1-beta)*float64(value)))
return int64(math.Round(beta*float64(prevValue) + (1-beta)*float64(value)))
}

func (mc *MetricsCollector) AllNodesUsage() (map[string]map[v1.ResourceName]*resource.Quantity, error) {
Expand All @@ -88,7 +88,7 @@ func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resou
defer mc.mu.RUnlock()

if _, exists := mc.nodes[node.Name]; !exists {
klog.V(4).Infof("unable to find node %q in the collected metrics", node.Name)
klog.V(4).InfoS("unable to find node in the collected metrics", "node", node.Name)
return nil, fmt.Errorf("unable to find node %q in the collected metrics", node.Name)
}
return map[v1.ResourceName]*resource.Quantity{
Expand All @@ -108,7 +108,7 @@ func (mc *MetricsCollector) MetricsClient() metricsclient.Interface {
func (mc *MetricsCollector) Collect(ctx context.Context) error {
mc.mu.Lock()
defer mc.mu.Unlock()
nodes, err := mc.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: mc.nodeSelector})
nodes, err := mc.nodeLister.List(context.TODO(), metav1.ListOptions{LabelSelector: mc.nodeSelector})
if err != nil {
return fmt.Errorf("unable to list nodes: %v", err)
}
Expand All @@ -128,11 +128,13 @@ func (mc *MetricsCollector) Collect(ctx context.Context) error {
}
} else {
// work with scaled (sub-unit) integer values to reduce loss of precision
mc.nodes[node.Name][v1.ResourceCPU].SetMilli(
weightedAverage(mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), metrics.Usage.Cpu().MilliValue()),
mc.nodes[node.Name][v1.ResourceCPU].SetScaled(
weightedAverage(mc.nodes[node.Name][v1.ResourceCPU].ScaledValue(resource.Micro), metrics.Usage.Cpu().ScaledValue(resource.Micro)),
resource.Micro,
)
mc.nodes[node.Name][v1.ResourceMemory].SetMilli(
weightedAverage(mc.nodes[node.Name][v1.ResourceMemory].MilliValue(), metrics.Usage.Memory().MilliValue()),
mc.nodes[node.Name][v1.ResourceMemory].SetScaled(
weightedAverage(mc.nodes[node.Name][v1.ResourceMemory].ScaledValue(resource.Micro), metrics.Usage.Memory().ScaledValue(resource.Micro)),
resource.Micro,
)
}
}
Expand Down
49 changes: 47 additions & 2 deletions pkg/descheduler/metricscollector/metricscollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package metricscollector

import (
"context"
"math"
"testing"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestMetricsCollector(t *testing.T) {
metricsClientset.Tracker().Create(gvr, n3metrics, "")

t.Logf("Set initial node cpu usage to 1400")
collector := NewMetricsCollector(clientset, metricsClientset, "")
collector := NewMetricsCollector(clientset.CoreV1().Nodes(), metricsClientset, "")
collector.Collect(context.TODO())
nodesUsage, _ := collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1400)
Expand All @@ -70,7 +71,7 @@ func TestMetricsCollector(t *testing.T) {
allnodesUsage, _ = collector.AllNodesUsage()
checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1310)

t.Logf("Set current node cpu usage to 500")
t.Logf("Set current node cpu usage to 900")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
collector.Collect(context.TODO())
Expand All @@ -79,3 +80,47 @@ func TestMetricsCollector(t *testing.T) {
allnodesUsage, _ = collector.AllNodesUsage()
checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1269)
}

// TestMetricsCollectorConvergence verifies that repeated collections move the
// averaged CPU usage of a node from its initial value towards a newly
// reported one.
//
// Node n2 starts at 1400 milli-CPU. Its reported usage is then changed to 900
// and Collect is invoked up to 100 times; the test passes as soon as the
// averaged value is within 1 milli-CPU of 900 and fails if that never happens.
func TestMetricsCollectorConvergence(t *testing.T) {
	gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"}

	n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
	n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
	n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)

	n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
	n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
	n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)

	clientset := fakeclientset.NewSimpleClientset(n1, n2, n3)
	metricsClientset := fakemetricsclient.NewSimpleClientset()
	// Fail early if the fixtures cannot be registered; otherwise a later
	// assertion would fail for a reason unrelated to the collector.
	if err := metricsClientset.Tracker().Create(gvr, n1metrics, ""); err != nil {
		t.Fatalf("unable to create metrics for node %q: %v", n1.Name, err)
	}
	if err := metricsClientset.Tracker().Create(gvr, n2metrics, ""); err != nil {
		t.Fatalf("unable to create metrics for node %q: %v", n2.Name, err)
	}
	if err := metricsClientset.Tracker().Create(gvr, n3metrics, ""); err != nil {
		t.Fatalf("unable to create metrics for node %q: %v", n3.Name, err)
	}

	t.Logf("Set initial node cpu usage to 1400")
	collector := NewMetricsCollector(clientset.CoreV1().Nodes(), metricsClientset, "")
	if err := collector.Collect(context.TODO()); err != nil {
		t.Fatalf("unable to collect metrics: %v", err)
	}
	nodesUsage, err := collector.NodeUsage(n2)
	if err != nil {
		t.Fatalf("unable to get usage of node %q: %v", n2.Name, err)
	}
	checkCpuNodeUsage(t, nodesUsage, 1400)
	allnodesUsage, err := collector.AllNodesUsage()
	if err != nil {
		t.Fatalf("unable to get usage of all nodes: %v", err)
	}
	checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1400)

	const (
		targetMilliCPU = 900 // newly reported usage the average must approach
		maxRounds      = 100 // upper bound on the number of collections
	)

	t.Logf("Set current node cpu usage to 900 and wait until it converges to it")
	n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(targetMilliCPU, resource.DecimalSI)
	if err := metricsClientset.Tracker().Update(gvr, n2metrics, ""); err != nil {
		t.Fatalf("unable to update metrics for node %q: %v", n2.Name, err)
	}

	converged := false
	for i := 0; i < maxRounds; i++ {
		if err := collector.Collect(context.TODO()); err != nil {
			t.Fatalf("unable to collect metrics: %v", err)
		}
		nodesUsage, err = collector.NodeUsage(n2)
		if err != nil {
			t.Fatalf("unable to get usage of node %q: %v", n2.Name, err)
		}
		// Guard against a missing entry so the failure is reported as a test
		// error rather than a nil-pointer panic.
		cpu, ok := nodesUsage[v1.ResourceCPU]
		if !ok || cpu == nil {
			t.Fatalf("unable to find %q resource in the usage of node %q", v1.ResourceCPU, n2.Name)
		}
		current := cpu.MilliValue()
		if math.Abs(float64(targetMilliCPU-current)) < 2 {
			t.Logf("Node usage converged to 900+-1")
			converged = true
			break
		}
		t.Logf("The current node usage: %v", current)
	}
	if !converged {
		t.Fatalf("The node usage did not converge to 900+-1")
	}
}
27 changes: 15 additions & 12 deletions pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,20 +1136,23 @@ func TestLowNodeUtilization(t *testing.T) {
objs = append(objs, pod)
}

metricsClientset := fakemetricsclient.NewSimpleClientset()
for _, nodemetrics := range tc.nodemetricses {
metricsClientset.Tracker().Create(nodesgvr, nodemetrics, "")
}
for _, podmetrics := range tc.podmetricses {
metricsClientset.Tracker().Create(podsgvr, podmetrics, podmetrics.Namespace)
}

fakeClient := fake.NewSimpleClientset(objs...)

collector := metricscollector.NewMetricsCollector(fakeClient, metricsClientset, "")
err := collector.Collect(ctx)
if err != nil {
t.Fatalf("unable to collect metrics: %v", err)
var collector *metricscollector.MetricsCollector
if len(tc.nodemetricses) > 0 || len(tc.podmetricses) > 0 {
metricsClientset := fakemetricsclient.NewSimpleClientset()
for _, nodemetrics := range tc.nodemetricses {
metricsClientset.Tracker().Create(nodesgvr, nodemetrics, "")
}
for _, podmetrics := range tc.podmetricses {
metricsClientset.Tracker().Create(podsgvr, podmetrics, podmetrics.Namespace)
}

collector = metricscollector.NewMetricsCollector(fakeClient.CoreV1().Nodes(), metricsClientset, "")
err := collector.Collect(ctx)
if err != nil {
t.Fatalf("unable to collect metrics: %v", err)
}
}

podsForEviction := make(map[string]struct{})
Expand Down
6 changes: 6 additions & 0 deletions pkg/framework/plugins/nodeutilization/usageclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ func (client *actualUsageClient) sync(nodes []*v1.Node) error {
if !ok {
return fmt.Errorf("unable to find node %q in the collected metrics", node.Name)
}
if _, exists := nodeUsage[v1.ResourceCPU]; !exists {
return fmt.Errorf("unable to find %q resource for collected %q node metric", v1.ResourceCPU, node.Name)
}
if _, exists := nodeUsage[v1.ResourceMemory]; !exists {
return fmt.Errorf("unable to find %q resource for collected %q node metric", v1.ResourceMemory, node.Name)
}
nodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI)

// store the snapshot of pods from the same (or the closest) node utilization computation
Expand Down
2 changes: 1 addition & 1 deletion pkg/framework/plugins/nodeutilization/usageclients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestActualUsageClient(t *testing.T) {
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

collector := metricscollector.NewMetricsCollector(clientset, metricsClientset, "")
collector := metricscollector.NewMetricsCollector(clientset.CoreV1().Nodes(), metricsClientset, "")

usageClient := newActualUsageClient(
resourceNames,
Expand Down

0 comments on commit 00a6962

Please sign in to comment.