webhook: adding support for adding pod group labels
Problem: we need every pod object coming into the cluster to be
part of a group.
Solution: this change adds logic to the mutating webhook to
add the labels that indicate the group name and size. We can
eventually add flexibility here. I also realize that we can
easily watch for job objects first, and add the group size/name
to the pod template. That will be much more efficient, since we
then won't have to label each of the individual pods that are
part of a larger job. With this approach I was able to create a
fluence-scheduled pod and then see my labels added! It does not
do anything beyond that. I am also adding a nice script that
makes it easy to build, load, and install fluence freshly;
otherwise you will use all your internet data for the month in
about two days. Do not do that :P

Signed-off-by: vsoch <[email protected]>
vsoch committed Feb 18, 2024
1 parent 0492e87 commit 2029a1d
Showing 7 changed files with 137 additions and 176 deletions.
63 changes: 14 additions & 49 deletions README.md
@@ -225,17 +225,6 @@ helm install \
fluence as-a-second-scheduler/
```

If you load your images into your testing environment and don't need to pull, you can change the pull policy too:

```bash
helm install \
--set scheduler.image=vanessa/fluence:latest \
--set scheduler.sidecarimage=vanessa/fluence-sidecar \
--set controller.image=vanessa/fluence-controller \
--set scheduler.sidecarPullPolicy=IfNotPresent \
fluence as-a-second-scheduler/
```

If you need to uninstall (e.g., to redo something):

```bash
@@ -433,31 +422,27 @@ make proto

#### Workflow

The easiest thing to do is to build the containers in some container namespace that you control (meaning you can push to a registry), e.g.,:
You should first do these on your own:

```bash
make build REGISTRY=ghcr.io/vsoch
```
1. Create the kind cluster (`kind create cluster --config ./examples/kind-config.yaml`)
2. Install the certificate manager (example commands below).
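
For reference, a minimal version of those two steps might look like the following; the cert-manager release pinned here is only an example, so substitute whichever version you prefer:

```bash
# Create the multi-node kind cluster from the example config
kind create cluster --config ./examples/kind-config.yaml

# Install cert-manager (the version below is an example; use a current release)
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.3/cert-manager.yaml

# Wait for the cert-manager webhook to be ready before installing fluence
kubectl wait --for=condition=Available --timeout=120s \
    -n cert-manager deployment/cert-manager-webhook
```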

If needed, create a "multi node" kind cluster:
I was having trouble developing this easily because there are a lot of steps: building and loading containers, changing directories, and uninstalling/reinstalling the charts. So I put together a small script that does the following:

```bash
kind create cluster --config ./examples/kind-config.yaml
```
1. Takes a registry of interest (probably doesn't matter since we are working locally; defaults to `ghcr.io/vsoch`)
2. Builds all three images: the controller, sidecar, and fluence
3. Loads them all into kind
4. Changes directory to the charts
5. Uninstalls the fluence helm release (if installed)
6. Installs it, targeting the images just built and setting the pull policy to `Never`

And then install with your custom images:
The last step ensures we use the images we loaded! You can basically just do:

```bash
cd ./upstream/manifests/install/charts
helm install \
--set scheduler.image=ghcr.io/vsoch/fluence:latest \
--set controller.image=ghcr.io/vsoch/fluence-controller:latest \
--set scheduler.sidecarimage=ghcr.io/vsoch/fluence-sidecar:latest \
fluence as-a-second-scheduler/
./hack/quick-build.sh
```

And then apply what you need to test, and look at logs!
And then keep doing that until you get what you want :) Note that I haven't found a good way to get the VSCode developer tools working, because we develop fluence outside of the tree it's supposed to live in.
This sped up my development time immensely. If you want to manually do the steps, see that script for instructions.
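
A typical iteration might then look like the sketch below. The pod name and image are arbitrary, and it assumes the chart registers the second scheduler under the name `fluence`:

```bash
# Rebuild the images, reload them into kind, and reinstall the charts
./hack/quick-build.sh

# Create a throwaway pod that requests the fluence scheduler
kubectl run fluence-test --image=nginx \
    --overrides='{"spec": {"schedulerName": "fluence"}}'

# Confirm it was scheduled, then clean up
kubectl get pod fluence-test -o wide
kubectl delete pod fluence-test
```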

##### kubectl plugin

@@ -472,26 +457,7 @@ helm install \
fluence as-a-second-scheduler/
```

For this setup if you are developing locally with kind, you will need to enable the ingress. Here is `kind-config.yaml`

```yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
node-labels: "ingress-ready=true"
extraPortMappings:
- containerPort: 4242
hostPort: 4242
protocol: TCP
```
And to create:
For this setup, if you are developing locally with kind, you will need to enable the ingress, as is done in [examples/kind-config.yaml](examples/kind-config.yaml).

```bash
kind create cluster --config ./kind-config.yaml
@@ -500,7 +466,6 @@ kind create cluster --config ./kind-config.yaml
#### TODO

- Try what [kueue does](https://github.com/kubernetes-sigs/kueue/blob/6d57813a52066dab412735deeeb60ebb0cdb8e8e/cmd/kueue/main.go#L146-L155) to not require cert-manager.
- Possible bug with using kind (with custom config we are scheduling things to the control plane) - need to verify this didn't start happening with mutating webhook addition.

#### Vanessa Thinking

36 changes: 36 additions & 0 deletions hack/quick-build.sh
@@ -0,0 +1,36 @@
#!/bin/bash

# Before running this, you should:
# 1. Create the kind cluster (needs more than one node; fluence does not schedule to the control plane)
# 2. Install cert-manager
# 3. Customize the script to point to your registry if you intend to push

REGISTRY="${1:-ghcr.io/vsoch}"
HERE=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
ROOT=$(dirname ${HERE})

# Go to the repository root
cd ${ROOT}

# This builds each of the images. The sidecar is separate from the other two in src/
make REGISTRY=${REGISTRY} SCHEDULER_IMAGE=fluence SIDECAR_IMAGE=fluence-sidecar CONTROLLER_IMAGE=fluence-controller

# This is what it might look like to push
# docker push ghcr.io/vsoch/fluence-sidecar && docker push ghcr.io/vsoch/fluence-controller && docker push ghcr.io/vsoch/fluence:latest

# We load into kind so we don't need to push/pull and use up internet data ;)
kind load docker-image ${REGISTRY}/fluence-sidecar:latest
kind load docker-image ${REGISTRY}/fluence-controller:latest
kind load docker-image ${REGISTRY}/fluence:latest

# And then install using the charts. The pull policy ensures we use the loaded ones
cd ${ROOT}/upstream/manifests/install/charts
helm uninstall fluence || true
helm install \
--set scheduler.image=${REGISTRY}/fluence:latest \
--set scheduler.sidecarPullPolicy=Never \
--set scheduler.pullPolicy=Never \
--set controller.pullPolicy=Never \
--set controller.image=${REGISTRY}/fluence-controller:latest \
--set scheduler.sidecarimage=${REGISTRY}/fluence-sidecar:latest \
fluence as-a-second-scheduler/
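
Since the images are loaded directly into kind, you don't need push access to a registry; the first argument just sets the image prefix. For example (the second prefix shown is arbitrary):

```bash
# Use the default registry prefix (ghcr.io/vsoch)
./hack/quick-build.sh

# Or build and load images under your own prefix
./hack/quick-build.sh ghcr.io/yourname
```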
165 changes: 47 additions & 118 deletions sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go
@@ -15,12 +15,12 @@ import (
"fmt"
"net/http"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"sigs.k8s.io/scheduler-plugins/pkg/fluence/labels"
)

var (
@@ -41,150 +41,79 @@ func NewMutatingWebhook(mgr manager.Manager) *fluenceWatcher {
}

// mutate-v1-fluence

type fluenceWatcher struct {
decoder *admission.Decoder
}

func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admission.Response {

logger.Info("Running webhook handle")
// First try for job
job := &batchv1.Job{}
err := a.decoder.Decode(req, job)
if err != nil {

// Try for a pod next
pod := &corev1.Pod{}
err := a.decoder.Decode(req, pod)
if err != nil {
logger.Error(err, "Admission error.")
return admission.Errored(http.StatusBadRequest, err)
}

// If we get here, we decoded a pod
/*err = a.InjectPod(pod)
if err != nil {
logger.Error("Inject pod error.", err)
return admission.Errored(http.StatusBadRequest, err)
}*/

// Mutate the fields in pod
marshalledPod, err := json.Marshal(pod)
if err != nil {
logger.Error(err, "Marshalling pod error.")
return admission.Errored(http.StatusInternalServerError, err)
}
logger.Info("Admission pod success.")
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledPod)
// Assume we operate on the level of pods for now
pod := &corev1.Pod{}
err := a.decoder.Decode(req, pod)
if err != nil {
logger.Error(err, "Admission error.")
return admission.Errored(http.StatusBadRequest, err)
}
/*
// If we get here, we found a job
err = a.InjectJob(job)

if err != nil {
logger.Error("Inject job error.", err)
return admission.Errored(http.StatusBadRequest, err)
}*/
// If we get here, we decoded a pod
err = a.EnsureGroup(pod)
if err != nil {
logger.Error(err, "Issue adding PodGroup.")
return admission.Errored(http.StatusBadRequest, err)
}

marshalledJob, err := json.Marshal(job)
logger.Info("Admission pod success.")

marshalledPod, err := json.Marshal(pod)
if err != nil {
logger.Error(err, "Marshalling job error.")
logger.Error(err, "Marshalling pod error.")
return admission.Errored(http.StatusInternalServerError, err)
}

logger.Info("Admission job success.")
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledJob)
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledPod)

}

// Default is the expected entrypoint for a webhook
// I don't remember how this is different from handle
func (a *fluenceWatcher) Default(ctx context.Context, obj runtime.Object) error {
pod, ok := obj.(*corev1.Pod)
if !ok {
job, ok := obj.(*batchv1.Job)
if !ok {
return fmt.Errorf("expected a Pod or Job but got a %T", obj)
}
logger.Info(fmt.Sprintf("Job %s is marked for fluence.", job.Name))
return nil
// return a.InjectJob(job)
return fmt.Errorf("expected a Pod or Job but got a %T", obj)
}
logger.Info(fmt.Sprintf("Pod %s is marked for fluence.", pod.Name))
return nil
//return a.InjectPod(pod)
return a.EnsureGroup(pod)
}

// InjectPod adds the sidecar container to a pod
func (a *fluenceWatcher) InjectPod(pod *corev1.Pod) error {

/*
// Cut out early if we have no labels
if pod.Annotations == nil {
logger.Info(fmt.Sprintf("Pod %s is not marked for oras storage.", pod.Name))
return nil
}
// Parse oras known labels into settings
settings := orasSettings.NewOrasCacheSettings(pod.Annotations)
// Cut out early if no oras identifiers!
if !settings.MarkedForOras {
logger.Warnf("Pod %s is not marked for oras storage.", pod.Name)
return nil
}
// Validate, return error if no good here.
if !settings.Validate() {
logger.Warnf("Pod %s oras storage did not validate.", pod.Name)
return fmt.Errorf("oras storage was requested but is not valid")
}
// The selector for the namespaced registry is the namespace
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
// Even pods without say, the launcher, that are marked should have the network added
pod.Labels[defaults.OrasSelectorKey] = pod.ObjectMeta.Namespace
oras.AddSidecar(&pod.Spec, pod.ObjectMeta.Namespace, settings)
logger.Info(fmt.Sprintf("Pod %s is marked for oras storage.", pod.Name))*/
return nil
}
// EnsureGroup adds pod group label and size if not present
// This ensures that every pod passing through is part of a group.
// Note that we need to do similar for Job.
// A pod without a job wrapper, and without metadata is a group
// of size 1.
func (a *fluenceWatcher) EnsureGroup(pod *corev1.Pod) error {

// InjectJob adds the sidecar container to the PodTemplateSpec of the Job
func (a *fluenceWatcher) InjectJob(job *batchv1.Job) error {

/*
// Cut out early if we have no labels
if job.Annotations == nil {
logger.Info(fmt.Sprintf("Job %s is not marked for oras storage.", job.Name))
return nil
}
// Parse oras known labels into settings
settings := orasSettings.NewOrasCacheSettings(job.Annotations)
// Cut out early if no oras identifiers!
if !settings.MarkedForOras {
logger.Warnf("Job %s is not marked for oras storage.", job.Name)
return nil
}
// Validate, return error if no good here.
if !settings.Validate() {
logger.Warnf("Job %s oras storage did not validate.", job.Name)
return fmt.Errorf("oras storage was requested but is not valid")
}
// Add the sidecar to the podspec of the job
if job.Spec.Template.Labels == nil {
job.Spec.Template.Labels = map[string]string{}
}
// Add network to spec template so all pods are targeted
job.Spec.Template.Labels[defaults.OrasSelectorKey] = job.ObjectMeta.Namespace
oras.AddSidecar(&job.Spec.Template.Spec, job.ObjectMeta.Namespace, settings)
logger.Info(fmt.Sprintf("Job %s is marked for oras storage.", job.Name))*/
// Add labels if we don't have anything. Everything is a group!
if pod.Labels == nil {
pod.Labels = map[string]string{}
}

// Do we have a group name?
groupName, ok := pod.Labels[labels.PodGroupNameLabel]

// If we don't have a fluence group, create one under fluence namespace
if !ok {
groupName = fmt.Sprintf("fluence-group-%s-%s", pod.Namespace, pod.Name)
pod.Labels[labels.PodGroupNameLabel] = groupName
}

// Do we have a group size? It will likely be parsed as a string
groupSize, ok := pod.Labels[labels.PodGroupSizeLabel]
if !ok {
groupSize = "1"
pod.Labels[labels.PodGroupSizeLabel] = groupSize
}
return nil
}
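
To see the effect of `EnsureGroup` on an admitted pod, you can dump its labels. The exact label keys come from the `fluence/labels` package and are not shown in this diff; a pod created without any group labels should come back with a defaulted group name and a group size of "1" (the pod name below is arbitrary):

```bash
# Print every label on the pod, including the ones the webhook injected
kubectl get pod fluence-test -o jsonpath='{.metadata.labels}{"\n"}'
```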
@@ -23,7 +23,7 @@ controller:
name: scheduler-plugins-controller
image: ghcr.io/flux-framework/fluence-controller:latest
replicaCount: 1
pullPolicy: IfNotPresent
pullPolicy: Always

# LoadVariationRiskBalancing and TargetLoadPacking are not enabled by default
# as they need extra RBAC privileges on metrics.k8s.io.