diff --git a/.github/workflows/krew-release.yml b/.github/workflows/krew-release.yml deleted file mode 100644 index c5bc611b71a..00000000000 --- a/.github/workflows/krew-release.yml +++ /dev/null @@ -1,11 +0,0 @@ -on: - release: - types: [released] -jobs: - krew-release: - runs-on: ubuntu-24.04 - steps: - - name: Checkout - uses: actions/checkout@v4 - - name: Update new version in krew-index - uses: rajatjindal/krew-release-bot@v0.0.47 \ No newline at end of file diff --git a/.github/workflows/lepton-build-image.yaml b/.github/workflows/lepton-build-image.yaml new file mode 100644 index 00000000000..ac6cd59e12d --- /dev/null +++ b/.github/workflows/lepton-build-image.yaml @@ -0,0 +1,55 @@ +name: Build Docker Image + +on: + workflow_call: + inputs: + repo: + required: true + type: string + tag: + required: false + type: string + default: test$(git rev-parse --short HEAD) + commit: + required: false + type: string + default: $(git rev-parse --short HEAD) + runs-on: + required: false + type: string + default: ubuntu-latest + +jobs: + build: + runs-on: ${{ inputs.runs-on }} + steps: + - uses: actions/checkout@v3 + with: + submodules: recursive + - name: Install awscli + run: | + python -m pip install --upgrade pip + pip install awscli + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + - name: Build and push the Docker image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + run: | + set -x + + docker build . --file Dockerfile \ + --tag ${{ inputs.repo }}:${{ inputs.tag }} \ + --build-arg GIT_COMMIT=${{ inputs.commit }} \ + --build-arg REPO=$ECR_REGISTRY/ecr-public + + echo "tagging container image with ${{ inputs.repo }}:${{ inputs.tag }}" + docker tag ${{ inputs.repo }}:${{ inputs.tag }} $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }} + docker push $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }} \ No newline at end of file diff --git a/.github/workflows/lepton-pr.yaml b/.github/workflows/lepton-pr.yaml new file mode 100644 index 00000000000..dda86c1e7ed --- /dev/null +++ b/.github/workflows/lepton-pr.yaml @@ -0,0 +1,12 @@ +name: lepton-pr + +on: + pull_request: + branches: ["**"] + +jobs: + build-image: + uses: ./.github/workflows/lepton-build-image.yaml + secrets: inherit + with: + repo: lepton-kueue \ No newline at end of file diff --git a/.github/workflows/lepton-release.yaml b/.github/workflows/lepton-release.yaml new file mode 100644 index 00000000000..bd53fe9605f --- /dev/null +++ b/.github/workflows/lepton-release.yaml @@ -0,0 +1,15 @@ +name: lepton-release + +on: + push: + tags: + - "*" + +jobs: + build-image: + uses: ./.github/workflows/lepton-build-image.yaml + secrets: inherit + with: + repo: lepton-kueue + tag: ${GITHUB_REF##*/} + commit: ${GITHUB_REF##*/} \ No newline at end of file diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index 915653d90b7..2119d55e35b 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -53,6 +53,7 @@ var ( // holds admitted workloads.
type clusterQueue struct { Name string + Annotations map[string]string ResourceGroups []ResourceGroup Workloads map[string]*workload.Info WorkloadsNotReady sets.Set[string] @@ -150,6 +151,8 @@ func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, i } } + c.Annotations = in.Annotations + nsSelector, err := metav1.LabelSelectorAsSelector(in.Spec.NamespaceSelector) if err != nil { return err diff --git a/pkg/cache/clusterqueue_snapshot.go b/pkg/cache/clusterqueue_snapshot.go index 39075cfeb63..0e9f7387df1 100644 --- a/pkg/cache/clusterqueue_snapshot.go +++ b/pkg/cache/clusterqueue_snapshot.go @@ -31,6 +31,7 @@ import ( type ClusterQueueSnapshot struct { Name string + Annotations map[string]string ResourceGroups []ResourceGroup Workloads map[string]*workload.Info WorkloadsNotReady sets.Set[string] @@ -102,7 +103,13 @@ func (c *ClusterQueueSnapshot) BorrowingWith(fr resources.FlavorResource, val in // Cohort. When the ClusterQueue/Cohort is in debt, Available // will return 0. func (c *ClusterQueueSnapshot) Available(fr resources.FlavorResource) int64 { - return max(0, available(c, fr)) + val, _ := available(c, fr) + return max(0, val) +} + +func (c *ClusterQueueSnapshot) AvailableV2(fr resources.FlavorResource) (int64, bool) { + val, isRoot := available(c, fr) + return max(0, val), isRoot } // PotentialAvailable returns the largest workload this ClusterQueue could diff --git a/pkg/cache/resource_node.go b/pkg/cache/resource_node.go index d1e9695cef0..248f57baebc 100644 --- a/pkg/cache/resource_node.go +++ b/pkg/cache/resource_node.go @@ -84,21 +84,24 @@ type hierarchicalResourceNode interface { // This function may return a negative number in the case of // overadmission - e.g. capacity was removed or the node moved to // another Cohort. 
-func available(node hierarchicalResourceNode, fr resources.FlavorResource) int64 { +func available(node hierarchicalResourceNode, fr resources.FlavorResource) (_ int64, _isRoot bool) { r := node.getResourceNode() if !node.HasParent() { - return r.SubtreeQuota[fr] - r.Usage[fr] + return r.SubtreeQuota[fr] - r.Usage[fr], true } localAvailable := max(0, r.guaranteedQuota(fr)-r.Usage[fr]) - parentAvailable := available(node.parentHRN(), fr) + parentAvailable, parentIsRoot := available(node.parentHRN(), fr) if borrowingLimit := r.Quotas[fr].BorrowingLimit; borrowingLimit != nil { storedInParent := r.SubtreeQuota[fr] - r.guaranteedQuota(fr) usedInParent := max(0, r.Usage[fr]-r.guaranteedQuota(fr)) withMaxFromParent := storedInParent - usedInParent + *borrowingLimit + if parentAvailable >= withMaxFromParent { + parentIsRoot = false + } parentAvailable = min(withMaxFromParent, parentAvailable) } - return localAvailable + parentAvailable + return localAvailable + parentAvailable, parentIsRoot } // potentialAvailable returns the maximum capacity available to this node, diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go index f04fe419fd7..adf620e0b08 100644 --- a/pkg/cache/snapshot.go +++ b/pkg/cache/snapshot.go @@ -138,6 +138,7 @@ func (c *Cache) Snapshot(ctx context.Context) (*Snapshot, error) { func snapshotClusterQueue(c *clusterQueue) *ClusterQueueSnapshot { cc := &ClusterQueueSnapshot{ Name: c.Name, + Annotations: c.Annotations, ResourceGroups: make([]ResourceGroup, len(c.ResourceGroups)), FlavorFungibility: c.FlavorFungibility, FairWeight: c.FairWeight, diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index 4bf84f91f46..e09ceecde7c 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -20,9 +20,11 @@ import ( "context" "fmt" "strconv" + "time" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -38,8 +40,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" + podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod" "sigs.k8s.io/kueue/pkg/podset" clientutil "sigs.k8s.io/kueue/pkg/util/client" ) @@ -165,10 +169,58 @@ func (j *Job) Suspend() { j.Spec.Suspend = ptr.To(true) } -func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, _ string) (bool, error) { +func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason jobframework.StopReason, eventMsg string) (bool, error) { object := j.Object() stoppedNow := false + // Lepton: patch the preempted condition onto all pods belonging to this job + podList := &corev1.PodList{} + if err := c.List(ctx, podList, client.InNamespace(j.Namespace), client.MatchingFields{"LepPodOwnerRefUID": string(j.UID)}); err != nil { + return false, fmt.Errorf("failed to list pods: %w", err) + } + for i := range podList.Items { + pod := &podList.Items[i] + if !pod.DeletionTimestamp.IsZero() || + pod.Status.Phase == corev1.PodFailed || + pod.Status.Phase == corev1.PodSucceeded { + continue + } + var exists bool + for _, cond := range pod.Status.Conditions { + if cond.Type ==
podcontroller.ConditionTypeTerminationTarget { + exists = true + break + } + } + if exists { + continue + } + pCopy := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + }, + TypeMeta: pod.TypeMeta, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: podcontroller.ConditionTypeTerminationTarget, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Time{ + Time: time.Now(), + }, + Reason: string(stopReason), + Message: eventMsg, + }, + }, + }, + } + if err := c.Status().Patch(ctx, pCopy, client.Apply, client.FieldOwner(constants.KueueName)); err != nil && !apierrors.IsNotFound(err) { + return false, fmt.Errorf("failed to patch pod %s status before suspending job: %w", pod.Name, err) + } + } + if !j.IsSuspended() { if err := clientutil.Patch(ctx, c, object, true, func() (bool, error) { j.Suspend() @@ -346,6 +398,20 @@ func SetupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error { if err := fieldIndexer.IndexField(ctx, &batchv1.Job{}, indexer.OwnerReferenceUID, indexer.IndexOwnerUID); err != nil { return err } + if err := fieldIndexer.IndexField(ctx, &corev1.Pod{}, "LepPodOwnerRefUID", func(o client.Object) []string { + pod, ok := o.(*corev1.Pod) + if !ok { + return nil + } + + ownerRef := metav1.GetControllerOf(pod) + if ownerRef == nil { + return nil + } + return []string{string(ownerRef.UID)} + }); err != nil { + return err + } return jobframework.SetupWorkloadOwnerIndex(ctx, fieldIndexer, gvk) } diff --git a/pkg/lepton/apis/lepton_apis.go b/pkg/lepton/apis/lepton_apis.go new file mode 100644 index 00000000000..94591efac05 --- /dev/null +++ b/pkg/lepton/apis/lepton_apis.go @@ -0,0 +1,40 @@ +package apis + +import ( + "encoding/json" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +const ( + labelCanPreempt = "kueue.lepton.ai/can-preempt" + labelCanBePreempted = "kueue.lepton.ai/can-be-preempted" + + annotationPreemptionPolicy = "kueue.lepton.ai/preemption-policy" +) + +func CanPreempt(wl *kueue.Workload) bool { + return wl.Labels[labelCanPreempt] == "true" +} + +func CanBePreempted(wl *kueue.Workload) bool { + return wl.Labels[labelCanBePreempted] == "true" +} + +type PreemptionPolicy struct { + CrossNamespaces bool `json:"crossNamespaces,omitempty"` + MaxPriorityThreshold *int32 `json:"maxPriorityThreshold,omitempty"` +} + +func GetQueuePreemptionPolicy(annotations map[string]string) PreemptionPolicy { + val, ok := annotations[annotationPreemptionPolicy] + if !ok { + return PreemptionPolicy{} + } + + var p PreemptionPolicy + if err := json.Unmarshal([]byte(val), &p); err != nil { + return PreemptionPolicy{} + } + return p +} diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go index 4bddd08bcfe..682ac4aa18a 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner.go +++ b/pkg/scheduler/flavorassigner/flavorassigner.go @@ -35,6 +35,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/features" + leptonapis "sigs.k8s.io/kueue/pkg/lepton/apis" "sigs.k8s.io/kueue/pkg/resources" "sigs.k8s.io/kueue/pkg/workload" ) @@ -604,7 +605,7 @@ func (a *FlavorAssigner) fitsResourceQuota(log logr.Logger, fr resources.FlavorR var status Status borrow := a.cq.BorrowingWith(fr, val) && a.cq.HasParent() - available := a.cq.Available(fr) + available, availableFromRoot := a.cq.AvailableV2(fr) maxCapacity := a.cq.PotentialAvailable(fr) // No Fit @@ -621,17 +622,25 @@ func (a 
*FlavorAssigner) fitsResourceQuota(log logr.Logger, fr resources.FlavorR // Check if preemption is possible mode := noFit - if val <= rQuota.Nominal { + if leptonapis.CanPreempt(a.wl.Obj) { mode = preempt - if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) { - mode = reclaim + } else { + if val <= rQuota.Nominal { + mode = preempt + if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) { + mode = reclaim + } + } else if a.canPreemptWhileBorrowing() { + mode = preempt } - } else if a.canPreemptWhileBorrowing() { - mode = preempt } - status.append(fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed", - fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val-available))) + msg := fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed", + fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val-available)) + if availableFromRoot { + msg += " from root" + } + status.append(msg) return mode, borrow, &status } diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 23369c2ac1b..41f0fcf6d9b 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -38,6 +38,7 @@ import ( config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" + leptonapis "sigs.k8s.io/kueue/pkg/lepton/apis" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/resources" "sigs.k8s.io/kueue/pkg/scheduler/flavorassigner" @@ -116,9 +117,23 @@ type Target struct { func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target { frsNeedPreemption := flavorResourcesNeedPreemption(assignment) requests := assignment.TotalRequestsFor(&wl) + if leptonapis.CanPreempt(wl.Obj) { + return p.getTargetsByLepton(log, wl, requests, frsNeedPreemption, snapshot) + } return p.getTargets(log, wl, requests, frsNeedPreemption, snapshot) } +func (p *Preemptor) getTargetsByLepton(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities, + frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target { + cq := snapshot.ClusterQueues[wl.ClusterQueue] + candidates := p.findCandidatesByLepton(wl.Obj, cq, frsNeedPreemption) + if len(candidates) == 0 { + return nil + } + sort.Slice(candidates, candidatesOrderingByLepton(candidates, cq.Name, p.clock.Now())) + return minimalPreemptions(log, requests, cq, snapshot, frsNeedPreemption, candidates, true, nil) +} + func (p *Preemptor) getTargets(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities, frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target { cq := snapshot.ClusterQueues[wl.ClusterQueue] @@ -483,6 +498,52 @@ func flavorResourcesNeedPreemption(assignment flavorassigner.Assignment) sets.Se return resPerFlavor } +func (p *Preemptor) findCandidatesByLepton(wl *kueue.Workload, cq *cache.ClusterQueueSnapshot, frsNeedPreemption sets.Set[resources.FlavorResource]) []*workload.Info { + var candidates []*workload.Info + preemptionPolicy := leptonapis.GetQueuePreemptionPolicy(cq.Annotations) + + for _, candidateWl := range cq.Workloads { + if canBeCandidateByLepton(preemptionPolicy, wl, candidateWl, frsNeedPreemption) { + candidates = append(candidates, candidateWl) + } + } + + if cq.HasParent() { + for _, cohortCQ := range cq.Parent().Root().SubtreeClusterQueues() { + if cq == cohortCQ { + 
continue + } + for _, candidateWl := range cohortCQ.Workloads { + if canBeCandidateByLepton(preemptionPolicy, wl, candidateWl, frsNeedPreemption) { + candidates = append(candidates, candidateWl) + } + } + } + } + return candidates +} + +func canBeCandidateByLepton(preemptionPolicy leptonapis.PreemptionPolicy, selfWl *kueue.Workload, candidateWl *workload.Info, frsNeedPreemption sets.Set[resources.FlavorResource]) bool { + selfPriority := priority.Priority(selfWl) + candidatePriority := priority.Priority(candidateWl.Obj) + if !leptonapis.CanBePreempted(candidateWl.Obj) { + return false + } + if candidatePriority >= selfPriority { + return false + } + if !workloadUsesResources(candidateWl, frsNeedPreemption) { + return false + } + if !preemptionPolicy.CrossNamespaces && selfWl.Namespace != candidateWl.Obj.Namespace { + return false + } + if preemptionPolicy.MaxPriorityThreshold != nil && candidatePriority > *preemptionPolicy.MaxPriorityThreshold { + return false + } + return true +} + // findCandidates obtains candidates for preemption within the ClusterQueue and // cohort that respect the preemption policy and are using a resource that the // preempting workload needs. @@ -579,6 +640,40 @@ func queueUnderNominalInResourcesNeedingPreemption(frsNeedPreemption sets.Set[re return true } +// candidatesOrdering criteria: +// 0. Workloads already marked for preemption first. +// 1. Workloads in the same ClusterQueue before the ones from other ClusterQueues in the cohort as the preemptor. +// 2. Workloads with lower priority first. +// 3. Workloads admitted more recently first. +func candidatesOrderingByLepton(candidates []*workload.Info, cq string, now time.Time) func(int, int) bool { + return func(i, j int) bool { + a := candidates[i] + b := candidates[j] + aEvicted := meta.IsStatusConditionTrue(a.Obj.Status.Conditions, kueue.WorkloadEvicted) + bEvicted := meta.IsStatusConditionTrue(b.Obj.Status.Conditions, kueue.WorkloadEvicted) + if aEvicted != bEvicted { + return aEvicted + } + aInCQ := a.ClusterQueue == cq + bInCQ := b.ClusterQueue == cq + if aInCQ != bInCQ { + return aInCQ + } + pa := priority.Priority(a.Obj) + pb := priority.Priority(b.Obj) + if pa != pb { + return pa < pb + } + timeA := quotaReservationTime(a.Obj, now) + timeB := quotaReservationTime(b.Obj, now) + if !timeA.Equal(timeB) { + return timeA.After(timeB) + } + // Arbitrary comparison for deterministic sorting. + return a.Obj.UID < b.Obj.UID + } +} + // candidatesOrdering criteria: // 0. Workloads already marked for preemption first. // 1. Workloads from other ClusterQueues in the cohort before the ones in the