From 2029a1de62d285d3d0fb93386047b8a617821725 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 17 Feb 2024 19:57:09 -0700 Subject: [PATCH] webhook: adding support for adding pod group labels Problem: we need every pod object coming into the cluster to be part of a group. Solution: This change adds logic to the mutating webhook to add the labels that indicate the group name and size. We can eventually add flexibility here. I also realize that we can easily watch for job objects first, and add the group size/name to the pod template. This will be much more efficient to then not have to add to the individual pods that are part of a larger job. With this approach I was able to create a fluence scheduled pod, and then see my labels added! It does not do anything beyond that. I am also adding a nice script that makes it easy to build, load, and install fluence freshly, otherwise you will use all your internet data for the month in like, two days. Do not do that :P Signed-off-by: vsoch --- README.md | 63 ++----- hack/quick-build.sh | 36 ++++ .../scheduling/v1alpha1/podgroup_webhook.go | 165 +++++------------- .../charts/as-a-second-scheduler/values.yaml | 2 +- .../pkg/controllers/podgroup_controller.go | 29 ++- .../pkg/fluence/group/group.go | 10 +- .../pkg/fluence/labels/labels.go | 8 + 7 files changed, 137 insertions(+), 176 deletions(-) create mode 100755 hack/quick-build.sh create mode 100644 sig-scheduler-plugins/pkg/fluence/labels/labels.go diff --git a/README.md b/README.md index 3821ad8..4b38014 100644 --- a/README.md +++ b/README.md @@ -225,17 +225,6 @@ helm install \ fluence as-a-second-scheduler/ ``` -If you load your images into your testing environment and don't need to pull, you can change the pull policy too: - -```bash -helm install \ - --set scheduler.image=vanessa/fluence:latest \ - --set scheduler.sidecarimage=vanessa/fluence-sidecar \ - --set controller.image=vanessa/fluence-controller \ - --set scheduler.sidecarPullPolicy=IfNotPresent \ - fluence as-a-second-scheduler/ -``` - If you need to uninstall (e.g., to redo something): ```bash @@ -433,31 +422,27 @@ make proto #### Workflow -The easiest thing to do is to build the containers in some container namespace that you control (meaning you can push to a registry), e.g.,: +You should first do these on your own: -```bash -make build REGISTRY=ghcr.io/vsoch -``` +1. Create the kind cluster (`kubectl apply -f ./examples/kind-cluster.yaml`) +2. Install the certificate manager. -If needed, create a "multi node" kind cluster: +I was having trouble developing this easily because it's a lot of steps to build and load containers and change directories and uninstall/install the charts, so I put together a small script that does the following: -```bash -kind create cluster --config ./examples/kind-config.yaml -``` +1. Takes a registry of interest (probably doesn't matter since we are working locally, defaults to `ghcr.io/vsoch` +2. builds all three images, the controller, sidecar, and fluence +3. loads them all into kind +4. changes directory to the charts +5. uninstalls the fluence helm instance (if installed) +6. installs it, targeted the images just built, and setting pullPolicy to never -And then install with your custom images: +The last step ensures we use the images we loaded! 
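For reference, the two steps you do on your own might look like this (a sketch: the kind config path matches what the rest of this README references, and the cert-manager version is just an example, so use whatever release is current):

```bash
# A multi-node kind cluster (fluence does not schedule to the control plane)
kind create cluster --config ./examples/kind-config.yaml

# Install cert-manager (the version here is only an example)
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.1/cert-manager.yaml
```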
You can basically just do: ```bash -cd ./upstream/manifests/install/charts -helm install \ - --set scheduler.image=ghcr.io/vsoch/fluence:latest \ - --set controller.image=ghcr.io/vsoch/fluence-controller:latest \ - --set scheduler.sidecarimage=ghcr.io/vsoch/fluence-sidecar:latest \ - fluence as-a-second-scheduler/ +./hack/quick-build.sh ``` -And then apply what you need to test, and look at logs! -And then keep doing that until you get what you want :) Note that I haven't found a good way for the VSCode developer tools to work because we develop fluence outside of the tree it's supposed to be in. +This sped up my development time immensely. If you want to manually do the steps, see that script for instructions. ##### kubectl plugin @@ -472,26 +457,7 @@ helm install \ fluence as-a-second-scheduler/ ``` -For this setup if you are developing locally with kind, you will need to enable the ingress. Here is `kind-config.yaml` - -```yaml -kind: Cluster -apiVersion: kind.x-k8s.io/v1alpha4 -nodes: -- role: control-plane - kubeadmConfigPatches: - - | - kind: InitConfiguration - nodeRegistration: - kubeletExtraArgs: - node-labels: "ingress-ready=true" - extraPortMappings: - - containerPort: 4242 - hostPort: 4242 - protocol: TCP -``` - -And to create: +For this setup if you are developing locally with kind, you will need to enable the ingress, as is done in [examples/kind-config.yaml](examples/kind-config.yaml). ```bash kind create cluster --config ./kind-config.yaml @@ -500,7 +466,6 @@ kind create cluster --config ./kind-config.yaml #### TODO - Try what [kueue does](https://github.com/kubernetes-sigs/kueue/blob/6d57813a52066dab412735deeeb60ebb0cdb8e8e/cmd/kueue/main.go#L146-L155) to not require cert-manager. - - Possible bug with using kind (with custom config we are scheduling things to the control plane) - need to verify this didn't start happening with mutating webhook addition. #### Vanessa Thinking diff --git a/hack/quick-build.sh b/hack/quick-build.sh new file mode 100755 index 0000000..23a5c87 --- /dev/null +++ b/hack/quick-build.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +# Before running this, you should: +# 1. create the kind cluster (needs more than one node, fluence does not scheduler to the control plane) +# 2. Install cert-manager +# 3. Customize the script to point to your registry if you intend to push + +REGISTRY="${1:-ghcr.io/vsoch}" +HERE=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +ROOT=$(dirname ${HERE}) + +# Go to the script directory +cd ${ROOT} + +# These build each of the images. The sidecar is separate from the other two in src/ +make REGISTRY=${REGISTRY} SCHEDULER_IMAGE=fluence SIDECAR_IMAGE=fluence-sidecar CONTROLLER_IMAGE=fluence-controller + +# This is what it might look like to push +# docker push ghcr.io/vsoch/fluence-sidecar && docker push ghcr.io/vsoch/fluence-controller && docker push ghcr.io/vsoch/fluence:latest + +# We load into kind so we don't need to push/pull and use up internet data ;) +kind load docker-image ${REGISTRY}/fluence-sidecar:latest +kind load docker-image ${REGISTRY}/fluence-controller:latest +kind load docker-image ${REGISTRY}/fluence:latest + +# And then install using the charts. 
The pull policy ensures we use the loaded ones +cd ${ROOT}/upstream/manifests/install/charts +helm uninstall fluence || true +helm install \ + --set scheduler.image=${REGISTRY}/fluence:latest \ + --set scheduler.sidecarPullPolicy=Never \ + --set scheduler.pullPolicy=Never \ + --set controller.pullPolicy=Never \ + --set controller.image=${REGISTRY}/fluence-controller:latest \ + --set scheduler.sidecarimage=${REGISTRY}/fluence-sidecar:latest \ + fluence as-a-second-scheduler/ diff --git a/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go b/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go index 55c4d45..6139884 100644 --- a/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go +++ b/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go @@ -15,12 +15,12 @@ import ( "fmt" "net/http" - batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + "sigs.k8s.io/scheduler-plugins/pkg/fluence/labels" ) var ( @@ -41,7 +41,6 @@ func NewMutatingWebhook(mgr manager.Manager) *fluenceWatcher { } // mutate-v1-fluence - type fluenceWatcher struct { decoder *admission.Decoder } @@ -49,142 +48,72 @@ type fluenceWatcher struct { func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admission.Response { logger.Info("Running webhook handle") - // First try for job - job := &batchv1.Job{} - err := a.decoder.Decode(req, job) - if err != nil { - // Try for a pod next - pod := &corev1.Pod{} - err := a.decoder.Decode(req, pod) - if err != nil { - logger.Error(err, "Admission error.") - return admission.Errored(http.StatusBadRequest, err) - } - - // If we get here, we decoded a pod - /*err = a.InjectPod(pod) - if err != nil { - logger.Error("Inject pod error.", err) - return admission.Errored(http.StatusBadRequest, err) - }*/ - - // Mutate the fields in pod - marshalledPod, err := json.Marshal(pod) - if err != nil { - logger.Error(err, "Marshalling pod error.") - return admission.Errored(http.StatusInternalServerError, err) - } - logger.Info("Admission pod success.") - return admission.PatchResponseFromRaw(req.Object.Raw, marshalledPod) + // Assume we operate on the level of pods for now + pod := &corev1.Pod{} + err := a.decoder.Decode(req, pod) + if err != nil { + logger.Error(err, "Admission error.") + return admission.Errored(http.StatusBadRequest, err) } - /* - // If we get here, we found a job - err = a.InjectJob(job) - if err != nil { - logger.Error("Inject job error.", err) - return admission.Errored(http.StatusBadRequest, err) - }*/ + // If we get here, we decoded a pod + err = a.EnsureGroup(pod) + if err != nil { + logger.Error(err, "Issue adding PodGroup.") + return admission.Errored(http.StatusBadRequest, err) + } - marshalledJob, err := json.Marshal(job) + logger.Info("Admission pod success.") + marshalledPod, err := json.Marshal(pod) if err != nil { - logger.Error(err, "Marshalling job error.") + logger.Error(err, "Marshalling pod error.") return admission.Errored(http.StatusInternalServerError, err) } logger.Info("Admission job success.") - return admission.PatchResponseFromRaw(req.Object.Raw, marshalledJob) + return admission.PatchResponseFromRaw(req.Object.Raw, marshalledPod) + } // Default is the expected entrypoint for a webhook +// I don't remember how this is different from handle func (a *fluenceWatcher) Default(ctx context.Context, obj 
runtime.Object) error { pod, ok := obj.(*corev1.Pod) if !ok { - job, ok := obj.(*batchv1.Job) - if !ok { - return fmt.Errorf("expected a Pod or Job but got a %T", obj) - } - logger.Info(fmt.Sprintf("Job %s is marked for fluence.", job.Name)) - return nil - // return a.InjectJob(job) + return fmt.Errorf("expected a Pod or Job but got a %T", obj) } logger.Info(fmt.Sprintf("Pod %s is marked for fluence.", pod.Name)) - return nil - //return a.InjectPod(pod) + return a.EnsureGroup(pod) } -// InjectPod adds the sidecar container to a pod -func (a *fluenceWatcher) InjectPod(pod *corev1.Pod) error { - - /* - // Cut out early if we have no labels - if pod.Annotations == nil { - logger.Info(fmt.Sprintf("Pod %s is not marked for oras storage.", pod.Name)) - return nil - } - - // Parse oras known labels into settings - settings := orasSettings.NewOrasCacheSettings(pod.Annotations) - - // Cut out early if no oras identifiers! - if !settings.MarkedForOras { - logger.Warnf("Pod %s is not marked for oras storage.", pod.Name) - return nil - } - - // Validate, return error if no good here. - if !settings.Validate() { - logger.Warnf("Pod %s oras storage did not validate.", pod.Name) - return fmt.Errorf("oras storage was requested but is not valid") - } - - // The selector for the namespaced registry is the namespace - if pod.Labels == nil { - pod.Labels = map[string]string{} - } - - // Even pods without say, the launcher, that are marked should have the network added - pod.Labels[defaults.OrasSelectorKey] = pod.ObjectMeta.Namespace - oras.AddSidecar(&pod.Spec, pod.ObjectMeta.Namespace, settings) - logger.Info(fmt.Sprintf("Pod %s is marked for oras storage.", pod.Name))*/ - return nil -} +// EnsureGroup adds pod group label and size if not present +// This ensures that every pod passing through is part of a group. +// Note that we need to do similar for Job. +// A pod without a job wrapper, and without metadata is a group +// of size 1. +func (a *fluenceWatcher) EnsureGroup(pod *corev1.Pod) error { -// InjectJob adds the sidecar container to the PodTemplateSpec of the Job -func (a *fluenceWatcher) InjectJob(job *batchv1.Job) error { - - /* - // Cut out early if we have no labels - if job.Annotations == nil { - logger.Info(fmt.Sprintf("Job %s is not marked for oras storage.", job.Name)) - return nil - } - - // Parse oras known labels into settings - settings := orasSettings.NewOrasCacheSettings(job.Annotations) - - // Cut out early if no oras identifiers! - if !settings.MarkedForOras { - logger.Warnf("Job %s is not marked for oras storage.", job.Name) - return nil - } - - // Validate, return error if no good here. - if !settings.Validate() { - logger.Warnf("Job %s oras storage did not validate.", job.Name) - return fmt.Errorf("oras storage was requested but is not valid") - } - - // Add the sidecar to the podspec of the job - if job.Spec.Template.Labels == nil { - job.Spec.Template.Labels = map[string]string{} - } - - // Add network to spec template so all pods are targeted - job.Spec.Template.Labels[defaults.OrasSelectorKey] = job.ObjectMeta.Namespace - oras.AddSidecar(&job.Spec.Template.Spec, job.ObjectMeta.Namespace, settings) - logger.Info(fmt.Sprintf("Job %s is marked for oras storage.", job.Name))*/ + // Add labels if we don't have anything. Everything is a group! + if pod.Labels == nil { + pod.Labels = map[string]string{} + } + + // Do we have a group name? 
+ groupName, ok := pod.Labels[labels.PodGroupNameLabel] + + // If we don't have a fluence group, create one under fluence namespace + if !ok { + groupName = fmt.Sprintf("fluence-group-%s-%s", pod.Namespace, pod.Name) + pod.Labels[labels.PodGroupNameLabel] = groupName + } + + // Do we have a group size? This will be parsed as a string, likely + groupSize, ok := pod.Labels[labels.PodGroupSizeLabel] + if !ok { + groupSize = "1" + pod.Labels[labels.PodGroupSizeLabel] = groupSize + } return nil } diff --git a/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/values.yaml b/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/values.yaml index a5a7870..e48aa98 100644 --- a/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/values.yaml +++ b/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/values.yaml @@ -23,7 +23,7 @@ controller: name: scheduler-plugins-controller image: ghcr.io/flux-framework/fluence-controller:latest replicaCount: 1 - pullPolicy: IfNotPresent + pullPolicy: Always # LoadVariationRiskBalancing and TargetLoadPacking are not enabled by default # as they need extra RBAC privileges on metrics.k8s.io. diff --git a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go index 02eb4e4..fc8e8d4 100644 --- a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go +++ b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go @@ -199,11 +199,12 @@ func (r *PodGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { // podToPodGroup is a watcher that looks for pods and associated pod group func (r *PodGroupReconciler) podToPodGroup(ctx context.Context, obj client.Object) []ctrl.Request { + r.log.Info("PANCAKES pre get pod in podToPodGroup flux-framework/fluence-controller") pod, ok := obj.(*v1.Pod) if !ok { return nil } - r.log.Info("podToPodGroup flux-framework/fluence-controller") + r.log.Info("PANCAKES post get pod in podToPodGroup flux-framework/fluence-controller") r.log.V(5).Info("Running podToPodGroup", "pod", pod.Name, "namespace", pod.Namespace) pgName := util.GetPodGroupLabel(pod) if len(pgName) == 0 { @@ -212,6 +213,32 @@ func (r *PodGroupReconciler) podToPodGroup(ctx context.Context, obj client.Objec r.log.V(5).Info("Add pod group when pod gets added", "podGroup", pgName, "pod", pod.Name, "namespace", pod.Namespace) + // TODO we need an ability to trigger a create here. Likely we will just add + // the create function to watches. I'm wondering if we want to set the owner + // to the pod or the job that triggers? 
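+	// A note for whenever the sketch below gets revived: the owner reference
+	// type is the singular metav1.OwnerReference, Kind and APIVersion live on
+	// the pod's TypeMeta (not ObjectMeta) and are often empty on decoded
+	// objects, and PodGroup / PodGroupSpec need to come from the scheduling
+	// v1alpha1 API package.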
+ // newPodGroup ensures we have a pod group + /*func newPodGroup(name, namespace string, size int32, pod *v1.Pod) { + + // Create an owner reference to the pod + // https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/types.go#L295 + ownerRef := metav1.OwnerReferences{ + Kind: pod.ObjectMeta.Kind, + Name: pod.Name, + APIVersion: pod.ObjectMeta.APIVersion, + UID: pod.ObjectMeta.UID, + } + pg := PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReferences{ownerRef}, + }, + Spec: PodGroupSpec{ + MinMember: size, + }, + } + }*/ + return []ctrl.Request{{ NamespacedName: types.NamespacedName{ Namespace: pod.Namespace, diff --git a/sig-scheduler-plugins/pkg/fluence/group/group.go b/sig-scheduler-plugins/pkg/fluence/group/group.go index b681504..291ad17 100644 --- a/sig-scheduler-plugins/pkg/fluence/group/group.go +++ b/sig-scheduler-plugins/pkg/fluence/group/group.go @@ -10,11 +10,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" fcore "sigs.k8s.io/scheduler-plugins/pkg/fluence/core" -) - -const ( - PodGroupNameLabel = "fluence.pod-group" - PodGroupSizeLabel = "fluence.group-size" + "sigs.k8s.io/scheduler-plugins/pkg/fluence/labels" ) // getDefaultGroupName returns a group name based on the pod namespace and name @@ -71,13 +67,13 @@ func DeleteFluenceGroup(pod *v1.Pod) { // getFluenceGroupName looks for the group to indicate a fluence group, and returns it func getFluenceGroupName(pod *v1.Pod) string { - groupName, _ := pod.Labels[PodGroupNameLabel] + groupName, _ := pod.Labels[labels.PodGroupNameLabel] return groupName } // getFluenceGroupSize gets the size of the fluence group func getFluenceGroupSize(pod *v1.Pod) int32 { - size, _ := pod.Labels[PodGroupSizeLabel] + size, _ := pod.Labels[labels.PodGroupSizeLabel] // Default size of 1 if the label is not set (but name is) if size == "" { diff --git a/sig-scheduler-plugins/pkg/fluence/labels/labels.go b/sig-scheduler-plugins/pkg/fluence/labels/labels.go new file mode 100644 index 0000000..e409ddc --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/labels/labels.go @@ -0,0 +1,8 @@ +package labels + +// Labels to be shared between different components + +const ( + PodGroupNameLabel = "fluence.pod-group" + PodGroupSizeLabel = "fluence.group-size" +)
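
To sanity check the webhook end to end, a quick test is to create a pod that asks for fluence and then look for the injected labels. This is a sketch (it assumes the scheduler is installed under its chart default name `fluence`, and the pod name is just a throwaway):

```bash
# Create a pod scheduled by fluence
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: Pod
metadata:
  name: fluence-label-test
spec:
  schedulerName: fluence
  containers:
  - name: sleep
    image: busybox
    command: ["sleep", "3600"]
EOF

# The mutating webhook should have added the group labels at admission
kubectl get pod fluence-label-test -o jsonpath='{.metadata.labels}{"\n"}'
```

A bare pod like this should come back with `fluence.pod-group` set to `fluence-group-<namespace>-<name>` and `fluence.group-size` set to `1`.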