Skip to content

Commit

Permalink
Lepton commit
Browse files Browse the repository at this point in the history
  • Loading branch information
FillZpp committed Jan 6, 2025
1 parent 604f460 commit 82ce5ec
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 24 deletions.
11 changes: 0 additions & 11 deletions .github/workflows/krew-release.yml

This file was deleted.

55 changes: 55 additions & 0 deletions .github/workflows/lepton-build-image.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
name: Build Docker Image

on:
  workflow_call:
    inputs:
      repo:
        required: true
        type: string
      tag:
        required: false
        type: string
        # NOTE: GitHub Actions does not evaluate $(...) in input defaults.
        # The command substitution only happens because the value is
        # interpolated into the shell `run` step below.
        default: test$(git rev-parse --short HEAD)
      commit:
        required: false
        type: string
        # Same caveat as `tag`: expanded by the shell, not by Actions.
        default: $(git rev-parse --short HEAD)
      runs-on:
        required: false
        type: string
        default: ubuntu-latest

jobs:
  build:
    runs-on: ${{ inputs.runs-on }}
    steps:
      - uses: actions/checkout@v3
        with:
          submodules: recursive
      - name: Install awscli
        run: |
          python -m pip install --upgrade pip
          pip install awscli
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1
      - name: Build and push the Docker image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
        run: |
          set -x
          docker build . --file Dockerfile \
            --tag ${{ inputs.repo }}:${{ inputs.tag }} \
            --build-arg GIT_COMMIT=${{ inputs.commit }} \
            --build-arg REPO=$ECR_REGISTRY/ecr-public
          # Fixed: the log message had a stray trailing '}' after the tag.
          echo "tagging container image with ${{ inputs.repo }}:${{ inputs.tag }}"
          docker tag ${{ inputs.repo }}:${{ inputs.tag }} $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }}
          docker push $ECR_REGISTRY/${{ inputs.repo }}:${{ inputs.tag }}
12 changes: 12 additions & 0 deletions .github/workflows/lepton-pr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Builds the lepton-kueue container image for every pull request by
# delegating to the shared reusable build workflow.
name: lepton-pr

on:
  pull_request:
    branches: ["**"]

jobs:
  build-image:
    # Reusable workflow; repository secrets are forwarded for ECR access.
    uses: ./.github/workflows/lepton-build-image.yaml
    secrets: inherit
    with:
      repo: lepton-kueue
15 changes: 15 additions & 0 deletions .github/workflows/lepton-release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Builds and pushes the lepton-kueue image whenever a tag is pushed,
# tagging the image with the git tag name.
name: lepton-release

on:
  push:
    tags:
      - "*"

jobs:
  build-image:
    uses: ./.github/workflows/lepton-build-image.yaml
    secrets: inherit
    with:
      repo: lepton-kueue
      # For tag pushes, github.ref_name is the tag name — the same value
      # the previous `${GITHUB_REF##*/}` shell expansion produced, but
      # substituted by Actions itself instead of depending on the literal
      # string reaching an unquoted shell `run` line in the called workflow.
      tag: ${{ github.ref_name }}
      commit: ${{ github.ref_name }}
8 changes: 7 additions & 1 deletion pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ func (c *ClusterQueueSnapshot) BorrowingWith(fr resources.FlavorResource, val in
// Cohort. When the ClusterQueue/Cohort is in debt, Available
// will return 0.
func (c *ClusterQueueSnapshot) Available(fr resources.FlavorResource) int64 {
	// Clamp negative availability (possible under overadmission) to 0;
	// the isRoot result is irrelevant to callers of this method.
	if v, _ := available(c, fr); v > 0 {
		return v
	}
	return 0
}

// AvailableV2 behaves like Available — reporting the unused capacity for
// fr clamped at 0 — but additionally returns the boolean produced by
// available, which indicates whether the root of the hierarchy (rather
// than a BorrowingLimit on the path to it) bounded the result.
func (c *ClusterQueueSnapshot) AvailableV2(fr resources.FlavorResource) (int64, bool) {
	v, fromRoot := available(c, fr)
	if v < 0 {
		v = 0
	}
	return v, fromRoot
}

// PotentialAvailable returns the largest workload this ClusterQueue could
Expand Down
11 changes: 7 additions & 4 deletions pkg/cache/resource_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,24 @@ type hierarchicalResourceNode interface {
// This function may return a negative number in the case of
// overadmission - e.g. capacity was removed or the node moved to
// another Cohort.
func available(node hierarchicalResourceNode, fr resources.FlavorResource) int64 {
func available(node hierarchicalResourceNode, fr resources.FlavorResource) (_ int64, _isRoot bool) {
r := node.getResourceNode()
if !node.HasParent() {
return r.SubtreeQuota[fr] - r.Usage[fr]
return r.SubtreeQuota[fr] - r.Usage[fr], true
}
localAvailable := max(0, r.guaranteedQuota(fr)-r.Usage[fr])
parentAvailable := available(node.parentHRN(), fr)
parentAvailable, parentIsRoot := available(node.parentHRN(), fr)

if borrowingLimit := r.Quotas[fr].BorrowingLimit; borrowingLimit != nil {
storedInParent := r.SubtreeQuota[fr] - r.guaranteedQuota(fr)
usedInParent := max(0, r.Usage[fr]-r.guaranteedQuota(fr))
withMaxFromParent := storedInParent - usedInParent + *borrowingLimit
if parentAvailable >= withMaxFromParent {
parentIsRoot = false
}
parentAvailable = min(withMaxFromParent, parentAvailable)
}
return localAvailable + parentAvailable
return localAvailable + parentAvailable, parentIsRoot
}

// potentialAvailable returns the maximum capacity available to this node,
Expand Down
18 changes: 18 additions & 0 deletions pkg/lepton/apis/lepton_apis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package apis

import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

const (
labelCanPreempt = "kueue.lepton.ai/can-preempt"
labelCanBePreempted = "kueue.lepton.ai/can-be-preempted"
)

func CanPreempt(wl *kueue.Workload) bool {
return wl.Labels[labelCanPreempt] == "true"
}

func CanBePreempted(wl *kueue.Workload) bool {
return wl.Labels[labelCanBePreempted] == "true"
}
25 changes: 17 additions & 8 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/features"
leptonapis "sigs.k8s.io/kueue/pkg/lepton/apis"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand Down Expand Up @@ -604,7 +605,7 @@ func (a *FlavorAssigner) fitsResourceQuota(log logr.Logger, fr resources.FlavorR
var status Status

borrow := a.cq.BorrowingWith(fr, val) && a.cq.HasParent()
available := a.cq.Available(fr)
available, availableFromRoot := a.cq.AvailableV2(fr)
maxCapacity := a.cq.PotentialAvailable(fr)

// No Fit
Expand All @@ -621,17 +622,25 @@ func (a *FlavorAssigner) fitsResourceQuota(log logr.Logger, fr resources.FlavorR

// Check if preemption is possible
mode := noFit
if val <= rQuota.Nominal {
if leptonapis.CanPreempt(a.wl.Obj) {
mode = preempt
if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) {
mode = reclaim
} else {
if val <= rQuota.Nominal {
mode = preempt
if a.oracle.IsReclaimPossible(log, a.cq, *a.wl, fr, val) {
mode = reclaim
}
} else if a.canPreemptWhileBorrowing() {
mode = preempt
}
} else if a.canPreemptWhileBorrowing() {
mode = preempt
}

status.append(fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed",
fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val-available)))
msg := fmt.Sprintf("insufficient unused quota for %s in flavor %s, %s more needed",
fr.Resource, fr.Flavor, resources.ResourceQuantityString(fr.Resource, val-available))
if availableFromRoot {
msg += " from root"
}
status.append(msg)

return mode, borrow, &status
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
leptonapis "sigs.k8s.io/kueue/pkg/lepton/apis"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
Expand Down Expand Up @@ -116,9 +117,23 @@ type Target struct {
// GetTargets computes the flavor-resources needing preemption and the
// preemptor's total requests, then selects victims — via the
// Lepton-specific path when the workload carries the can-preempt label,
// otherwise via the standard path.
func (p *Preemptor) GetTargets(log logr.Logger, wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*Target {
	needed := flavorResourcesNeedPreemption(assignment)
	totalRequests := assignment.TotalRequestsFor(&wl)
	if !leptonapis.CanPreempt(wl.Obj) {
		return p.getTargets(log, wl, totalRequests, needed, snapshot)
	}
	return p.getTargetsByLepton(log, wl, totalRequests, needed, snapshot)
}

// getTargetsByLepton selects preemption targets for a workload carrying
// the Lepton can-preempt label: candidates are gathered with the Lepton
// rules, ordered by the Lepton comparator, and reduced to a minimal set.
func (p *Preemptor) getTargetsByLepton(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities,
	frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target {
	cq := snapshot.ClusterQueues[wl.ClusterQueue]
	cands := p.findCandidatesByLepton(wl.Obj, cq, frsNeedPreemption)
	if len(cands) == 0 {
		return nil
	}
	sort.Slice(cands, candidatesOrderingByLepton(cands, cq.Name, p.clock.Now()))
	return minimalPreemptions(log, requests, cq, snapshot, frsNeedPreemption, cands, true, nil)
}

func (p *Preemptor) getTargets(log logr.Logger, wl workload.Info, requests resources.FlavorResourceQuantities,
frsNeedPreemption sets.Set[resources.FlavorResource], snapshot *cache.Snapshot) []*Target {
cq := snapshot.ClusterQueues[wl.ClusterQueue]
Expand Down Expand Up @@ -483,6 +498,44 @@ func flavorResourcesNeedPreemption(assignment flavorassigner.Assignment) sets.Se
return resPerFlavor
}

// findCandidatesByLepton gathers preemption candidates for wl under the
// Lepton rules: first from wl's own ClusterQueue, then — if it belongs
// to a cohort — from every other ClusterQueue in the cohort subtree.
func (p *Preemptor) findCandidatesByLepton(wl *kueue.Workload, cq *cache.ClusterQueueSnapshot, frsNeedPreemption sets.Set[resources.FlavorResource]) []*workload.Info {
	selfPriority := priority.Priority(wl)

	var candidates []*workload.Info
	// collect appends every eligible workload of one queue.
	collect := func(queue *cache.ClusterQueueSnapshot) {
		for _, wi := range queue.Workloads {
			if canBeCandidateByLepton(selfPriority, wi, frsNeedPreemption) {
				candidates = append(candidates, wi)
			}
		}
	}

	collect(cq)
	if cq.HasParent() {
		for _, cohortCQ := range cq.Parent().Root().SubtreeClusterQueues() {
			if cohortCQ != cq {
				collect(cohortCQ)
			}
		}
	}
	return candidates
}

// canBeCandidateByLepton reports whether candidateWl may be preempted on
// behalf of a preemptor with priority selfPriority: the candidate must
// carry the Lepton can-be-preempted label, have strictly lower priority,
// and use at least one of the resources needing preemption.
func canBeCandidateByLepton(selfPriority int32, candidateWl *workload.Info, frsNeedPreemption sets.Set[resources.FlavorResource]) bool {
	return leptonapis.CanBePreempted(candidateWl.Obj) &&
		priority.Priority(candidateWl.Obj) < selfPriority &&
		workloadUsesResources(candidateWl, frsNeedPreemption)
}

// findCandidates obtains candidates for preemption within the ClusterQueue and
// cohort that respect the preemption policy and are using a resource that the
// preempting workload needs.
Expand Down Expand Up @@ -579,6 +632,40 @@ func queueUnderNominalInResourcesNeedingPreemption(frsNeedPreemption sets.Set[re
return true
}

// candidatesOrderingByLepton criteria:
// 0. Workloads already marked for preemption (Evicted condition) first.
// 1. Workloads in the same ClusterQueue as the preemptor before the ones
//    from other ClusterQueues in the cohort.
// 2. Workloads with lower priority first.
// 3. Workloads admitted more recently first.
// 4. UID as an arbitrary, deterministic tie-breaker.
func candidatesOrderingByLepton(candidates []*workload.Info, cq string, now time.Time) func(int, int) bool {
	return func(i, j int) bool {
		a := candidates[i]
		b := candidates[j]
		// 0. Already-evicted workloads sort first.
		aEvicted := meta.IsStatusConditionTrue(a.Obj.Status.Conditions, kueue.WorkloadEvicted)
		bEvicted := meta.IsStatusConditionTrue(b.Obj.Status.Conditions, kueue.WorkloadEvicted)
		if aEvicted != bEvicted {
			return aEvicted
		}
		// 1. Same-ClusterQueue workloads before cohort ones.
		aInCQ := a.ClusterQueue == cq
		bInCQ := b.ClusterQueue == cq
		if aInCQ != bInCQ {
			return aInCQ
		}
		// 2. Lower-priority workloads first.
		pa := priority.Priority(a.Obj)
		pb := priority.Priority(b.Obj)
		if pa != pb {
			return pa < pb
		}
		// 3. More recently admitted workloads first.
		timeA := quotaReservationTime(a.Obj, now)
		timeB := quotaReservationTime(b.Obj, now)
		if !timeA.Equal(timeB) {
			return timeA.After(timeB)
		}
		// Arbitrary comparison for deterministic sorting.
		return a.Obj.UID < b.Obj.UID
	}
}

// candidatesOrdering criteria:
// 0. Workloads already marked for preemption first.
// 1. Workloads from other ClusterQueues in the cohort before the ones in the
Expand Down

0 comments on commit 82ce5ec

Please sign in to comment.