Skip to content

Commit

Permalink
Lepton commit
Browse files Browse the repository at this point in the history
  • Loading branch information
FillZpp committed Jan 10, 2025
1 parent 3c7cae8 commit f6c518d
Show file tree
Hide file tree
Showing 13 changed files with 355 additions and 40 deletions.
11 changes: 0 additions & 11 deletions .github/workflows/krew-release.yml

This file was deleted.

55 changes: 55 additions & 0 deletions .github/workflows/lepton-build-image.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Reusable workflow (workflow_call): builds the project's Docker image and
# pushes it to the caller's Amazon ECR registry.
name: Build Docker Image

on:
  workflow_call:
    inputs:
      repo:
        # ECR repository name, e.g. "lepton-kueue".
        required: true
        type: string
      tag:
        required: false
        type: string
        # NOTE(review): $(...) is NOT shell-expanded in input defaults — the
        # literal string "test$(git rev-parse --short HEAD)" is passed through.
        # Compute the tag inside a run step (or have callers pass it) if a real
        # short-SHA tag is wanted — TODO confirm intended behavior.
        default: test$(git rev-parse --short HEAD)
      commit:
        required: false
        type: string
        # NOTE(review): same caveat as "tag" — this default is a literal string.
        default: $(git rev-parse --short HEAD)
      runs-on:
        # Runner label for the build job.
        required: false
        type: string
        default: ubuntu-latest

jobs:
  build:
    runs-on: ${{ inputs.runs-on }}
    steps:
      - uses: actions/checkout@v3
        with:
          submodules: recursive
      - name: Install awscli
        run: |
          python -m pip install --upgrade pip
          pip install awscli
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1
      - name: Build and push the Docker image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
        run: |
          set -x
          docker build . --file Dockerfile \
            --tag ${{ inputs.repo }}:${{ inputs.tag }} \
            --build-arg GIT_COMMIT=${{ inputs.commit }} \
            --build-arg REPO=$ECR_REGISTRY/ecr-public
          # Fixed: message previously ended with a stray "}".
          echo "tagging container image with ${{ inputs.repo }}:${{ inputs.tag }}"
          docker tag ${{ inputs.repo }}:${{ inputs.tag }} $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }}
          docker push $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }}
12 changes: 12 additions & 0 deletions .github/workflows/lepton-pr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# PR workflow: builds the image for every pull request by delegating to the
# reusable lepton-build-image workflow (which uses its default test tag).
name: lepton-pr

on:
  pull_request:
    # "**" matches every branch, including names containing "/".
    branches: ["**"]

jobs:
  build-image:
    uses: ./.github/workflows/lepton-build-image.yaml
    # Forward this repository's secrets (AWS credentials) to the reusable
    # workflow.
    secrets: inherit
    with:
      repo: lepton-kueue
15 changes: 15 additions & 0 deletions .github/workflows/lepton-release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Release workflow: on every pushed tag, build the image and push it to ECR
# tagged with the git tag name.
name: lepton-release

on:
  push:
    tags:
      - "*"

jobs:
  build-image:
    uses: ./.github/workflows/lepton-build-image.yaml
    secrets: inherit
    with:
      repo: lepton-kueue
      # Fixed: shell parameter expansion (${GITHUB_REF##*/}) is never
      # evaluated in workflow "with:" values — it was passed as a literal
      # string. Use the expression context instead; github.ref_name is the
      # tag name for tag pushes.
      tag: ${{ github.ref_name }}
      commit: ${{ github.ref_name }}
3 changes: 3 additions & 0 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
// holds admitted workloads.
type clusterQueue struct {
Name string
Annotations map[string]string
ResourceGroups []ResourceGroup
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
Expand Down Expand Up @@ -150,6 +151,8 @@ func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, i
}
}

c.Annotations = in.Annotations

nsSelector, err := metav1.LabelSelectorAsSelector(in.Spec.NamespaceSelector)
if err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

type ClusterQueueSnapshot struct {
Name string
Annotations map[string]string
ResourceGroups []ResourceGroup
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
Expand Down Expand Up @@ -102,7 +103,13 @@ func (c *ClusterQueueSnapshot) BorrowingWith(fr resources.FlavorResource, val in
// Cohort. When the ClusterQueue/Cohort is in debt, Available
// will return 0.
func (c *ClusterQueueSnapshot) Available(fr resources.FlavorResource) int64 {
	// Discard the isRoot flag returned by available; clamp to zero so a
	// ClusterQueue/Cohort in debt never reports negative availability.
	// (Removed the stale pre-change line `return max(0, available(c, fr))`
	// left interleaved by the diff view — it used the old one-value
	// signature of available and made the body unreachable/invalid.)
	val, _ := available(c, fr)
	return max(0, val)
}

// AvailableV2 behaves like Available (the value is clamped at zero) but
// additionally returns the isRoot flag computed by available — whether the
// availability was determined at the root of the Cohort tree.
func (c *ClusterQueueSnapshot) AvailableV2(fr resources.FlavorResource) (int64, bool) {
	avail, isRoot := available(c, fr)
	if avail < 0 {
		avail = 0
	}
	return avail, isRoot
}

// PotentialAvailable returns the largest workload this ClusterQueue could
Expand Down
11 changes: 7 additions & 4 deletions pkg/cache/resource_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,24 @@ type hierarchicalResourceNode interface {
// This function may return a negative number in the case of
// overadmission - e.g. capacity was removed or the node moved to
// another Cohort.
func available(node hierarchicalResourceNode, fr resources.FlavorResource) int64 {
func available(node hierarchicalResourceNode, fr resources.FlavorResource) (_ int64, _isRoot bool) {
r := node.getResourceNode()
if !node.HasParent() {
return r.SubtreeQuota[fr] - r.Usage[fr]
return r.SubtreeQuota[fr] - r.Usage[fr], true
}
localAvailable := max(0, r.guaranteedQuota(fr)-r.Usage[fr])
parentAvailable := available(node.parentHRN(), fr)
parentAvailable, parentIsRoot := available(node.parentHRN(), fr)

if borrowingLimit := r.Quotas[fr].BorrowingLimit; borrowingLimit != nil {
storedInParent := r.SubtreeQuota[fr] - r.guaranteedQuota(fr)
usedInParent := max(0, r.Usage[fr]-r.guaranteedQuota(fr))
withMaxFromParent := storedInParent - usedInParent + *borrowingLimit
if parentAvailable >= withMaxFromParent {
parentIsRoot = false
}
parentAvailable = min(withMaxFromParent, parentAvailable)
}
return localAvailable + parentAvailable
return localAvailable + parentAvailable, parentIsRoot
}

// potentialAvailable returns the maximum capacity available to this node,
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (c *Cache) Snapshot(ctx context.Context) (*Snapshot, error) {
func snapshotClusterQueue(c *clusterQueue) *ClusterQueueSnapshot {
cc := &ClusterQueueSnapshot{
Name: c.Name,
Annotations: c.Annotations,
ResourceGroups: make([]ResourceGroup, len(c.ResourceGroups)),
FlavorFungibility: c.FlavorFungibility,
FairWeight: c.FairWeight,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
return true
}
defer r.notifyWatchers(oldCq, newCq)
specUpdated := !equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec)
specUpdated := !equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec) ||
!equality.Semantic.DeepEqual(oldCq.Annotations, newCq.Annotations)

if err := r.cache.UpdateClusterQueue(newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in cache")
Expand Down
113 changes: 98 additions & 15 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"context"
"fmt"
"strconv"
"time"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -38,8 +40,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod"
leptonapis "sigs.k8s.io/kueue/pkg/lepton/apis"
"sigs.k8s.io/kueue/pkg/podset"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)
Expand Down Expand Up @@ -165,10 +170,58 @@ func (j *Job) Suspend() {
j.Spec.Suspend = ptr.To(true)
}

func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, _ string) (bool, error) {
func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason jobframework.StopReason, eventMsg string) (bool, error) {
object := j.Object()
stoppedNow := false

// Lepton: patch preempted condition to all pods belong to this job
podList := &corev1.PodList{}
if err := c.List(ctx, podList, client.InNamespace(j.Namespace), client.MatchingFields{"LepPodOwnerRefUID": string(j.UID)}); err != nil {
return false, fmt.Errorf("failed to list pods: %w", err)
}
for i := range podList.Items {
pod := &podList.Items[i]
if !pod.DeletionTimestamp.IsZero() ||
pod.Status.Phase == corev1.PodFailed ||
pod.Status.Phase == corev1.PodSucceeded {
continue
}
var exists bool
for _, cond := range pod.Status.Conditions {
if cond.Type == podcontroller.ConditionTypeTerminationTarget {
exists = true
break
}
}
if exists {
continue
}
pCopy := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
},
TypeMeta: pod.TypeMeta,
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{
Type: podcontroller.ConditionTypeTerminationTarget,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Time{
Time: time.Now(),
},
Reason: string(stopReason),
Message: eventMsg,
},
},
},
}
if err := c.Status().Patch(ctx, pCopy, client.Apply, client.FieldOwner(constants.KueueName)); err != nil && !apierrors.IsNotFound(err) {
return false, fmt.Errorf("failed to patch pod %s status before suspending job: %w", pod.Name, err)
}
}

if !j.IsSuspended() {
if err := clientutil.Patch(ctx, c, object, true, func() (bool, error) {
j.Suspend()
Expand All @@ -177,30 +230,46 @@ func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.Po
}
// We are using annotation to be sure that all updates finished successfully.
j.ObjectMeta.Annotations[StoppingAnnotation] = "true"
// Lepton customized
if j.ObjectMeta.Labels == nil {
j.ObjectMeta.Labels = map[string]string{}
}
j.ObjectMeta.Labels[leptonapis.LabelJobPreempted] = "true"
return true, nil
}); err != nil {
return false, fmt.Errorf("suspend: %w", err)
}
stoppedNow = true
}

// Reset start time if necessary, so we can update the scheduling directives.
if j.Status.StartTime != nil {
if err := clientutil.PatchStatus(ctx, c, object, func() (bool, error) {
j.Status.StartTime = nil
} else {
// Lepton customized
if err := clientutil.Patch(ctx, c, object, true, func() (bool, error) {
if j.ObjectMeta.Labels == nil {
j.ObjectMeta.Labels = map[string]string{}
}
j.ObjectMeta.Labels[leptonapis.LabelJobPreempted] = "true"
return true, nil
}); err != nil {
return stoppedNow, fmt.Errorf("reset status: %w", err)
return stoppedNow, fmt.Errorf("failed to patch lepton job preempted: %w", err)
}
}

if err := clientutil.Patch(ctx, c, object, true, func() (bool, error) {
j.RestorePodSetsInfo(podSetsInfo)
delete(j.ObjectMeta.Annotations, StoppingAnnotation)
return true, nil
}); err != nil {
return false, fmt.Errorf("restore info: %w", err)
}
//// Reset start time if necessary, so we can update the scheduling directives.
//if j.Status.StartTime != nil {
// if err := clientutil.PatchStatus(ctx, c, object, func() (bool, error) {
// j.Status.StartTime = nil
// return true, nil
// }); err != nil {
// return stoppedNow, fmt.Errorf("reset status: %w", err)
// }
//}
//
//if err := clientutil.Patch(ctx, c, object, true, func() (bool, error) {
// j.RestorePodSetsInfo(podSetsInfo)
// delete(j.ObjectMeta.Annotations, StoppingAnnotation)
// return true, nil
//}); err != nil {
// return false, fmt.Errorf("restore info: %w", err)
//}

return stoppedNow, nil
}
Expand Down Expand Up @@ -346,6 +415,20 @@ func SetupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
if err := fieldIndexer.IndexField(ctx, &batchv1.Job{}, indexer.OwnerReferenceUID, indexer.IndexOwnerUID); err != nil {
return err
}
if err := fieldIndexer.IndexField(ctx, &corev1.Pod{}, "LepPodOwnerRefUID", func(o client.Object) []string {
pod, ok := o.(*corev1.Pod)
if !ok {
return nil
}

ownerRef := metav1.GetControllerOf(pod)
if ownerRef == nil {
return nil
}
return []string{string(ownerRef.UID)}
}); err != nil {
return err
}
return jobframework.SetupWorkloadOwnerIndex(ctx, fieldIndexer, gvk)
}

Expand Down
42 changes: 42 additions & 0 deletions pkg/lepton/apis/lepton_apis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package apis

import (
"encoding/json"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

const (
	// labelCanPreempt marks a Workload as allowed to preempt others
	// (checked by CanPreempt).
	labelCanPreempt = "kueue.lepton.ai/can-preempt"
	// labelCanBePreempted marks a Workload as eligible to be preempted
	// (checked by CanBePreempted).
	labelCanBePreempted = "kueue.lepton.ai/can-be-preempted"

	// annotationPreemptionStrategy holds a JSON-encoded PreemptionStrategy
	// in a queue's annotations (decoded by GetQueuePreemptionStrategy).
	annotationPreemptionStrategy = "kueue.lepton.ai/preemption-strategy"

	// LabelJobPreempted is set to "true" on jobs that were stopped/preempted
	// by Kueue (see the job controller's Stop handling).
	LabelJobPreempted = "kueue.lepton.ai/job-preempted"
)

// CanPreempt reports whether the workload carries the Lepton
// "can-preempt" label with the value "true".
func CanPreempt(wl *kueue.Workload) bool {
	v, ok := wl.Labels[labelCanPreempt]
	return ok && v == "true"
}

// CanBePreempted reports whether the workload carries the Lepton
// "can-be-preempted" label with the value "true".
func CanBePreempted(wl *kueue.Workload) bool {
	v, ok := wl.Labels[labelCanBePreempted]
	return ok && v == "true"
}

// PreemptionStrategy configures Lepton queue-level preemption behavior.
// It is stored JSON-encoded in the queue annotation read by
// GetQueuePreemptionStrategy.
type PreemptionStrategy struct {
	// CrossNamespaces presumably permits preemption across namespaces —
	// TODO confirm exact semantics at the call sites.
	CrossNamespaces bool `json:"crossNamespaces,omitempty"`
	// MaxPriorityThreshold, if set, presumably bounds the priority of
	// workloads eligible for preemption — TODO confirm at the call sites.
	MaxPriorityThreshold *int32 `json:"maxPriorityThreshold,omitempty"`
}

// GetQueuePreemptionStrategy decodes the PreemptionStrategy stored in the
// given annotations. A missing or malformed annotation yields the zero
// value (decode errors are deliberately swallowed: best-effort).
func GetQueuePreemptionStrategy(annotations map[string]string) PreemptionStrategy {
	var strategy PreemptionStrategy
	if raw, ok := annotations[annotationPreemptionStrategy]; ok {
		if err := json.Unmarshal([]byte(raw), &strategy); err != nil {
			return PreemptionStrategy{}
		}
	}
	return strategy
}
Loading

0 comments on commit f6c518d

Please sign in to comment.