added priorityClass to Clusters and ClusterSets
enrichman committed Jan 10, 2025
1 parent 5892121 commit 91aa784
Showing 9 changed files with 282 additions and 1 deletion.
5 changes: 5 additions & 0 deletions charts/k3k/crds/k3k.io_clusters.yaml
@@ -167,6 +167,11 @@ spec:
          required:
          - type
          type: object
        priorityClass:
          description: |-
            PriorityClass is the priorityClassName that will be applied to all server/agent pods.
            In "shared" mode the priorityClassName will also be applied to the workloads.
          type: string
        serverArgs:
          description: ServerArgs are the ordered key value pairs (e.g. "testArg",
            "testValue") for the K3s pods running in server mode.
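
For reference, a minimal sketch of the new field in use. The "lowpriority" class and the Cluster metadata are illustrative (mode/servers/agents follow the Go types in this commit, not a verified manifest), and priorityClassName must name a PriorityClass that already exists on the host cluster:

    # illustrative PriorityClass on the host cluster
    apiVersion: scheduling.k8s.io/v1
    kind: PriorityClass
    metadata:
      name: lowpriority
    value: 1000
    globalDefault: false
    ---
    apiVersion: k3k.io/v1alpha1
    kind: Cluster
    metadata:
      name: example
      namespace: default
    spec:
      mode: shared
      servers: 1
      agents: 0
      priorityClass: lowpriority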
4 changes: 4 additions & 0 deletions charts/k3k/crds/k3k.io_clustersets.yaml
@@ -87,6 +87,10 @@ spec:
          description: DefaultNodeSelector is the node selector that applies
            to all clusters (server + agent) in the set
          type: object
        defaultPriorityClass:
          description: DefaultPriorityClass is the priorityClassName applied
            to all pods of all clusters in the set
          type: string
        disableNetworkPolicy:
          description: DisableNetworkPolicy is an option that will disable the
            creation of a default networkpolicy for cluster isolation
1 change: 1 addition & 0 deletions examples/clusterset.yaml
@@ -8,3 +8,4 @@ metadata:
# - "shared"
# - "virtual"
# podSecurityAdmissionLevel: "baseline"
# defaultPriorityClass: "lowpriority"
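
Uncommented, a ClusterSet carrying the new default might look like the following sketch (metadata and values are illustrative):

    apiVersion: k3k.io/v1alpha1
    kind: ClusterSet
    metadata:
      name: example-set
      namespace: default
    spec:
      defaultPriorityClass: lowpriority
      defaultNodeSelector:
        kubernetes.io/os: linux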
1 change: 1 addition & 0 deletions k3k-kubelet/provider/provider.go
@@ -329,6 +329,7 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
	tPod.Spec.NodeName = ""

	tPod.Spec.NodeSelector = cluster.Spec.NodeSelector
	tPod.Spec.PriorityClassName = cluster.Spec.PriorityClass

	// volumes will often refer to resources in the virtual cluster, but instead need to refer to the sync'd
	// host cluster version
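
In "shared" mode the kubelet provider syncs virtual-cluster pods into the host cluster, so a synced workload pod now carries the cluster's priority class as well. Illustratively (pod name and selector are made up):

    apiVersion: v1
    kind: Pod
    metadata:
      name: example-workload   # as synced into the host cluster
    spec:
      nodeSelector:
        kubernetes.io/os: linux
      priorityClassName: lowpriority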
3 changes: 3 additions & 0 deletions pkg/apis/k3k.io/v1alpha1/set_types.go
@@ -33,6 +33,9 @@ type ClusterSetSpec struct {
	// DefaultNodeSelector is the node selector that applies to all clusters (server + agent) in the set
	DefaultNodeSelector map[string]string `json:"defaultNodeSelector,omitempty"`

	// DefaultPriorityClass is the priorityClassName applied to all pods of all clusters in the set
	DefaultPriorityClass string `json:"defaultPriorityClass,omitempty"`

	// DisableNetworkPolicy is an option that will disable the creation of a default networkpolicy for cluster isolation
	DisableNetworkPolicy bool `json:"disableNetworkPolicy,omitempty"`

4 changes: 4 additions & 0 deletions pkg/apis/k3k.io/v1alpha1/types.go
@@ -35,6 +35,10 @@ type ClusterSpec struct {
	// +optional
	NodeSelector map[string]string `json:"nodeSelector,omitempty"`

	// PriorityClass is the priorityClassName that will be applied to all server/agent pods.
	// In "shared" mode the priorityClassName will also be applied to the workloads.
	PriorityClass string `json:"priorityClass,omitempty"`

	// Limit is the limits that apply to the server/worker nodes.
	Limit *ClusterLimit `json:"clusterLimit,omitempty"`

3 changes: 2 additions & 1 deletion pkg/controller/cluster/server/server.go
@@ -51,7 +51,8 @@ func (s *Server) podSpec(image, name string, persistent bool, affinitySelector *
		limit = s.cluster.Spec.Limit.ServerLimit
	}
	podSpec := v1.PodSpec{
		NodeSelector: s.cluster.Spec.NodeSelector,
		NodeSelector:      s.cluster.Spec.NodeSelector,
		PriorityClassName: s.cluster.Spec.PriorityClass,
		Affinity: &v1.Affinity{
			PodAntiAffinity: &v1.PodAntiAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
59 changes: 59 additions & 0 deletions pkg/controller/clusterset/clusterset.go
@@ -2,6 +2,7 @@ package clusterset

import (
	"context"
	"errors"
	"reflect"

	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
@@ -60,6 +61,10 @@ func Add(ctx context.Context, mgr manager.Manager, clusterCIDR string, logger *l
			handler.EnqueueRequestsFromMapFunc(namespaceEventHandler(reconciler)),
			builder.WithPredicates(namespaceLabelsPredicate()),
		).
		Watches(
			&v1alpha1.Cluster{},
			handler.EnqueueRequestsFromMapFunc(sameNamespaceEventHandler(reconciler)),
		).
		Complete(&reconciler)
}

@@ -83,6 +88,26 @@ func namespaceEventHandler(reconciler ClusterSetReconciler) handler.MapFunc {
	}
}

// sameNamespaceEventHandler will enqueue reconciling requests for all the ClusterSets in the namespace of the changed object
func sameNamespaceEventHandler(reconciler ClusterSetReconciler) handler.MapFunc {
	return func(ctx context.Context, obj client.Object) []reconcile.Request {
		var requests []reconcile.Request

		var set v1alpha1.ClusterSetList
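		// note: a List error is swallowed here; on failure no reconcile requests are enqueued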
		_ = reconciler.Client.List(ctx, &set, client.InNamespace(obj.GetNamespace()))
		for _, clusterSet := range set.Items {
			requests = append(requests, reconcile.Request{
				NamespacedName: types.NamespacedName{
					Name:      clusterSet.Name,
					Namespace: obj.GetNamespace(),
				},
			})
		}

		return requests
	}
}

// namespaceLabelsPredicate returns a predicate that will allow a reconciliation if the labels of a Namespace changed
func namespaceLabelsPredicate() predicate.Predicate {
	return predicate.Funcs{
@@ -111,6 +136,10 @@ func (c *ClusterSetReconciler) Reconcile(ctx context.Context, req reconcile.Requ
		return reconcile.Result{}, err
	}

	if err := c.reconcileClusters(ctx, log, &clusterSet); err != nil {
		return reconcile.Result{}, err
	}

	// TODO: Add resource quota for clustersets
	// if clusterSet.Spec.MaxLimits != nil {
	// 	quota := v1.ResourceQuota{
@@ -271,3 +300,33 @@ func (c *ClusterSetReconciler) reconcileNamespacePodSecurityLabels(ctx context.C
	}
	return nil
}

func (c *ClusterSetReconciler) reconcileClusters(ctx context.Context, log *zap.SugaredLogger, clusterSet *v1alpha1.ClusterSet) error {
	log.Info("reconciling Clusters")

	var clusters v1alpha1.ClusterList
	if err := c.Client.List(ctx, &clusters, ctrlruntimeclient.InNamespace(clusterSet.Namespace)); err != nil {
		return err
	}

	var err error

	for _, cluster := range clusters.Items {
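		// the ClusterSet defaults take precedence: any per-cluster drift from them is reverted here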
		oldClusterSpec := cluster.Spec

		if cluster.Spec.PriorityClass != clusterSet.Spec.DefaultPriorityClass {
			cluster.Spec.PriorityClass = clusterSet.Spec.DefaultPriorityClass
		}

		if !reflect.DeepEqual(cluster.Spec.NodeSelector, clusterSet.Spec.DefaultNodeSelector) {
			cluster.Spec.NodeSelector = clusterSet.Spec.DefaultNodeSelector
		}

		if !reflect.DeepEqual(oldClusterSpec, cluster.Spec) {
			// continue updating the other clusters even if an error occurred,
			// joining any failures into a single error
			err = errors.Join(err, c.Client.Update(ctx, &cluster))
		}
	}

	return err
}
203 changes: 203 additions & 0 deletions pkg/controller/clusterset/clusterset_test.go
@@ -2,6 +2,7 @@ package clusterset_test

import (
	"context"
	"reflect"
	"time"

	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
@@ -13,6 +14,7 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
@@ -463,5 +465,206 @@ var _ = Describe("ClusterSet Controller", func() {
Expect(ns.Labels).Should(HaveKeyWithValue("pod-security.kubernetes.io/enforce-version", "latest"))
})
})

When("a cluster in the same namespace is present", func() {
It("should update it if needed", func() {
clusterSet := &v1alpha1.ClusterSet{
ObjectMeta: v1.ObjectMeta{
GenerateName: "clusterset-",
Namespace: namespace,
},
Spec: v1alpha1.ClusterSetSpec{
DefaultPriorityClass: "foobar",
},
}

err := k8sClient.Create(ctx, clusterSet)
Expect(err).To(Not(HaveOccurred()))

cluster := &v1alpha1.Cluster{
ObjectMeta: v1.ObjectMeta{
GenerateName: "cluster-",
Namespace: namespace,
},
Spec: v1alpha1.ClusterSpec{
Mode: v1alpha1.SharedClusterMode,
Servers: ptr.To(int32(1)),
Agents: ptr.To(int32(0)),
},
}

err = k8sClient.Create(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))

// wait a bit
Eventually(func() bool {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
return cluster.Spec.PriorityClass == clusterSet.Spec.DefaultPriorityClass
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
})

It("should update the nodeSelector", func() {
clusterSet := &v1alpha1.ClusterSet{
ObjectMeta: v1.ObjectMeta{
GenerateName: "clusterset-",
Namespace: namespace,
},
Spec: v1alpha1.ClusterSetSpec{
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
},
}

err := k8sClient.Create(ctx, clusterSet)
Expect(err).To(Not(HaveOccurred()))

cluster := &v1alpha1.Cluster{
ObjectMeta: v1.ObjectMeta{
GenerateName: "cluster-",
Namespace: namespace,
},
Spec: v1alpha1.ClusterSpec{
Mode: v1alpha1.SharedClusterMode,
Servers: ptr.To(int32(1)),
Agents: ptr.To(int32(0)),
},
}

err = k8sClient.Create(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))

// wait a bit
Eventually(func() bool {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
return reflect.DeepEqual(cluster.Spec.NodeSelector, clusterSet.Spec.DefaultNodeSelector)
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
})

It("should update the nodeSelector if changed", func() {
clusterSet := &v1alpha1.ClusterSet{
ObjectMeta: v1.ObjectMeta{
GenerateName: "clusterset-",
Namespace: namespace,
},
Spec: v1alpha1.ClusterSetSpec{
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
},
}

err := k8sClient.Create(ctx, clusterSet)
Expect(err).To(Not(HaveOccurred()))

cluster := &v1alpha1.Cluster{
ObjectMeta: v1.ObjectMeta{
GenerateName: "cluster-",
Namespace: namespace,
},
Spec: v1alpha1.ClusterSpec{
Mode: v1alpha1.SharedClusterMode,
Servers: ptr.To(int32(1)),
Agents: ptr.To(int32(0)),
NodeSelector: map[string]string{"label-1": "value-1"},
},
}

err = k8sClient.Create(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))

Expect(cluster.Spec.NodeSelector).To(Equal(clusterSet.Spec.DefaultNodeSelector))

// update the ClusterSet
clusterSet.Spec.DefaultNodeSelector["label-2"] = "value-2"
err = k8sClient.Update(ctx, clusterSet)
Expect(err).To(Not(HaveOccurred()))
Expect(cluster.Spec.NodeSelector).To(Not(Equal(clusterSet.Spec.DefaultNodeSelector)))

// wait a bit
Eventually(func() bool {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
return reflect.DeepEqual(cluster.Spec.NodeSelector, clusterSet.Spec.DefaultNodeSelector)
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())

// Update the Cluster
cluster.Spec.NodeSelector["label-3"] = "value-3"
err = k8sClient.Update(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))
Expect(cluster.Spec.NodeSelector).To(Not(Equal(clusterSet.Spec.DefaultNodeSelector)))

// wait a bit and check it's restored
Eventually(func() bool {
var updatedCluster v1alpha1.Cluster

key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, &updatedCluster)
Expect(err).To(Not(HaveOccurred()))
return reflect.DeepEqual(updatedCluster.Spec.NodeSelector, clusterSet.Spec.DefaultNodeSelector)
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
})
})

When("a cluster in a different namespace is present", func() {
It("should not be update", func() {
clusterSet := &v1alpha1.ClusterSet{
ObjectMeta: v1.ObjectMeta{
GenerateName: "clusterset-",
Namespace: namespace,
},
Spec: v1alpha1.ClusterSetSpec{
DefaultPriorityClass: "foobar",
},
}

err := k8sClient.Create(ctx, clusterSet)
Expect(err).To(Not(HaveOccurred()))

namespace2 := &corev1.Namespace{ObjectMeta: v1.ObjectMeta{GenerateName: "ns-"}}
err = k8sClient.Create(ctx, namespace2)
Expect(err).To(Not(HaveOccurred()))

cluster := &v1alpha1.Cluster{
ObjectMeta: v1.ObjectMeta{
GenerateName: "cluster-",
Namespace: namespace2.Name,
},
Spec: v1alpha1.ClusterSpec{
Mode: v1alpha1.SharedClusterMode,
Servers: ptr.To(int32(1)),
Agents: ptr.To(int32(0)),
},
}

err = k8sClient.Create(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))

// it should not change!
Eventually(func() bool {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
return cluster.Spec.PriorityClass != clusterSet.Spec.DefaultPriorityClass
}).
MustPassRepeatedly(5).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
})
})
})
})
