From a250385d21c8d0e0744d0ff1f9bb994c22df2f94 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 7 Mar 2024 01:33:45 -0700 Subject: [PATCH] refactor: testing idea to wrap coscheduling This is the "skeleton" of a new idea to wrap coscheduling, adding in the logic for fluence only where it is needed, likely in the PodGroup (in the new fluence/core/core that wraps the same in coscheduling). This is just a skeleton because we are deploying the sidecar with the wrapped scheduling and absolutely no logic ported over to AskFlux. I think I have a sense of where to put this, but wanted to save this vanilla/skeleton state in case we need to go back to it. Note that it did not work to have fluence inherit the functions from coscheduler, so I opted for a strategy of adding it as a helper field, and then just using it when necessary. Signed-off-by: vsoch --- README.md | 6 + examples/pod-group/lammps/lammps2.yaml | 4 +- examples/pod-group/lammps/lammps4-2.yaml | 4 +- examples/pod-group/lammps/lammps4-3.yaml | 4 +- examples/pod-group/lammps/lammps4.yaml | 4 +- examples/pod-group/lammps/lammps5.yaml | 4 +- examples/pod-group/lammps/lammps6.yaml | 4 +- examples/test_example/fluence-sized-job.yaml | 2 +- sig-scheduler-plugins/cmd/scheduler/main.go | 5 +- .../pkg/controllers/podgroup_controller.go | 46 +- sig-scheduler-plugins/pkg/fluence/README.md | 29 -- .../pkg/fluence/core/core.go | 392 ++++++++++++----- sig-scheduler-plugins/pkg/fluence/events.go | 166 ------- sig-scheduler-plugins/pkg/fluence/fluence.go | 414 ++++++++---------- .../pkg/fluence/labels/labels.go | 40 +- 15 files changed, 556 insertions(+), 568 deletions(-) delete mode 100644 sig-scheduler-plugins/pkg/fluence/README.md delete mode 100644 sig-scheduler-plugins/pkg/fluence/events.go diff --git a/README.md b/README.md index ae420fd..89f2a18 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,12 @@ Fluence enables HPC-grade pod scheduling in Kubernetes via the [Kubernetes Sched **Important** Fluence does not currently 
support use in conjunction with the kube-scheduler. Pods must all be scheduled by Fluence, and *you should not use both schedulers in the same cluster*. +## TODO + +- Need to list pods, get state, and if is completed, cancel the job id. +- Keep track of state of all pods in group, when all of pods are completed, then issue cancel. +- Calculate on the fly - on the update event we want to loop through pods, if ALL completed, then delete the podid for fluence. + ## Getting started For instructions on how to start Fluence on a K8s cluster, see [examples](examples/). Documentation and instructions for reproducing our CANOPIE-2022 paper (citation below) can be found in the [canopie22-artifacts branch](https://github.com/flux-framework/flux-k8s/tree/canopie22-artifacts). diff --git a/examples/pod-group/lammps/lammps2.yaml b/examples/pod-group/lammps/lammps2.yaml index 5cc7535..5a83c97 100644 --- a/examples/pod-group/lammps/lammps2.yaml +++ b/examples/pod-group/lammps/lammps2.yaml @@ -14,6 +14,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps4-2.yaml b/examples/pod-group/lammps/lammps4-2.yaml index 777e73c..6b647bc 100644 --- a/examples/pod-group/lammps/lammps4-2.yaml +++ b/examples/pod-group/lammps/lammps4-2.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps4-3.yaml b/examples/pod-group/lammps/lammps4-3.yaml index 76c5ed0..b182751 100644 --- a/examples/pod-group/lammps/lammps4-3.yaml +++ b/examples/pod-group/lammps/lammps4-3.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git 
a/examples/pod-group/lammps/lammps4.yaml b/examples/pod-group/lammps/lammps4.yaml index 38ae0a7..9420902 100644 --- a/examples/pod-group/lammps/lammps4.yaml +++ b/examples/pod-group/lammps/lammps4.yaml @@ -18,6 +18,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps5.yaml b/examples/pod-group/lammps/lammps5.yaml index 7546b48..e85299f 100644 --- a/examples/pod-group/lammps/lammps5.yaml +++ b/examples/pod-group/lammps/lammps5.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps6.yaml b/examples/pod-group/lammps/lammps6.yaml index 2030192..14ebae3 100644 --- a/examples/pod-group/lammps/lammps6.yaml +++ b/examples/pod-group/lammps/lammps6.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/test_example/fluence-sized-job.yaml b/examples/test_example/fluence-sized-job.yaml index a195d87..d1e7556 100644 --- a/examples/test_example/fluence-sized-job.yaml +++ b/examples/test_example/fluence-sized-job.yaml @@ -11,6 +11,6 @@ spec: containers: - name: fluence-job image: busybox - command: [echo, potato] + command: [sleep, "20"] restartPolicy: Never backoffLimit: 4 diff --git a/sig-scheduler-plugins/cmd/scheduler/main.go b/sig-scheduler-plugins/cmd/scheduler/main.go index d9a580a..2b21d28 100644 --- a/sig-scheduler-plugins/cmd/scheduler/main.go +++ b/sig-scheduler-plugins/cmd/scheduler/main.go @@ -26,6 +26,7 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling" "sigs.k8s.io/scheduler-plugins/pkg/coscheduling" + "sigs.k8s.io/scheduler-plugins/pkg/fluence" 
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead" "sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort" "sigs.k8s.io/scheduler-plugins/pkg/noderesources" @@ -36,7 +37,7 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing" "sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment" "sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking" - "sigs.k8s.io/scheduler-plugins/pkg/fluence" + // Ensure scheme package is initialized. _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme" ) @@ -56,8 +57,6 @@ func main() { app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New), app.WithPlugin(targetloadpacking.Name, targetloadpacking.New), app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New), - // Sample plugins below. - // app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New), app.WithPlugin(podstate.Name, podstate.New), app.WithPlugin(qos.Name, qos.New), app.WithPlugin(fluence.Name, fluence.New), diff --git a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go index 73b7d2d..27c31cb 100644 --- a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go +++ b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go @@ -58,10 +58,8 @@ type PodGroupReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the PodGroup object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. 
+// Note that we currently don't do deletion based on owner references, but that +// would be ideal (I could not get it to work) // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile @@ -82,6 +80,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c log.Error(err, fmt.Sprintf("Unable to retrieve pod group %s", req.NamespacedName)) return ctrl.Result{}, err } + log.Info("REFERENCES", "Reconciler", pg.ObjectMeta.OwnerReferences) // Grab all statuses (and groups of them) we are interested in schedulingOrPending := (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending) @@ -175,35 +174,32 @@ func (r *PodGroupReconciler) updateStatus( pods []v1.Pod, ) (ctrl.Result, error) { + log := log.FromContext(ctx) patch := client.MergeFrom(pg.DeepCopy()) + log.Info("PodGroup", "Phase", pg.Status.Phase) switch pg.Status.Phase { case "": pg.Status.Phase = schedv1alpha1.PodGroupPending - result, err := r.updateOwnerReferences(ctx, pg, pods) - if result.Requeue || err != nil { - return result, err - } case schedv1alpha1.PodGroupPending: if len(pods) >= int(pg.Spec.MinMember) { + log.Info("PodGroup", "Phase", "Scheduling") pg.Status.Phase = schedv1alpha1.PodGroupScheduling - result, err := r.updateOwnerReferences(ctx, pg, pods) - if result.Requeue || err != nil { - return result, err - } } default: - // Get updated counts of running, succeeded, and failed pods - running, succeeded, failed := getCurrentPodStats(pods) - // If for some reason we weren't pending and now have fewer than min required, flip back to pending if len(pods) < int(pg.Spec.MinMember) { + log.Info("PodGroup", "Phase", "Length of pods less than min member, pending") pg.Status.Phase = schedv1alpha1.PodGroupPending break } + // Get updated counts of running, succeeded, and failed pods + running, succeeded, failed := getCurrentPodStats(pods) + 
log.Info("PodGroup", "Running", running, "Succeeded", succeeded, "Failed", failed) + + // A pod with succeeded + running STILL less than the minimum required is scheduling if succeeded+running < pg.Spec.MinMember { pg.Status.Phase = schedv1alpha1.PodGroupScheduling @@ -232,16 +228,18 @@ func (r *PodGroupReconciler) updateStatus( } // Apply the patch to update, or delete if finished - // TODO would be better if owner references took here, so delete on owner deletion - // TODO deletion is not currently handled for Deployment, ReplicaSet, StatefulSet - // as they are expected to persist. You can delete / lose and bring up again var err error if pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed { + log.Info("PodGroup", "Status", "Finished", "Owners", pg.OwnerReferences) + + // Delete the group if it is finished or failed err = r.Delete(ctx, pg) - } else { - r.Status().Update(ctx, pg) - err = r.Patch(ctx, pg, patch) + // Update but don't requeue + // _, err := r.updateOwnerReferences(ctx, pg, pods) + return ctrl.Result{}, err } + r.Status().Update(ctx, pg) + err = r.Patch(ctx, pg, patch) return ctrl.Result{Requeue: true}, err } @@ -366,21 +364,25 @@ func (r *PodGroupReconciler) updateOwnerReferences( return result, nil } - // Collect owner references for pod group + // Collect current owner references for pod group, + // We want to ensure we add unique ones across the pod owners := []metav1.OwnerReference{} var refs []string for _, ownerRef := range pod.OwnerReferences { refs = append(refs, fmt.Sprintf("%s/%s", pod.Namespace, ownerRef.Name)) owners = append(owners, ownerRef) } + patch := client.MergeFrom(pg.DeepCopy()) if len(refs) != 0 { sort.Strings(refs) pg.Status.OccupiedBy = strings.Join(refs, ",") } + // If we have owners, collapse into list if len(owners) > 0 { pg.ObjectMeta.OwnerReferences = owners } + // Apply the patch to update the size r.Status().Update(ctx, pg) err := r.Patch(ctx, pg, patch) diff --git 
a/sig-scheduler-plugins/pkg/fluence/README.md b/sig-scheduler-plugins/pkg/fluence/README.md deleted file mode 100644 index 61f4923..0000000 --- a/sig-scheduler-plugins/pkg/fluence/README.md +++ /dev/null @@ -1,29 +0,0 @@ -# Overview - -Project to manage Flux tasks needed to standardize kubernetes HPC scheduling interfaces - -## Installing the chart - -More detail will be added here about installing the chart. You will -be using the [install-as-a-second-scheduler](https://github.com/kubernetes-sigs/scheduler-plugins/tree/master/manifests/install/charts/as-a-second-scheduler) -charts. Fluence-specific values are detailed below. - -### Fluence specific values - -In `values.yaml` it is possible to customize the container image, already defaulted to the latest release, and the allocation policy -used by the scheduler. -Most common options are: - -- `lonode`: choose the nodes with lower ID first. Can be compared to packing -- `low`: choose cores with lowest IDs from multiple nodes. Can be compared to spread process-to-resource placement - -## Maturity Level - - - -- [x] Sample (for demonstrating and inspiring purpose) -- [ ] Alpha (used in companies for pilot projects) -- [ ] Beta (used in companies and developed actively) -- [ ] Stable (used in companies for production workloads) - - diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go index a3f4531..efa1127 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/core.go +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -1,161 +1,329 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package core import ( + "context" "fmt" + "sync" + "time" - klog "k8s.io/klog/v2" - + gochache "github.com/patrickmn/go-cache" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + informerv1 "k8s.io/client-go/informers/core/v1" + listerv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" - pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" + "sigs.k8s.io/scheduler-plugins/pkg/util" ) -// FluxStateData is a CycleState -// It holds the PodCache for a pod, which has node assignment, group, and group size -// We also save the group name and size, and time created, in case we want to (somehow) resume scheduling -// In practice I'm not sure how CycleState objects are dumped and loaded. Kueue has a dumper :P -// https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/624-scheduling-framework/README.md#cyclestate -type FluxStateData struct { - NodeCache NodeCache +type Status string + +const ( + // PodGroupNotSpecified denotes no PodGroup is specified in the Pod spec. + PodGroupNotSpecified Status = "PodGroup not specified" + // PodGroupNotFound denotes the specified PodGroup in the Pod spec is + // not found in API server. 
+ PodGroupNotFound Status = "PodGroup not found" + Success Status = "Success" + Wait Status = "Wait" +) + +// Manager defines the interfaces for PodGroup management. +type Manager interface { + PreFilter(context.Context, *corev1.Pod) error + Permit(context.Context, *corev1.Pod) Status + GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) + GetCreationTimestamp(*corev1.Pod, time.Time) time.Time + DeletePermittedPodGroup(string) + CalculateAssignedPods(string, string) int + ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) + BackoffPodGroup(string, time.Duration) } -// Clone is required for CycleState plugins -func (s *FluxStateData) Clone() framework.StateData { - return &FluxStateData{NodeCache: s.NodeCache} +// PodGroupManager defines the scheduling operation called +type PodGroupManager struct { + // client is a generic controller-runtime client to manipulate both core resources and PodGroups. + client client.Client + // snapshotSharedLister is pod shared list + snapshotSharedLister framework.SharedLister + // scheduleTimeout is the default timeout for podgroup scheduling. + // If podgroup's scheduleTimeoutSeconds is set, it will be used. + scheduleTimeout *time.Duration + // permittedPG stores the podgroup name which has passed the pre resource check. + permittedPG *gochache.Cache + // backedOffPG stores the podgroup name which failed scheduling recently. + backedOffPG *gochache.Cache + // podLister is pod lister + podLister listerv1.PodLister + sync.RWMutex } -// NewFluxState creates an entry for the CycleState with the node and group name -func NewFluxState(nodeName string, groupName string) *FluxStateData { - cache := NodeCache{NodeName: nodeName} - return &FluxStateData{NodeCache: cache} +// NewPodGroupManager creates a new operation object. 
+func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager { + pgMgr := &PodGroupManager{ + client: client, + snapshotSharedLister: snapshotSharedLister, + scheduleTimeout: scheduleTimeout, + podLister: podInformer.Lister(), + permittedPG: gochache.New(3*time.Second, 3*time.Second), + backedOffPG: gochache.New(10*time.Second, 10*time.Second), + } + return pgMgr } -// NodeCache holds the node name and tasks for the node -// For the PodGroupCache, these are organized by group name, -// and there is a list of them -type NodeCache struct { - NodeName string +func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) { + if backoff == time.Duration(0) { + return + } + pgMgr.backedOffPG.Add(pgName, nil, backoff) +} - // Tie assignment back to PodGroup, which can be used to get size and time created - GroupName string +// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod +// in the given state, with a reserved key "kubernetes.io/pods-to-activate". +func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) { + pgName := util.GetPodGroupLabel(pod) + if pgName == "" { + return + } - // Assigned tasks (often pods) to nodes - // https://github.com/flux-framework/flux-k8s/blob/9f24f36752e3cced1b1112d93bfa366fb58b3c84/src/fluence/fluxion/fluxion.go#L94-L97 - AssignedTasks int -} + pods, err := pgMgr.podLister.Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}), + ) + if err != nil { + klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName) + return + } -// A pod group cache holds a list of nodes for an allocation, where each has some number of tasks -// along with the expected group size. 
This is intended to replace PodGroup -// given the group name, size (derived from annotations) and timestamp -type PodGroupCache struct { - GroupName string + for i := range pods { + if pods[i].UID == pod.UID { + pods = append(pods[:i], pods[i+1:]...) + break + } + } - // This is a cache of nodes for pods - Nodes []NodeCache + if len(pods) != 0 { + if c, err := state.Read(framework.PodsToActivateKey); err == nil { + if s, ok := c.(*framework.PodsToActivate); ok { + s.Lock() + for _, pod := range pods { + namespacedName := GetNamespacedName(pod) + s.Map[namespacedName] = pod + } + s.Unlock() + } + } + } } -// PodGroups seen by fluence -var groupsSeen map[string]*PodGroupCache +// PreFilter filters out a pod if +// 1. it belongs to a podgroup that was recently denied or +// 2. the total number of pods in the podgroup is less than the minimum number of pods +// that is required to be scheduled. +func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error { + klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod)) + pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) + if pg == nil { + return nil + } -// Init populates the groupsSeen cache -func Init() { - groupsSeen = map[string]*PodGroupCache{} -} + if _, exist := pgMgr.backedOffPG.Get(pgFullName); exist { + return fmt.Errorf("podGroup %v failed recently", pgFullName) + } -// GetFluenceCache determines if a group has been seen. 
-// Yes -> we return the PodGroupCache entry -// No -> the entry is nil / does not exist -func GetFluenceCache(groupName string) *PodGroupCache { - entry, _ := groupsSeen[groupName] - return entry -} + pods, err := pgMgr.podLister.Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}), + ) + if err != nil { + return fmt.Errorf("podLister list pods failed: %w", err) + } -// DeletePodGroup deletes a pod from the group cache -func DeletePodGroup(groupName string) { - delete(groupsSeen, groupName) -} + if len(pods) < int(pg.Spec.MinMember) { + return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+ + "current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember) + } -// CreateNodePodsList creates a list of node pod caches -func CreateNodeList(nodelist []*pb.NodeAlloc, groupName string) (nodepods []NodeCache) { + if pg.Spec.MinResources == nil { + return nil + } - // Create a pod cache for each node - nodepods = make([]NodeCache, len(nodelist)) + // TODO(cwdsuzhou): This resource check may not always pre-catch unschedulable pod group. + // It only tries to PreFilter resource constraints so even if a PodGroup passed here, + // it may not necessarily pass Filter due to other constraints such as affinity/taints. + if _, ok := pgMgr.permittedPG.Get(pgFullName); ok { + return nil + } - // TODO: should we be integrating topology information here? Could it be the - // case that some nodes (pods) in the group should be closer? 
- for i, v := range nodelist { - nodepods[i] = NodeCache{ - NodeName: v.GetNodeID(), - AssignedTasks: int(v.GetTasks()), - GroupName: groupName, - } + nodes, err := pgMgr.snapshotSharedLister.NodeInfos().List() + if err != nil { + return err } - // Update the pods in the PodGroupCache (groupsSeen) - updatePodGroupCache(groupName, nodepods) - return nodepods + minResources := pg.Spec.MinResources.DeepCopy() + podQuantity := resource.NewQuantity(int64(pg.Spec.MinMember), resource.DecimalSI) + minResources[corev1.ResourcePods] = *podQuantity + err = CheckClusterResource(nodes, minResources, pgFullName) + if err != nil { + klog.ErrorS(err, "Failed to PreFilter", "podGroup", klog.KObj(pg)) + return err + } + pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout) + return nil } -// updatePodGroupList updates the PodGroupCache with a listing of nodes -func updatePodGroupCache(groupName string, nodes []NodeCache) { - cache := PodGroupCache{ - Nodes: nodes, - GroupName: groupName, +// Permit permits a pod to run, if the minMember match, it would send a signal to chan. +func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status { + pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) + if pgFullName == "" { + return PodGroupNotSpecified + } + if pg == nil { + // A Pod with a podGroup name but without a PodGroup found is denied. + return PodGroupNotFound + } + + assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace) + // The number of pods that have been assigned nodes is calculated from the snapshot. + // The current pod in not included in the snapshot during the current scheduling cycle. + if int32(assigned)+1 >= pg.Spec.MinMember { + return Success } - groupsSeen[groupName] = &cache + return Wait } -// GetNextNode gets the next node in the PodGroupCache -func (p *PodGroupCache) GetNextNode() (string, error) { +// GetCreationTimestamp returns the creation time of a podGroup or a pod. 
+func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time { + pgName := util.GetPodGroupLabel(pod) + if len(pgName) == 0 { + return ts + } + var pg v1alpha1.PodGroup + if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil { + return ts + } + return pg.CreationTimestamp.Time +} - nextnode := "" +// DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter. +func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) { + pgMgr.permittedPG.Delete(pgFullName) +} - // Quick failure state - we ran out of nodes - if len(p.Nodes) == 0 { - return nextnode, fmt.Errorf("[Fluence] PodGroup %s ran out of nodes.", p.GroupName) +// GetPodGroup returns the PodGroup that a Pod belongs to in cache. +func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) { + pgName := util.GetPodGroupLabel(pod) + if len(pgName) == 0 { + return "", nil + } + var pg v1alpha1.PodGroup + if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil { + return fmt.Sprintf("%v/%v", pod.Namespace, pgName), nil } + return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg +} - // The next is the 0th in the list - nextnode = p.Nodes[0].NodeName - klog.Infof("[Fluence] Next node for group %s is %s", p.GroupName, nextnode) +// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. 
+func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int { + nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List() + klog.Info(nodeInfos) + if err != nil { + klog.ErrorS(err, "Cannot get nodeInfos from frameworkHandle") + return 0 + } + var count int + for _, nodeInfo := range nodeInfos { + for _, podInfo := range nodeInfo.Pods { + pod := podInfo.Pod + if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" { + count++ + } + } + } - // If there is only one task left, we are going to use it (and remove the node) - if p.Nodes[0].AssignedTasks == 1 { - klog.Infof("[Fluence] First node has one remaining task slot") - slice := p.Nodes[1:] + return count +} - // If after we remove the node there are no nodes left... - // Note that I'm not deleting the node from the cache because that is the - // only way fluence knows it has already assigned work (presence of the key) - if len(slice) == 0 { - klog.Infof("[Fluence] Assigning node %s. There are NO reamining nodes for group %s\n", nextnode, p.GroupName) - // delete(podGroupCache, groupName) - return nextnode, nil +// CheckClusterResource checks if resource capacity of the cluster can satisfy . +// It returns an error detailing the resource gap if not satisfied; otherwise returns nil. +func CheckClusterResource(nodeList []*framework.NodeInfo, resourceRequest corev1.ResourceList, desiredPodGroupName string) error { + for _, info := range nodeList { + if info == nil || info.Node() == nil { + continue } - klog.Infof("[Fluence] Assigning node %s. 
There are nodes left for group", nextnode, p.GroupName) - updatePodGroupCache(p.GroupName, slice) - return nextnode, nil + nodeResource := util.ResourceList(getNodeResource(info, desiredPodGroupName)) + for name, quant := range resourceRequest { + quant.Sub(nodeResource[name]) + if quant.Sign() <= 0 { + delete(resourceRequest, name) + continue + } + resourceRequest[name] = quant + } + if len(resourceRequest) == 0 { + return nil + } } + return fmt.Errorf("resource gap: %v", resourceRequest) +} - // If we get here the first node had >1 assigned tasks - klog.Infof("[Fluence] Assigning node %s for group %s. There are still task assignments available for this node.", nextnode, p.GroupName) - p.Nodes[0].AssignedTasks = p.Nodes[0].AssignedTasks - 1 - return nextnode, nil +// GetNamespacedName returns the namespaced name. +func GetNamespacedName(obj metav1.Object) string { + return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName()) } -// GetNextNode gets the next available node we can allocate for a group -// TODO this should be able to take and pass forward a number of tasks. -// It is implicity 1 now, but doesn't have to be. 
-func GetNextNode(groupName string) (string, error) { +func getNodeResource(info *framework.NodeInfo, desiredPodGroupName string) *framework.Resource { + nodeClone := info.Clone() + for _, podInfo := range info.Pods { + if podInfo == nil || podInfo.Pod == nil { + continue + } + if util.GetPodGroupFullName(podInfo.Pod) != desiredPodGroupName { + continue + } + nodeClone.RemovePod(podInfo.Pod) + } - // Get our entry from the groupsSeen cache - klog.Infof("[Fluence] groups seen %s", groupsSeen) - entry, ok := groupsSeen[groupName] + leftResource := framework.Resource{ + ScalarResources: make(map[corev1.ResourceName]int64), + } + allocatable := nodeClone.Allocatable + requested := nodeClone.Requested + + leftResource.AllowedPodNumber = allocatable.AllowedPodNumber - len(nodeClone.Pods) + leftResource.MilliCPU = allocatable.MilliCPU - requested.MilliCPU + leftResource.Memory = allocatable.Memory - requested.Memory + leftResource.EphemeralStorage = allocatable.EphemeralStorage - requested.EphemeralStorage - // This case should not happen - if !ok { - return "", fmt.Errorf("[Fluence] Map is empty") + for k, allocatableEx := range allocatable.ScalarResources { + requestEx, ok := requested.ScalarResources[k] + if !ok { + leftResource.ScalarResources[k] = allocatableEx + } else { + leftResource.ScalarResources[k] = allocatableEx - requestEx + } } - // Get the next node from the PodGroupCache - return entry.GetNextNode() + klog.V(4).InfoS("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource) + return &leftResource } diff --git a/sig-scheduler-plugins/pkg/fluence/events.go b/sig-scheduler-plugins/pkg/fluence/events.go deleted file mode 100644 index b891713..0000000 --- a/sig-scheduler-plugins/pkg/fluence/events.go +++ /dev/null @@ -1,166 +0,0 @@ -package fluence - -import ( - "context" - "time" - - "google.golang.org/grpc" - v1 "k8s.io/api/core/v1" - klog "k8s.io/klog/v2" - - pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" - fgroup 
"sigs.k8s.io/scheduler-plugins/pkg/fluence/group" -) - -// Events are associated with inforers, typically on pods, e.g., -// delete: deletion of a pod -// update: update of a pod! -// For both of the above, there are cases to cancel the flux job -// associated with the group id - -// cancelFluxJobForPod cancels the flux job for a pod. -// We assume that the cancelled job also means deleting the pod group -func (f *Fluence) cancelFluxJob(groupName string) error { - - // TODO: it's a bit risky to store state here, because if the scheduler - // restarts we cannot look up the jobid, and then cannot cancel it. - // There is no way to request cancelling the job for a specific group - jobid, ok := f.groupToJobId[groupName] - - // The job was already cancelled by another pod - if !ok { - klog.Infof("[Fluence] Request for cancel of group %s is already complete.", groupName) - return nil - } - klog.Infof("[Fluence] Cancel flux job: %v for group %s", jobid, groupName) - - // This first error is about connecting to the server - conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) - if err != nil { - klog.Errorf("[Fluence] Error connecting to server: %v", err) - return err - } - defer conn.Close() - - grpcclient := pb.NewFluxcliServiceClient(conn) - _, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() - - // This error reflects the success or failure of the cancel request - request := &pb.CancelRequest{JobID: int64(jobid)} - res, err := grpcclient.Cancel(context.Background(), request) - if err != nil { - klog.Errorf("[Fluence] did not receive any cancel response: %v", err) - return err - } - klog.Infof("[Fluence] Job cancellation for group %s result: %d", groupName, res.Error) - - // And this error is if the cancel was successful or not - if res.Error == 0 { - klog.Infof("[Fluence] Successful cancel of flux job: %d for group %s", jobid, groupName) - delete(f.groupToJobId, groupName) - } else { - klog.Warningf("[Fluence] Failed to 
cancel flux job %d for group %s", jobid, groupName) - } - return nil -} - -// updatePod is called on an update, and the old and new object are presented -func (f *Fluence) updatePod(oldObj, newObj interface{}) { - - oldPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - - // a pod is updated, get the group - // TODO should we be checking group / size for old vs new? - groupName, pg := f.pgMgr.GetPodGroup(context.TODO(), oldPod) - - // If PodGroup is nil, still try to look up a faux name - if pg == nil { - pg = fgroup.CreateFakeGroup(oldPod) - groupName = pg.Name - } - - klog.Infof("[Fluence] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, newPod.Status.Phase, oldPod.Status.Phase) - - switch newPod.Status.Phase { - case v1.PodPending: - // in this state we don't know if a pod is going to be running, thus we don't need to update job map - case v1.PodRunning: - // if a pod is start running, we can add it state to the delta graph if it is scheduled by other scheduler - case v1.PodSucceeded: - klog.Infof("[Fluence] Pod %s succeeded, Fluence needs to free the resources", newPod.Name) - - f.mutex.Lock() - defer f.mutex.Unlock() - - // Do we have the group id in our cache? 
If yes, we haven't deleted the jobid yet - // I am worried here that if some pods are succeeded and others pending, this could - // be a mistake - fluence would schedule it again - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Infof("[Fluence] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) - } - - case v1.PodFailed: - - // a corner case need to be tested, the pod exit code is not 0, can be created with segmentation fault pi test - klog.Warningf("[Fluence] Pod %s in group %s failed, Fluence needs to free the resources", newPod.Name, groupName) - - f.mutex.Lock() - defer f.mutex.Unlock() - - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Errorf("[Fluence] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) - } - case v1.PodUnknown: - // don't know how to deal with it as it's unknown phase - default: - // shouldn't enter this branch - } -} - -// deletePod handles the delete event handler -func (f *Fluence) deletePod(podObj interface{}) { - klog.Info("[Fluence] Delete Pod event handler") - pod := podObj.(*v1.Pod) - groupName, pg := f.pgMgr.GetPodGroup(context.TODO(), pod) - - // If PodGroup is nil, still try to look up a faux name - if pg == nil { - pg = fgroup.CreateFakeGroup(pod) - groupName = pg.Name - } - - klog.Infof("[Fluence] Delete pod %s in group %s has status %s", pod.Status.Phase, pod.Name, groupName) - switch pod.Status.Phase { - case v1.PodSucceeded: - case v1.PodPending: - klog.Infof("[Fluence] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name) - - f.mutex.Lock() - defer f.mutex.Unlock() - - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Infof("[Fluence] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) - } - case v1.PodRunning: - 
f.mutex.Lock() - defer f.mutex.Unlock() - - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Infof("[Fluence] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) - } - } -} diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go index 33976ae..1ad1fd3 100644 --- a/sig-scheduler-plugins/pkg/fluence/fluence.go +++ b/sig-scheduler-plugins/pkg/fluence/fluence.go @@ -1,124 +1,140 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package fluence import ( "context" "fmt" - "os" "sync" "time" - "google.golang.org/grpc" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/informers" clientscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + + fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" + label "sigs.k8s.io/scheduler-plugins/pkg/fluence/labels" + corev1helpers "k8s.io/component-helpers/scheduling/corev1" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" - "sigs.k8s.io/controller-runtime/pkg/client" - sched "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - coschedulingcore "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core" + "sigs.k8s.io/scheduler-plugins/pkg/util" + + "sigs.k8s.io/scheduler-plugins/apis/config" + "sigs.k8s.io/scheduler-plugins/apis/scheduling" + "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" fcore "sigs.k8s.io/scheduler-plugins/pkg/fluence/core" - pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" - fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" - "sigs.k8s.io/scheduler-plugins/pkg/fluence/utils" ) +// Fluence schedules pods in a group using Fluxion as a backend +// We inherit cosched.Coscheduling to use some of the primary functions type Fluence struct { mutex sync.Mutex - handle framework.Handle client client.Client // Store jobid on the level of a group (which can be a single pod) groupToJobId map[string]uint64 - pgMgr coschedulingcore.Manager -} -// Name is the name of the plugin used in the Registry and configurations. -// Note that this would do better as an annotation (fluence.flux-framework.org/pod-group) -// But we cannot use them as selectors then! 
-const ( - Name = "Fluence" -) + frameworkHandler framework.Handle + pgMgr fcore.Manager + scheduleTimeout *time.Duration + pgBackoff *time.Duration +} var ( - _ framework.QueueSortPlugin = &Fluence{} - _ framework.PreFilterPlugin = &Fluence{} - _ framework.FilterPlugin = &Fluence{} + _ framework.QueueSortPlugin = &Fluence{} + _ framework.PreFilterPlugin = &Fluence{} + _ framework.PostFilterPlugin = &Fluence{} // Here down are from coscheduling + _ framework.PermitPlugin = &Fluence{} + _ framework.ReservePlugin = &Fluence{} + _ framework.EnqueueExtensions = &Fluence{} ) -func (f *Fluence) Name() string { - return Name -} +const ( + // Name is the name of the plugin used in Registry and configurations. + Name = "Fluence" +) // Initialize and return a new Fluence Custom Scheduler Plugin -// This class and functions are analogous to: -// https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/coscheduling.go#L63 -func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - f := &Fluence{handle: handle, groupToJobId: make(map[string]uint64)} - - ctx := context.TODO() - fcore.Init() - - fluxPodsInformer := handle.SharedInformerFactory().Core().V1().Pods().Informer() - fluxPodsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - UpdateFunc: f.updatePod, - DeleteFunc: f.deletePod, - }) - - go fluxPodsInformer.Run(ctx.Done()) + // Keep these empty for now, use defaults + args := config.CoschedulingArgs{} scheme := runtime.NewScheme() - clientscheme.AddToScheme(scheme) - v1.AddToScheme(scheme) - sched.AddToScheme(scheme) - k8scli, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme}) - if err != nil { - return nil, err - } - - // Save the kubernetes client for fluence to interact with cluster objects - f.client = k8scli + _ = clientscheme.AddToScheme(scheme) + _ = v1.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) - 
fieldSelector, err := fields.ParseSelector(",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed)) + client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme}) if err != nil { - klog.Errorf("ParseSelector failed %s", err) - os.Exit(1) + return nil, err } - informerFactory := informers.NewSharedInformerFactoryWithOptions(handle.ClientSet(), 0, informers.WithTweakListOptions(func(opt *metav1.ListOptions) { - opt.FieldSelector = fieldSelector.String() - })) - podInformer := informerFactory.Core().V1().Pods() - scheduleTimeDuration := time.Duration(500) * time.Second + // Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning. + handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - // https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/core/core.go#L84 - pgMgr := coschedulingcore.NewPodGroupManager( - k8scli, + // PermitWaitingTimeSeconds is the waiting timeout in seconds. + scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second + pgMgr := fcore.NewPodGroupManager( + client, handle.SnapshotSharedLister(), &scheduleTimeDuration, - podInformer, + // Keep the podInformer (from frameworkHandle) as the single source of Pods. 
+ handle.SharedInformerFactory().Core().V1().Pods(), ) - f.pgMgr = pgMgr - // stopCh := make(chan struct{}) - // defer close(stopCh) - // informerFactory.Start(stopCh) - informerFactory.Start(ctx.Done()) + // The main difference here is adding the groupToJobId lookup + plugin := &Fluence{ + frameworkHandler: handle, + pgMgr: pgMgr, + scheduleTimeout: &scheduleTimeDuration, + groupToJobId: make(map[string]uint64), + } - if !cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { - err := fmt.Errorf("WaitForCacheSync failed") - klog.ErrorS(err, "Cannot sync caches") + // PodGroupBackoffSeconds: backoff time in seconds before a pod group can be scheduled again. + if args.PodGroupBackoffSeconds < 0 { + err := fmt.Errorf("parse arguments failed") + klog.ErrorS(err, "PodGroupBackoffSeconds cannot be negative") return nil, err + } else if args.PodGroupBackoffSeconds > 0 { + pgBackoff := time.Duration(args.PodGroupBackoffSeconds) * time.Second + plugin.pgBackoff = &pgBackoff } + return plugin, nil +} + +func (f *Fluence) Name() string { + return Name +} - klog.Info("Fluence scheduler plugin started") - return f, nil +// Fluence has added delete, although I wonder if update includes that signal +// and it's redundant? +func (f *Fluence) EventsToRegister() []framework.ClusterEventWithHint { + // To register a custom event, follow the naming convention at: + // https://git.k8s.io/kubernetes/pkg/scheduler/eventhandlers.go#L403-L410 + pgGVK := fmt.Sprintf("podgroups.v1alpha1.%v", scheduling.GroupName) + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.Delete}}, + {Event: framework.ClusterEvent{Resource: framework.GVK(pgGVK), ActionType: framework.Add | framework.Update | framework.Delete}}, + } } // Less is used to sort pods in the scheduling queue in the following order. 
@@ -147,177 +163,131 @@ func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { // If they are the same, fall back to sorting by name. if creationTime1.Equal(&creationTime2) { - return coschedulingcore.GetNamespacedName(podInfo1.Pod) < coschedulingcore.GetNamespacedName(podInfo2.Pod) + return fcore.GetNamespacedName(podInfo1.Pod) < fcore.GetNamespacedName(podInfo2.Pod) } return creationTime1.Before(&creationTime2) -} - -// PreFilter checks info about the Pod / checks conditions that the cluster or the Pod must meet. -// This comes after sort -func (f *Fluence) PreFilter( - ctx context.Context, - state *framework.CycleState, - pod *v1.Pod, -) (*framework.PreFilterResult, *framework.Status) { - klog.Infof("[Fluence] Examining pod %s", pod.Name) - - // groupName will be named according to the single pod namespace / pod if there wasn't - // a user defined group. This is a size 1 group we handle equivalently. - groupName, pg := f.pgMgr.GetPodGroup(ctx, pod) +} - // If we don't have a pod group and it's here, it was asked to be scheduled by fluence - // but the group isn't ready. Unshedulable for now. - if pg == nil { - klog.Infof("[Fluence] Group %s/%s does not have a pod group, not schedulable yet.", pod.Namespace, pod.Name) - return nil, framework.NewStatus(framework.Unschedulable, "Missing podgroup") - } - klog.Infof("[Fluence] Pod %s is in group %s with minimum members %d", pod.Name, groupName, pg.Spec.MinMember) - - // Has this podgroup been seen by fluence yet? If yes, we will have it in the cache - cache := fcore.GetFluenceCache(groupName) - klog.Infof("[Fluence] cache %s", cache) - - // Fluence has never seen this before, we need to schedule an allocation - // It also could have been seen, but was not able to get one. 
- if cache == nil { - klog.Infof("[Fluence] Does not have nodes for %s yet, asking Fluxion", groupName) - - // groupName is the namespaced name / - err := f.AskFlux(ctx, pod, pg, groupName) - if err != nil { - klog.Infof("[Fluence] Fluxion returned an error %s, not schedulable", err.Error()) - return nil, framework.NewStatus(framework.Unschedulable, err.Error()) - } +// PreFilter performs the following validations. +// 1. Whether the PodGroup that the Pod belongs to is on the deny list. +// 2. Whether the total number of pods in a PodGroup is less than its `minMember`. +func (f *Fluence) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + // If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid + // any preemption attempts. + if err := f.pgMgr.PreFilter(ctx, pod); err != nil { + klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod)) + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } - - // This is the next node in the list - nodename, err := fcore.GetNextNode(groupName) - if err != nil { - return nil, framework.NewStatus(framework.Unschedulable, err.Error()) - } - klog.Infof("Node Selected %s (pod %s:group %s)", nodename, pod.Name, groupName) - - // Create a fluxState (CycleState) with things that might be useful - // This isn't a PodGroupCache, but a single node cache, which also - // has group information, but just is for one node. 
Note that assigned - // tasks is hard coded to 1 but this isn't necessarily the case - we should - // eventually be able to GetNextNode for a number of tasks, for example - // (unless task == pod in which case it is always 1) - nodeCache := fcore.NodeCache{NodeName: nodename, GroupName: groupName, AssignedTasks: 1} - state.Write(framework.StateKey(pod.Name), &fcore.FluxStateData{NodeCache: nodeCache}) return nil, framework.NewStatus(framework.Success, "") } -// TODO we need to account for affinity here -func (f *Fluence) Filter( - ctx context.Context, - cycleState *framework.CycleState, - pod *v1.Pod, - nodeInfo *framework.NodeInfo, -) *framework.Status { +// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. +func (f *Fluence) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, + filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { + pgName, pg := f.pgMgr.GetPodGroup(ctx, pod) + if pg == nil { + klog.V(4).InfoS("Pod does not belong to any group", "pod", klog.KObj(pod)) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, "can not find pod group") + } - klog.Info("Filtering input node ", nodeInfo.Node().Name) - state, err := cycleState.Read(framework.StateKey(pod.Name)) + // This indicates there are already enough Pods satisfying the PodGroup, + // so don't bother to reject the whole PodGroup. 
+ assigned := f.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace) + if assigned >= int(pg.Spec.MinMember) { + klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) + } - // No error means we retrieved the state - if err == nil { + // If the gap is less than/equal 10%, we may want to try subsequent Pods + // to see if they can satisfy the PodGroup + notAssignedPercentage := float32(int(pg.Spec.MinMember)-assigned) / float32(pg.Spec.MinMember) + if notAssignedPercentage <= 0.1 { + klog.V(4).InfoS("A small gap of pods to reach the quorum", "podGroup", klog.KObj(pg), "percentage", notAssignedPercentage) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) + } - // Try to convert the state to FluxStateDate - value, ok := state.(*fcore.FluxStateData) + // It's based on an implicit assumption: if the nth Pod failed, + // it's inferrable that other Pods belonging to the same PodGroup would be very likely to fail. 
+ f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if waitingPod.GetPod().Namespace == pod.Namespace && label.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { + klog.V(3).InfoS("PostFilter rejects the pod", "podGroup", klog.KObj(pg), "pod", klog.KObj(waitingPod.GetPod())) + waitingPod.Reject(f.Name(), "optimistic rejection in PostFilter") + } + }) - // If we have state data that isn't equal to the current assignment, no go - if ok && value.NodeCache.NodeName != nodeInfo.Node().Name { - return framework.NewStatus(framework.Unschedulable, "pod is not permitted") - } else { - klog.Infof("Filter: node %s selected for %s\n", value.NodeCache.NodeName, pod.Name) + if f.pgBackoff != nil { + pods, err := f.frameworkHandler.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: label.GetPodGroupLabel(pod)}), + ) + if err == nil && len(pods) >= int(pg.Spec.MinMember) { + f.pgMgr.BackoffPodGroup(pgName, *f.pgBackoff) } } - return framework.NewStatus(framework.Success) + + f.pgMgr.DeletePermittedPodGroup(pgName) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, + fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name)) } -// PreFilterExtensions allow for callbacks on filtered states -// https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/interface.go#L383 +// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one. func (f *Fluence) PreFilterExtensions() framework.PreFilterExtensions { return nil } -// AskFlux will ask flux for an allocation for nodes for the pod group. 
-func (f *Fluence) AskFlux( - ctx context.Context, - pod *v1.Pod, - pg *sched.PodGroup, - groupName string, -) error { - - // clean up previous match if a pod has already allocated previously - f.mutex.Lock() - _, isAllocated := f.groupToJobId[groupName] - f.mutex.Unlock() - - // This case happens when there is some reason that an initial job pods partially allocated, - // but then the job restarted, and new pods are present but fluence had assigned nodes to - // the old ones (and there aren't enough). The job would have had to complete in some way, - // and the PodGroup would have to then recreate, and have the same job id (the group name). - // This happened when I cancalled a bunch of jobs and they didn't have the chance to - // cancel in fluence. What we can do here is assume the previous pods are no longer running - // and cancel the flux job to create again. - if isAllocated { - klog.Info("Warning - group %s was previously allocated and is requesting again, so must have completed.", groupName) - f.mutex.Lock() - f.cancelFluxJob(groupName) - f.mutex.Unlock() - } - - // IMPORTANT: this is a JobSpec for *one* pod, assuming they are all the same. - // This obviously may not be true if we have a hetereogenous PodGroup. - // We name it based on the group, since it will represent the group - jobspec := utils.PreparePodJobSpec(pod, groupName) - klog.Infof("[Fluence] Inspect pod info, jobspec: %s\n", jobspec) - conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) - - // TODO change this to just return fmt.Errorf - if err != nil { - klog.Errorf("[Fluence] Error connecting to server: %v\n", err) - return err +// Permit is the functions invoked by the framework at "Permit" extension point. 
+func (f *Fluence) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { + waitTime := *f.scheduleTimeout + s := f.pgMgr.Permit(ctx, pod) + var retStatus *framework.Status + switch s { + case fcore.PodGroupNotSpecified: + return framework.NewStatus(framework.Success, ""), 0 + case fcore.PodGroupNotFound: + return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0 + case fcore.Wait: + klog.InfoS("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "nodeName", nodeName) + _, pg := f.pgMgr.GetPodGroup(ctx, pod) + + // Note this is in seconds, defaults to 60 seconds + if wait := util.GetWaitTimeDuration(pg, f.scheduleTimeout); wait != 0 { + waitTime = wait + } + retStatus = framework.NewStatus(framework.Wait) + // We will also request to move the sibling pods back to activeQ. + f.pgMgr.ActivateSiblings(pod, state) + case fcore.Success: + pgFullName := label.GetPodGroupFullName(pod) + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if label.GetPodGroupFullName(waitingPod.GetPod()) == pgFullName { + klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod())) + waitingPod.Allow(f.Name()) + } + }) + klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod)) + retStatus = framework.NewStatus(framework.Success) + waitTime = 0 } - defer conn.Close() - grpcclient := pb.NewFluxcliServiceClient(conn) - _, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() + return retStatus, waitTime +} - request := &pb.MatchRequest{ - Ps: jobspec, - Request: "allocate", - Count: pg.Spec.MinMember, - } +// Reserve is the functions invoked by the framework at "reserve" extension point. 
+func (f *Fluence) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + return nil +} - // An error here is an error with making the request - r, err := grpcclient.Match(context.Background(), request) - if err != nil { - klog.Errorf("[Fluence] did not receive any match response: %v\n", err) - return err +// Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. +func (f *Fluence) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { + pgName, pg := f.pgMgr.GetPodGroup(ctx, pod) + if pg == nil { + return } - - // TODO GetPodID should be renamed, because it will reflect the group - klog.Infof("[Fluence] Match response ID %s\n", r.GetPodID()) - - // Get the nodelist and inspect - nodes := r.GetNodelist() - klog.Infof("[Fluence] Nodelist returned from Fluxion: %s\n", nodes) - - // Assign the nodelist - this sets the group name in the groupSeen cache - // at this point, we can retrieve the cache and get nodes - nodelist := fcore.CreateNodeList(nodes, groupName) - - jobid := uint64(r.GetJobID()) - klog.Infof("[Fluence] parsed node pods list %s for job id %d\n", nodelist, jobid) - - // TODO would be nice to actually be able to ask flux jobs -a to fluence - // That way we can verify assignments, etc. 
- f.mutex.Lock() - f.groupToJobId[groupName] = jobid - f.mutex.Unlock() - return nil + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if waitingPod.GetPod().Namespace == pod.Namespace && label.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { + klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg)) + waitingPod.Reject(f.Name(), "rejection in Unreserve") + } + }) + f.pgMgr.DeletePermittedPodGroup(pgName) } diff --git a/sig-scheduler-plugins/pkg/fluence/labels/labels.go b/sig-scheduler-plugins/pkg/fluence/labels/labels.go index e377d97..f955d67 100644 --- a/sig-scheduler-plugins/pkg/fluence/labels/labels.go +++ b/sig-scheduler-plugins/pkg/fluence/labels/labels.go @@ -1,14 +1,33 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package labels import ( + "fmt" "time" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // Labels to be shared between different components const ( + // We use the same label to be consistent // https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/apis/scheduling/v1alpha1/types.go#L109 PodGroupLabel = "scheduling.x-k8s.io/pod-group" @@ -16,10 +35,29 @@ const ( //PodGroupNameLabel = "fluence.pod-group" PodGroupSizeLabel = "fluence.group-size" - // Internal use + // Internal use (not used yet) PodGroupTimeCreated = "flunce.created-at" ) +// GetPodGroupLabel gets the pod group name from pod labels +func GetPodGroupLabel(pod *v1.Pod) string { + return pod.Labels[PodGroupLabel] +} + +// GetPodGroupFullName gets the namespaced group name from pod labels +func GetPodGroupFullName(pod *v1.Pod) string { + pgName := GetPodGroupLabel(pod) + if len(pgName) == 0 { + return "" + } + return fmt.Sprintf("%v/%v", pod.Namespace, pgName) +} + +// GetPodGroupSize gets the pod group size from the label +func GetPodGroupSize(pod *v1.Pod) string { + return pod.Labels[PodGroupSizeLabel] +} + // getTimeCreated returns the timestamp when we saw the object func GetTimeCreated() string {