topologyspreadconstraint: support matchLabelKeys #1233

Merged · 3 commits · Oct 31, 2023
README.md: 13 additions & 0 deletions
@@ -547,6 +547,19 @@ topologyBalanceNodeFit: false

The strategy parameter `labelSelector` is not used when balancing topology domains; it is only applied during eviction to determine whether a pod can be evicted.

Supported [topology spread constraint](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/#spread-constraint-definition) fields:

|Name|Supported?|
|----|----------|
|`maxSkew`|Yes|
|`minDomains`|No|
|`topologyKey`|Yes|
|`whenUnsatisfiable`|Yes|
|`labelSelector`|Yes|
|`matchLabelKeys`|Yes|
|`nodeAffinityPolicy`|Yes|
|`nodeTaintsPolicy`|Yes|
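
For example, a constraint that sets `matchLabelKeys` alongside `labelSelector` is balanced only across pods that share this pod's values for the listed label keys. The snippet below is illustrative (not taken from this repository); `pod-template-hash` is the label a Deployment adds per revision, so each revision is spread and descheduled independently:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: example
  labels:
    app: web
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: topology.kubernetes.io/zone
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        app: web
    # Only pods sharing this pod's values for these keys are counted together.
    matchLabelKeys:
    - pod-template-hash
  containers:
  - name: web
    image: registry.k8s.io/pause:3.9
```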

**Parameters:**

|Name|Type|
hack/kind_config.yaml: 4 additions & 0 deletions
@@ -1,5 +1,9 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
featureGates:
# MatchLabelKeysInPodTopologySpread is beta as of Kubernetes 1.27, but e2e currently runs on 1.26.
# This feature gate should be removed as part of the descheduler 0.29 release.
MatchLabelKeysInPodTopologySpread: true
nodes:
- role: control-plane
- role: worker
@@ -29,7 +29,7 @@ import (
utilpointer "k8s.io/utils/pointer"

v1helper "k8s.io/component-helpers/scheduling/corev1"
nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
"sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
@@ -50,6 +50,19 @@ type topology struct {
pods []*v1.Pod
}

// topologySpreadConstraint is an internal representation of v1.TopologySpreadConstraint
// with the label selector already parsed.
// This mirrors scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L37
type topologySpreadConstraint struct {
maxSkew int32
topologyKey string
selector labels.Selector
nodeAffinityPolicy v1.NodeInclusionPolicy
nodeTaintsPolicy v1.NodeInclusionPolicy
podNodeAffinity nodeaffinity.RequiredNodeAffinity
podTolerations []v1.Toleration
}

// RemovePodsViolatingTopologySpreadConstraint evicts pods which violate their topology spread constraints
type RemovePodsViolatingTopologySpreadConstraint struct {
handle frameworktypes.Handle
@@ -81,12 +94,6 @@ func New(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plug
}, nil
}

type topologyConstraintSet struct {
constraint v1.TopologySpreadConstraint
podNodeAffinity nodeaffinity.RequiredNodeAffinity
podTolerations []v1.Toleration
}

// Name retrieves the plugin name
func (d *RemovePodsViolatingTopologySpreadConstraint) Name() string {
return PluginName
@@ -140,21 +147,22 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
}

// ...where there is a topology constraint
namespaceTopologySpreadConstraints := []topologyConstraintSet{}
var namespaceTopologySpreadConstraints []topologySpreadConstraint
for _, pod := range namespacedPods[namespace] {
for _, constraint := range pod.Spec.TopologySpreadConstraints {
// Ignore topology constraints whose whenUnsatisfiable action is not in the allowed set
if !allowedConstraints.Has(constraint.WhenUnsatisfiable) {
continue
}
requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(pod)
namespaceTopologySpreadConstraint := topologyConstraintSet{
constraint: constraint,
podNodeAffinity: requiredSchedulingTerm,
podTolerations: pod.Spec.Tolerations,

namespaceTopologySpreadConstraint, err := newTopologySpreadConstraint(constraint, pod)
if err != nil {
klog.ErrorS(err, "cannot process topology spread constraint")
continue
}
// Need to check v1.TopologySpreadConstraint deepEquality because
// v1.TopologySpreadConstraint has pointer fields

// Need to check TopologySpreadConstraint deepEquality because
// TopologySpreadConstraint can have pointer fields
// and we don't need to go over duplicated constraints later on
if hasIdenticalConstraints(namespaceTopologySpreadConstraint, namespaceTopologySpreadConstraints) {
continue
@@ -167,27 +175,18 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
}

// 2. for each topologySpreadConstraint in that namespace
for _, constraintSet := range namespaceTopologySpreadConstraints {
constraint := constraintSet.constraint
nodeAffinity := constraintSet.podNodeAffinity
tolerations := constraintSet.podTolerations
for _, tsc := range namespaceTopologySpreadConstraints {
constraintTopologies := make(map[topologyPair][]*v1.Pod)
// pre-populate the topologyPair map with all the topologies available from the nodeMap
// (we can't just build it from existing pods' nodes because a topology may have 0 pods)
for _, node := range nodeMap {
if val, ok := node.Labels[constraint.TopologyKey]; ok {
if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) {
constraintTopologies[topologyPair{key: constraint.TopologyKey, value: val}] = make([]*v1.Pod, 0)
if val, ok := node.Labels[tsc.topologyKey]; ok {
if matchNodeInclusionPolicies(tsc, node) {
constraintTopologies[topologyPair{key: tsc.topologyKey, value: val}] = make([]*v1.Pod, 0)
}
}
}

selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
if err != nil {
klog.ErrorS(err, "Couldn't parse label selector as selector", "selector", constraint.LabelSelector)
continue
}

// 3. for each evictable pod in that namespace
// (this loop is where we count the number of pods per topologyValue that match this constraint's selector)
var sumPods float64
@@ -197,7 +196,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
continue
}
// 4. if the pod matches this TopologySpreadConstraint LabelSelector
if !selector.Matches(labels.Set(pod.Labels)) {
if !tsc.selector.Matches(labels.Set(pod.Labels)) {
continue
}

@@ -207,21 +206,21 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
// If ok is false, node is nil and node.Labels would panic; the pod has not been scheduled onto a node yet, so it is safe to continue here.
continue
}
nodeValue, ok := node.Labels[constraint.TopologyKey]
nodeValue, ok := node.Labels[tsc.topologyKey]
if !ok {
continue
}
// 6. create a topoPair with key as this TopologySpreadConstraint
topoPair := topologyPair{key: constraint.TopologyKey, value: nodeValue}
topoPair := topologyPair{key: tsc.topologyKey, value: nodeValue}
// 7. add the pod with key as this topoPair
constraintTopologies[topoPair] = append(constraintTopologies[topoPair], pod)
sumPods++
}
if topologyIsBalanced(constraintTopologies, constraint) {
klog.V(2).InfoS("Skipping topology constraint because it is already balanced", "constraint", constraint)
if topologyIsBalanced(constraintTopologies, tsc) {
klog.V(2).InfoS("Skipping topology constraint because it is already balanced", "constraint", tsc)
continue
}
d.balanceDomains(podsForEviction, constraintSet, constraintTopologies, sumPods, nodes)
d.balanceDomains(podsForEviction, tsc, constraintTopologies, sumPods, nodes)
}
}

@@ -246,7 +245,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
}

// hasIdenticalConstraints checks whether an identical TopologySpreadConstraint already exists in the namespaceTopologySpreadConstraints slice
func hasIdenticalConstraints(newConstraint topologyConstraintSet, namespaceTopologySpreadConstraints []topologyConstraintSet) bool {
func hasIdenticalConstraints(newConstraint topologySpreadConstraint, namespaceTopologySpreadConstraints []topologySpreadConstraint) bool {
for _, constraint := range namespaceTopologySpreadConstraints {
if reflect.DeepEqual(newConstraint, constraint) {
return true
@@ -257,7 +256,7 @@ func hasIdenticalConstraints(newConstraint topologyConstraintSet, namespaceTopol

// topologyIsBalanced checks if any domains in the topology differ by more than the MaxSkew
// this is called before any sorting or other calculations and is used to skip topologies that don't need to be balanced
func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.TopologySpreadConstraint) bool {
func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, tsc topologySpreadConstraint) bool {
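// Illustrative example: with domain sizes {1, 4} and maxSkew 1, the difference 4-1=3
// exceeds maxSkew, so the topology is reported as unbalanced.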
minDomainSize := math.MaxInt32
maxDomainSize := math.MinInt32
for _, pods := range topology {
@@ -267,7 +266,7 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.Topol
if len(pods) > maxDomainSize {
maxDomainSize = len(pods)
}
if int32(maxDomainSize-minDomainSize) > constraint.MaxSkew {
if int32(maxDomainSize-minDomainSize) > tsc.maxSkew {
return false
}
}
@@ -296,20 +295,19 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.Topol
// (assuming even distribution by the scheduler of the evicted pods)
func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
podsForEviction map[*v1.Pod]struct{},
constraintSet topologyConstraintSet,
tsc topologySpreadConstraint,
constraintTopologies map[topologyPair][]*v1.Pod,
sumPods float64,
nodes []*v1.Node,
) {
constraint := constraintSet.constraint
idealAvg := sumPods / float64(len(constraintTopologies))
isEvictable := d.handle.Evictor().Filter
sortedDomains := sortDomains(constraintTopologies, isEvictable)
getPodsAssignedToNode := d.handle.GetPodsAssignedToNodeFunc()
topologyBalanceNodeFit := utilpointer.BoolDeref(d.args.TopologyBalanceNodeFit, true)

eligibleNodes := filterEligibleNodes(nodes, constraintSet)
nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, constraint.TopologyKey, idealAvg)
eligibleNodes := filterEligibleNodes(nodes, tsc)
nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, tsc.topologyKey, idealAvg)

// i is the index for belowOrEqualAvg
// j is the index for aboveAvg
@@ -325,7 +323,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
skew := float64(len(sortedDomains[j].pods) - len(sortedDomains[i].pods))

// if i and j are within maxSkew of each other, move to the next belowOrEqualAvg domain
if int32(skew) <= constraint.MaxSkew {
if int32(skew) <= tsc.maxSkew {
i++
continue
}
@@ -339,7 +337,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
aboveAvg := math.Ceil(float64(len(sortedDomains[j].pods)) - idealAvg)
belowAvg := math.Ceil(idealAvg - float64(len(sortedDomains[i].pods)))
smallestDiff := math.Min(aboveAvg, belowAvg)
halfSkew := math.Ceil((skew - float64(constraint.MaxSkew)) / 2)
halfSkew := math.Ceil((skew - float64(tsc.maxSkew)) / 2)
movePods := int(math.Min(smallestDiff, halfSkew))
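// Worked example (illustrative): two domains with 7 and 1 pods give idealAvg 4 and skew 6;
// with maxSkew 1, aboveAvg=ceil(7-4)=3, belowAvg=ceil(4-1)=3, halfSkew=ceil((6-1)/2)=3,
// so movePods=min(3,3)=3 pods are picked for eviction from the larger domain.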
if movePods <= 0 {
i++
@@ -466,33 +464,82 @@ func doNotScheduleTaintsFilterFunc() func(t *v1.Taint) bool {
}
}

func filterEligibleNodes(nodes []*v1.Node, constraintSet topologyConstraintSet) []*v1.Node {
constraint := constraintSet.constraint
nodeAffinity := constraintSet.podNodeAffinity
tolerations := constraintSet.podTolerations
func filterEligibleNodes(nodes []*v1.Node, tsc topologySpreadConstraint) []*v1.Node {
var eligibleNodes []*v1.Node
for _, node := range nodes {
if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) {
if matchNodeInclusionPolicies(tsc, node) {
eligibleNodes = append(eligibleNodes, node)
}
}
return eligibleNodes
}

func matchNodeInclusionPolicies(tsc *v1.TopologySpreadConstraint, tolerations []v1.Toleration, node *v1.Node, require nodeaffinity.RequiredNodeAffinity) bool {
// Nil is equivalent to honor
if tsc.NodeAffinityPolicy == nil || *tsc.NodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
func matchNodeInclusionPolicies(tsc topologySpreadConstraint, node *v1.Node) bool {
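// Nil policies have already been defaulted by newTopologySpreadConstraint
// (Honor for nodeAffinityPolicy, Ignore for nodeTaintsPolicy).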
if tsc.nodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
// We ignore parsing errors here for backwards compatibility.
if match, _ := require.Match(node); !match {
if match, _ := tsc.podNodeAffinity.Match(node); !match {
return false
}
}

// Nil is equivalent to ignore
if tsc.NodeTaintsPolicy != nil && *tsc.NodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tolerations, doNotScheduleTaintsFilterFunc()); untolerated {
if tsc.nodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tsc.podTolerations, doNotScheduleTaintsFilterFunc()); untolerated {
return false
}
}
return true
}

// inspired by Scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L90
func newTopologySpreadConstraint(constraint v1.TopologySpreadConstraint, pod *v1.Pod) (topologySpreadConstraint, error) {
selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
if err != nil {
return topologySpreadConstraint{}, err
}

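// matchLabelKeys narrows the constraint to pods that carry the same values as this pod
// for the listed label keys (for example pod-template-hash, so each Deployment revision
// is evaluated independently); the values are merged into the parsed selector below.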
if len(constraint.MatchLabelKeys) > 0 && pod.Labels != nil {
matchLabels := make(labels.Set)
for _, labelKey := range constraint.MatchLabelKeys {
if value, ok := pod.Labels[labelKey]; ok {
matchLabels[labelKey] = value
}
}
if len(matchLabels) > 0 {
selector = mergeLabelSetWithSelector(matchLabels, selector)
}
}

tsc := topologySpreadConstraint{
maxSkew: constraint.MaxSkew,
topologyKey: constraint.TopologyKey,
selector: selector,
nodeAffinityPolicy: v1.NodeInclusionPolicyHonor, // If NodeAffinityPolicy is nil, we treat NodeAffinityPolicy as "Honor".
nodeTaintsPolicy: v1.NodeInclusionPolicyIgnore, // If NodeTaintsPolicy is nil, we treat NodeTaintsPolicy as "Ignore".
podNodeAffinity: nodeaffinity.GetRequiredNodeAffinity(pod),
podTolerations: pod.Spec.Tolerations,
}
if constraint.NodeAffinityPolicy != nil {
tsc.nodeAffinityPolicy = *constraint.NodeAffinityPolicy
}
if constraint.NodeTaintsPolicy != nil {
tsc.nodeTaintsPolicy = *constraint.NodeTaintsPolicy
}

return tsc, nil
}

// Scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L136
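// The result requires every label in matchLabels in addition to the original selector's
// requirements, e.g. (illustratively) app=web plus pod-template-hash=abc123.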
func mergeLabelSetWithSelector(matchLabels labels.Set, s labels.Selector) labels.Selector {
mergedSelector := labels.SelectorFromSet(matchLabels)

requirements, ok := s.Requirements()
if !ok {
return s
}

for _, r := range requirements {
mergedSelector = mergedSelector.Add(r)
}

return mergedSelector
}