Merge pull request #393 from Tal-or/n_replicas_with_gu_pod_test
e2e:serial: requesting available resources with replicaset
openshift-merge-robot authored Aug 17, 2022
2 parents 2c049ea + 25c702a commit 0bca928
Showing 5 changed files with 358 additions and 1 deletion.
15 changes: 15 additions & 0 deletions internal/resourcelist/resourcelist.go
@@ -21,6 +21,7 @@ import (
"sort"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
@@ -42,6 +43,20 @@ func ToString(res corev1.ResourceList) string {
return strings.Join(items, ", ")
}

func FromReplicaSet(rs appsv1.ReplicaSet) corev1.ResourceList {
rl := FromContainers(rs.Spec.Template.Spec.Containers)
replicas := rs.Spec.Replicas
for resName, resQty := range rl {
replicaResQty := resQty.DeepCopy()
// start from 1 because rl already holds the resources of one replica
for i := 1; i < int(*replicas); i++ {
resQty.Add(replicaResQty)
}
rl[resName] = resQty
}
return rl
}

func FromGuaranteedPod(pod corev1.Pod) corev1.ResourceList {
return FromContainers(pod.Spec.Containers)
}
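For reference, a minimal, self-contained sketch of the aggregation that the new FromReplicaSet helper performs: one replica's worth of resources is accumulated once per additional replica. The replica count and quantities below are illustrative only (FromContainers itself is not shown in this diff, so the per-replica list is built by hand here).

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Per-replica resources of the pod template (illustrative values).
	perReplica := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("2"),
		corev1.ResourceMemory: resource.MustParse("200Mi"),
	}
	replicas := 3

	total := corev1.ResourceList{}
	for name, qty := range perReplica {
		acc := qty.DeepCopy()
		// Start from one replica's worth, then add the per-replica quantity
		// once for each remaining replica, mirroring the loop in FromReplicaSet.
		for i := 1; i < replicas; i++ {
			acc.Add(qty)
		}
		total[name] = acc
	}

	cpu := total[corev1.ResourceCPU]
	mem := total[corev1.ResourceMemory]
	fmt.Printf("total: cpu=%s memory=%s\n", cpu.String(), mem.String()) // total: cpu=6 memory=600Mi
}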
41 changes: 41 additions & 0 deletions test/e2e/sched/utils/replicaset.go
@@ -0,0 +1,41 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func ListPodsByReplicaSet(aclient client.Client, rs appsv1.ReplicaSet) ([]corev1.Pod, error) {
podList := &corev1.PodList{}
sel, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return nil, err
}

err = aclient.List(context.TODO(), podList, &client.ListOptions{Namespace: rs.Namespace, LabelSelector: sel})
if err != nil {
return nil, err
}

return podList.Items, nil
}
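As a side note, here is a minimal sketch (assumed names, not part of this PR) of how ListPodsByReplicaSet could be exercised in a unit test against controller-runtime's fake client, which resolves the ReplicaSet label selector the same way the real client does:

// replicaset_sketch_test.go -- illustrative only; relies on the fake client's
// default scheme, which registers the core and apps API types.
package utils

import (
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestListPodsByReplicaSetSketch(t *testing.T) {
	labels := map[string]string{"test": "test-rs"}
	rs := appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Name: "testrs", Namespace: "demo"},
		Spec: appsv1.ReplicaSetSpec{
			Selector: &metav1.LabelSelector{MatchLabels: labels},
		},
	}
	// One pod that matches the selector and one that does not.
	matching := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testrs-abcde", Namespace: "demo", Labels: labels}}
	other := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "unrelated", Namespace: "demo"}}

	cli := fake.NewClientBuilder().WithObjects(matching, other).Build()

	pods, err := ListPodsByReplicaSet(cli, rs)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(pods) != 1 {
		t.Fatalf("expected 1 matching pod, got %d", len(pods))
	}
}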
253 changes: 253 additions & 0 deletions test/e2e/serial/tests/workload_placement.go
@@ -19,6 +19,7 @@ package tests
import (
"context"
"fmt"
"math"
"time"

"github.com/ghodss/yaml"
@@ -856,6 +857,258 @@ var _ = Describe("[serial][disruptive][scheduler] numaresources workload placeme
}, time.Minute, time.Second*5).Should(BeTrue(), "resources not restored on %q", targetNodeName)
})
})
Context("cluster with at least two available nodes", func() {
timeout := 5 * time.Minute
BeforeEach(func() {
policies := []nrtv1alpha1.TopologyManagerPolicy{
nrtv1alpha1.SingleNUMANodeContainerLevel,
nrtv1alpha1.SingleNUMANodePodLevel,
}
nrts = e2enrt.FilterByPolicies(nrtList.Items, policies)
if len(nrts) < 2 {
Skip(fmt.Sprintf("not enough nodes with valid policy - found %d", len(nrts)))
}

numOfNodeToBePadded := len(nrts) - 1

rl := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("4G"),
}
By("padding the nodes before test start")
err := padder.Nodes(numOfNodeToBePadded).UntilAvailableIsResourceList(rl).Pad(timeout, e2epadder.PaddingOptions{})
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
By("unpadding the nodes after test finish")
err := padder.Clean()
Expect(err).ToNot(HaveOccurred())

})
It("[test_id:47627] should be able to schedule many replicas with performance time equals to the default scheduler", func() {
nrtInitial, err := e2enrt.GetUpdated(fxt.Client, nrtList, timeout)
Expect(err).ToNot(HaveOccurred())

replicaNumber := int32(10)
rsName := "testrs"
podLabels := map[string]string{
"test": "test-rs",
}

rs := objects.NewTestReplicaSetWithPodSpec(replicaNumber, podLabels, map[string]string{}, fxt.Namespace.Name, rsName, corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "c0",
Image: objects.PauseImage,
Command: []string{objects.PauseCommand},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("200Mi"),
},
},
},
{
Name: "c1",
Image: objects.PauseImage,
Command: []string{objects.PauseCommand},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("100Mi"),
},
},
},
},
})

dpCreateStart := time.Now()
By(fmt.Sprintf("creating a replicaset %s/%s with %d replicas scheduling with: %s", fxt.Namespace.Name, rsName, replicaNumber, corev1.DefaultSchedulerName))
err = fxt.Client.Create(context.TODO(), rs)
Expect(err).ToNot(HaveOccurred())

namespacedRsName := client.ObjectKeyFromObject(rs)
err = fxt.Client.Get(context.TODO(), namespacedRsName, rs)
Expect(err).ToNot(HaveOccurred())

var pods []corev1.Pod
Eventually(func() bool {
pods, err = schedutils.ListPodsByReplicaSet(fxt.Client, *rs)
if err != nil {
klog.Warningf("failed to list the pods of replicaset: %q error: %v", namespacedRsName.String(), err)
return false
}
if len(pods) != int(replicaNumber) {
klog.Warningf("%d pods are exists under replicaset %q", len(pods), namespacedRsName.String())
return false
}
return true
}, time.Minute, 5*time.Second).Should(BeTrue(), "there should be %d pods under deployment: %q", replicaNumber, namespacedRsName.String())
schedTimeWithDefaultScheduler := time.Since(dpCreateStart)

By(fmt.Sprintf("checking the pod was scheduled with the topology aware scheduler %q", corev1.DefaultSchedulerName))
for _, pod := range pods {
schedOK, err := nrosched.CheckPODWasScheduledWith(fxt.K8sClient, pod.Namespace, pod.Name, corev1.DefaultSchedulerName)
Expect(err).ToNot(HaveOccurred())
Expect(schedOK).To(BeTrue(), "pod %s/%s not scheduled with expected scheduler %s", pod.Namespace, pod.Name, corev1.DefaultSchedulerName)
}

// all pods should land on same node
targetNodeName := pods[0].Spec.NodeName
By(fmt.Sprintf("verifying resources allocation correctness for NRT target: %q", targetNodeName))
var nrtAfterDPCreation nrtv1alpha1.NodeResourceTopologyList
Eventually(func() bool {
nrtAfterDPCreation, err = e2enrt.GetUpdated(fxt.Client, nrtInitial, timeout)
Expect(err).ToNot(HaveOccurred())

nrtInitialTarget, err := e2enrt.FindFromList(nrtInitial.Items, targetNodeName)
Expect(err).ToNot(HaveOccurred())
Expect(nrtInitialTarget.Name).To(Equal(targetNodeName), "expected targetNrt to be equal to %q", targetNodeName)

updatedTargetNrt, err := e2enrt.FindFromList(nrtAfterDPCreation.Items, targetNodeName)
Expect(err).ToNot(HaveOccurred())
Expect(updatedTargetNrt.Name).To(Equal(targetNodeName), "expected targetNrt to be equal to %q", targetNodeName)

rl := e2ereslist.FromReplicaSet(*rs)

dataBefore, err := yaml.Marshal(nrtInitialTarget)
Expect(err).ToNot(HaveOccurred())
dataAfter, err := yaml.Marshal(updatedTargetNrt)
Expect(err).ToNot(HaveOccurred())
match, err := e2enrt.CheckZoneConsumedResourcesAtLeast(*nrtInitialTarget, *updatedTargetNrt, rl)
Expect(err).ToNot(HaveOccurred())

if match == "" {
klog.Warningf("inconsistent accounting: no resources consumed by the running pod,\nNRT before test deployment: %s \nNRT after: %s \npod resources: %v", dataBefore, dataAfter, e2ereslist.ToString(rl))
return false
}
return true
}).WithTimeout(timeout).WithPolling(10 * time.Second).Should(BeTrue())

By(fmt.Sprintf("deleting replicaset %s/%s", fxt.Namespace.Name, rsName))
err = fxt.Client.Delete(context.TODO(), rs)
Expect(err).ToNot(HaveOccurred())

Eventually(func() bool {
By(fmt.Sprintf("checking the resources are restored as expected on %q", targetNodeName))

nrtListPostDelete, err := e2enrt.GetUpdated(fxt.Client, nrtAfterDPCreation, 1*time.Minute)
Expect(err).ToNot(HaveOccurred())

nrtPostDelete, err := e2enrt.FindFromList(nrtListPostDelete.Items, targetNodeName)
Expect(err).ToNot(HaveOccurred())

nrtInitial, err := e2enrt.FindFromList(nrtInitial.Items, targetNodeName)
Expect(err).ToNot(HaveOccurred())

ok, err := e2enrt.CheckEqualAvailableResources(*nrtInitial, *nrtPostDelete)
Expect(err).ToNot(HaveOccurred())
return ok
}, time.Minute, time.Second*5).Should(BeTrue(), "resources not restored on %q", targetNodeName)

rs = objects.NewTestReplicaSetWithPodSpec(replicaNumber, podLabels, map[string]string{}, fxt.Namespace.Name, rsName, corev1.PodSpec{
SchedulerName: serialconfig.Config.SchedulerName,
Containers: []corev1.Container{
{
Name: "c0",
Image: objects.PauseImage,
Command: []string{objects.PauseCommand},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("200Mi"),
},
},
},
{
Name: "c1",
Image: objects.PauseImage,
Command: []string{objects.PauseCommand},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("100Mi"),
},
},
},
},
})
nrtInitial, err = e2enrt.GetUpdated(fxt.Client, nrtv1alpha1.NodeResourceTopologyList{}, timeout)
Expect(err).ToNot(HaveOccurred())

By(fmt.Sprintf("creating a replicaset %s/%s with %d replicas scheduling with: %s", fxt.Namespace.Name, rsName, replicaNumber, serialconfig.Config.SchedulerName))
dpCreateStart = time.Now()
err = fxt.Client.Create(context.TODO(), rs)
Expect(err).ToNot(HaveOccurred())

namespacedRsName = client.ObjectKeyFromObject(rs)
err = fxt.Client.Get(context.TODO(), namespacedRsName, rs)
Expect(err).ToNot(HaveOccurred())

Eventually(func() bool {
pods, err = schedutils.ListPodsByReplicaSet(fxt.Client, *rs)
if err != nil {
klog.Warningf("failed to list the pods of replicaset: %q error: %v", namespacedRsName.String(), err)
return false
}
if len(pods) != int(replicaNumber) {
klog.Warningf("%d pods are exists under replicaset %q", len(pods), namespacedRsName.String())
return false
}
return true
}, time.Minute, 5*time.Second).Should(BeTrue(), "there should be %d pods under deployment: %q", replicaNumber, namespacedRsName.String())
schedTimeWithTopologyScheduler := time.Since(dpCreateStart)

By(fmt.Sprintf("checking the pod was scheduled with the topology aware scheduler %q", serialconfig.Config.SchedulerName))
for _, pod := range pods {
schedOK, err := nrosched.CheckPODWasScheduledWith(fxt.K8sClient, pod.Namespace, pod.Name, serialconfig.Config.SchedulerName)
Expect(err).ToNot(HaveOccurred())
Expect(schedOK).To(BeTrue(), "pod %s/%s not scheduled with expected scheduler %s", pod.Namespace, pod.Name, serialconfig.Config.SchedulerName)
}

// all pods should land on same node
targetNodeName = pods[0].Spec.NodeName
By(fmt.Sprintf("verifying resources allocation correctness for NRT target: %q", targetNodeName))
Eventually(func() bool {
nrtAfterDPCreation, err := e2enrt.GetUpdated(fxt.Client, nrtInitial, timeout)
Expect(err).ToNot(HaveOccurred())

nrtInitialTarget, err := e2enrt.FindFromList(nrtInitial.Items, targetNodeName)
Expect(err).ToNot(HaveOccurred())
Expect(nrtInitialTarget.Name).To(Equal(targetNodeName), "expected targetNrt to be equal to %q", targetNodeName)

updatedTargetNrt, err := e2enrt.FindFromList(nrtAfterDPCreation.Items, targetNodeName)
Expect(err).ToNot(HaveOccurred())
Expect(updatedTargetNrt.Name).To(Equal(targetNodeName), "expected targetNrt to be equal to %q", targetNodeName)

rl := e2ereslist.FromReplicaSet(*rs)

dataBefore, err := yaml.Marshal(nrtInitialTarget)
Expect(err).ToNot(HaveOccurred())
dataAfter, err := yaml.Marshal(updatedTargetNrt)
Expect(err).ToNot(HaveOccurred())
match, err := e2enrt.CheckZoneConsumedResourcesAtLeast(*nrtInitialTarget, *updatedTargetNrt, rl)
Expect(err).ToNot(HaveOccurred())

if match == "" {
klog.Warningf("inconsistent accounting: no resources consumed by the running pod,\nNRT before test deployment: %s \nNRT after: %s \npod resources: %v", dataBefore, dataAfter, e2ereslist.ToString(rl))
return false
}
return true
}).WithTimeout(timeout).WithPolling(10 * time.Second).Should(BeTrue())

By(fmt.Sprintf("comparing scheduling times between %q and %q", corev1.DefaultSchedulerName, serialconfig.Config.SchedulerName))
diff := int64(math.Abs(float64(schedTimeWithTopologyScheduler.Milliseconds() - schedTimeWithDefaultScheduler.Milliseconds())))
// 2000 milliseconds diff seems reasonable, but can evaluate later if needed.
d := time.Millisecond * 2000
Expect(diff).To(BeNumerically("<", d.Milliseconds()), "expected the difference between scheduling times to be %v at most; actual diff: %d milliseconds", d, diff)

By(fmt.Sprintf("deleting deployment %s/%s", fxt.Namespace.Name, rsName))
err = fxt.Client.Delete(context.TODO(), rs)
Expect(err).ToNot(HaveOccurred())
})
})
})

func makePaddingPod(namespace, nodeName string, zone nrtv1alpha1.Zone, podReqs corev1.ResourceList) (*corev1.Pod, error) {
@@ -61,7 +61,7 @@ func GetUpdated(cli client.Client, ref nrtv1alpha1.NodeResourceTopologyList, tim
return false, err
}
klog.Infof("NRT List current ResourceVersion %s reference %s", updatedNrtList.ListMeta.ResourceVersion, ref.ListMeta.ResourceVersion)
return (updatedNrtList.ListMeta.ResourceVersion != ref.ListMeta.ResourceVersion), nil
return updatedNrtList.ListMeta.ResourceVersion != ref.ListMeta.ResourceVersion, nil
})
return updatedNrtList, err
}
48 changes: 48 additions & 0 deletions test/utils/objects/replicaset.go
@@ -0,0 +1,48 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package objects

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func NewTestReplicaSetWithPodSpec(replicas int32, podLabels map[string]string, nodeSelector map[string]string, namespace, name string, podSpec corev1.PodSpec) *appsv1.ReplicaSet {
rs := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: appsv1.ReplicaSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: podLabels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
},
Spec: podSpec,
},
},
}
if nodeSelector != nil {
rs.Spec.Template.Spec.NodeSelector = nodeSelector
}
return rs
}
