diff --git a/sig-scheduler-plugins/manifests/fluence/configmap.yaml b/sig-scheduler-plugins/manifests/fluence/configmap.yaml
new file mode 100644
index 0000000..21ffacc
--- /dev/null
+++ b/sig-scheduler-plugins/manifests/fluence/configmap.yaml
@@ -0,0 +1,23 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: scheduler-config
+  namespace: scheduler-plugins
+data:
+  scheduler-config.yaml: |
+    apiVersion: kubescheduler.config.k8s.io/v1beta3
+    kind: KubeSchedulerConfiguration
+    leaderElection:
+      leaderElect: false
+    profiles:
+    - schedulerName: fluence
+      plugins:
+        preFilter:
+          enabled:
+          - name: Fluence
+        filter:
+          enabled:
+          - name: Fluence
+        score:
+          disabled:
+          - name: '*'
\ No newline at end of file
diff --git a/sig-scheduler-plugins/manifests/fluence/deploy.yaml b/sig-scheduler-plugins/manifests/fluence/deploy.yaml
new file mode 100644
index 0000000..92e39b0
--- /dev/null
+++ b/sig-scheduler-plugins/manifests/fluence/deploy.yaml
@@ -0,0 +1,45 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: fluence
+  namespace: scheduler-plugins
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      component: scheduler
+  template:
+    metadata:
+      labels:
+        component: scheduler
+    spec:
+      serviceAccountName: scheduler-plugins
+      containers:
+      - image: quay.io/cmisale1/fluence-sidecar:latest
+        imagePullPolicy: Always
+        command:
+        - /go/src/fluence/bin/server
+        - --policy=lonode
+        name: fluence-sidecar
+      - image: quay.io/cmisale1/fluence:dev
+        imagePullPolicy: Always
+        command:
+        - /bin/kube-scheduler
+        - --config=/etc/kubernetes/scheduler-config.yaml
+        - -v=9
+        name: fluence
+        resources:
+          requests:
+            cpu: '0.1'
+        securityContext:
+          privileged: false
+        volumeMounts:
+        - mountPath: /etc/kubernetes
+          name: scheduler-config
+      hostNetwork: false
+      hostPID: false
+      volumes:
+      - name: scheduler-config
+        configMap:
+          name: scheduler-config
+
diff --git a/sig-scheduler-plugins/manifests/fluence/rbac.yaml b/sig-scheduler-plugins/manifests/fluence/rbac.yaml
new file mode 100644
index 0000000..3416e18
--- /dev/null
+++ b/sig-scheduler-plugins/manifests/fluence/rbac.yaml
@@ -0,0 +1,82 @@
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: scheduler-plugins
+rules:
+- apiGroups: [""]
+  resources: ["namespaces", "configmaps"]
+  verbs: ["get", "list", "watch"]
+- apiGroups: ["", "events.k8s.io"]
+  resources: ["events"]
+  verbs: ["create", "patch", "update"]
+- apiGroups: ["coordination.k8s.io"]
+  resources: ["leases"]
+  verbs: ["create"]
+- apiGroups: ["coordination.k8s.io"]
+  resourceNames: ["kube-scheduler"]
+  resources: ["leases"]
+  verbs: ["get", "update"]
+- apiGroups: [""]
+  resources: ["endpoints"]
+  verbs: ["create"]
+- apiGroups: [""]
+  resourceNames: ["kube-scheduler"]
+  resources: ["endpoints"]
+  verbs: ["get", "update"]
+- apiGroups: [""]
+  resources: ["nodes"]
+  verbs: ["get", "list", "watch", "patch"]
+- apiGroups: [""]
+  resources: ["pods"]
+  verbs: ["delete", "get", "list", "watch", "update"]
+- apiGroups: [""]
+  resources: ["bindings", "pods/binding"]
+  verbs: ["create"]
+- apiGroups: [""]
+  resources: ["pods/status"]
+  verbs: ["patch", "update"]
+- apiGroups: [""]
+  resources: ["replicationcontrollers", "services"]
+  verbs: ["get", "list", "watch"]
+- apiGroups: ["apps", "extensions"]
+  resources: ["replicasets"]
+  verbs: ["get", "list", "watch"]
+- apiGroups: ["apps"]
+  resources: ["statefulsets"]
+  verbs: ["get", "list", "watch"]
+- apiGroups: ["policy"]
+  resources: ["poddisruptionbudgets"]
+  verbs: ["get", "list", "watch"]
+- apiGroups: [""]
["persistentvolumeclaims", "persistentvolumes"] + verbs: ["get", "list", "watch", "patch", "update"] +- apiGroups: ["authentication.k8s.io"] + resources: ["tokenreviews"] + verbs: ["create"] +- apiGroups: ["authorization.k8s.io"] + resources: ["subjectaccessreviews"] + verbs: ["create"] +- apiGroups: ["storage.k8s.io"] + resources: ["csinodes", "storageclasses" , "csidrivers" , "csistoragecapacities"] + verbs: ["get", "list", "watch"] +- apiGroups: ["topology.node.k8s.io"] + resources: ["noderesourcetopologies"] + verbs: ["*"] +# resources need to be updated with the scheduler plugins used +- apiGroups: ["scheduling.sigs.k8s.io"] + resources: ["podgroups", "elasticquotas"] + verbs: ["get", "list", "watch", "create", "delete", "update", "patch"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: scheduler-plugins + namespace: scheduler-plugins +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: scheduler-plugins +subjects: + - kind: ServiceAccount + name: scheduler-plugins + namespace: scheduler-plugins diff --git a/sig-scheduler-plugins/manifests/fluence/scheduling.sigs.k8s.io_podgroups.yaml b/sig-scheduler-plugins/manifests/fluence/scheduling.sigs.k8s.io_podgroups.yaml new file mode 120000 index 0000000..7f8408e --- /dev/null +++ b/sig-scheduler-plugins/manifests/fluence/scheduling.sigs.k8s.io_podgroups.yaml @@ -0,0 +1 @@ +../coscheduling/crd.yaml \ No newline at end of file diff --git a/sig-scheduler-plugins/manifests/fluence/serviceaccount.yaml b/sig-scheduler-plugins/manifests/fluence/serviceaccount.yaml new file mode 100644 index 0000000..fface49 --- /dev/null +++ b/sig-scheduler-plugins/manifests/fluence/serviceaccount.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: scheduler-plugins +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: scheduler-plugins + namespace: scheduler-plugins \ No newline at end of file diff --git a/sig-scheduler-plugins/pkg/fluence/README.md b/sig-scheduler-plugins/pkg/fluence/README.md new file mode 100644 index 0000000..61f4923 --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/README.md @@ -0,0 +1,29 @@ +# Overview + +Project to manage Flux tasks needed to standardize kubernetes HPC scheduling interfaces + +## Installing the chart + +More detail will be added here about installing the chart. You will +be using the [install-as-a-second-scheduler](https://github.com/kubernetes-sigs/scheduler-plugins/tree/master/manifests/install/charts/as-a-second-scheduler) +charts. Fluence-specific values are detailed below. + +### Fluence specific values + +In `values.yaml` it is possible to customize the container image, already defaulted to the latest release, and the allocation policy +used by the scheduler. +Most common options are: + +- `lonode`: choose the nodes with lower ID first. Can be compared to packing +- `low`: choose cores with lowest IDs from multiple nodes. Can be compared to spread process-to-resource placement + +## Maturity Level + + + +- [x] Sample (for demonstrating and inspiring purpose) +- [ ] Alpha (used in companies for pilot projects) +- [ ] Beta (used in companies and developed actively) +- [ ] Stable (used in companies for production workloads) + + diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go new file mode 100644 index 0000000..11c90ef --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -0,0 +1,99 @@ +/* +Copyright 2022 The Kubernetes Authors. 
diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go
new file mode 100644
index 0000000..11c90ef
--- /dev/null
+++ b/sig-scheduler-plugins/pkg/fluence/core/core.go
@@ -0,0 +1,99 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package core
+
+import (
+	"fmt"
+
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc"
+)
+
+// FluxStateData carries the node selected by Fluence through the CycleState.
+type FluxStateData struct {
+	NodeName string
+}
+
+func (s *FluxStateData) Clone() framework.StateData {
+	return &FluxStateData{NodeName: s.NodeName}
+}
+
+// NodePodsCount tracks how many pods of a group remain to be placed on a node.
+type NodePodsCount struct {
+	NodeName string
+	Count    int
+}
+
+// podgroupMap maps a pod group name to its pending node allocations.
+var podgroupMap map[string][]NodePodsCount
+
+// Init resets the pod group allocation map.
+func Init() {
+	podgroupMap = make(map[string][]NodePodsCount)
+}
+
+func (n *NodePodsCount) Clone() framework.StateData {
+	return &NodePodsCount{
+		NodeName: n.NodeName,
+		Count:    n.Count,
+	}
+}
+
+// CreateNodePodsList converts the node allocations returned by Flux into
+// NodePodsCount entries and records them for the given pod group.
+func CreateNodePodsList(nodelist []*pb.NodeAlloc, pgname string) (nodepods []NodePodsCount) {
+	nodepods = make([]NodePodsCount, len(nodelist))
+	for i, v := range nodelist {
+		nodepods[i] = NodePodsCount{
+			NodeName: v.GetNodeID(),
+			Count:    int(v.GetTasks()),
+		}
+	}
+	podgroupMap[pgname] = nodepods
+	klog.Info("Pod group to node allocation map: ", podgroupMap)
+
+	return
+}
+
+// HaveList reports whether an allocation list exists for the pod group.
+func HaveList(pgname string) bool {
+	_, exists := podgroupMap[pgname]
+	return exists
+}
+
+// GetNextNode pops the next node assignment for the pod group, removing the
+// group from the map once all of its pods have been placed.
+func GetNextNode(pgname string) (string, error) {
+	entry, ok := podgroupMap[pgname]
+	if !ok {
+		return "", fmt.Errorf("no allocation found for pod group %s", pgname)
+	}
+	if len(entry) == 0 {
+		return "", fmt.Errorf("allocation for pod group %s is empty", pgname)
+	}
+
+	nodename := entry[0].NodeName
+
+	if entry[0].Count == 1 {
+		slice := entry[1:]
+		if len(slice) == 0 {
+			delete(podgroupMap, pgname)
+			return nodename, nil
+		}
+		podgroupMap[pgname] = slice
+		return nodename, nil
+	}
+	entry[0].Count = entry[0].Count - 1
+	return nodename, nil
+}
diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go
new file mode 100644
index 0000000..fec0a35
--- /dev/null
+++ b/sig-scheduler-plugins/pkg/fluence/fluence.go
@@ -0,0 +1,396 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package fluence
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"sync"
+	"time"
+
+	"google.golang.org/grpc"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/informers"
+	clientscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/cache"
+	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/metrics"
+
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
+	coschedulingcore "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core"
+	fcore "sigs.k8s.io/scheduler-plugins/pkg/fluence/core"
+	pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc"
+	"sigs.k8s.io/scheduler-plugins/pkg/fluence/utils"
+)
+
+type Fluence struct {
+	mutex          sync.Mutex
+	handle         framework.Handle
+	podNameToJobId map[string]uint64
+	pgMgr          coschedulingcore.Manager
+}
+
+var _ framework.QueueSortPlugin = &Fluence{}
+var _ framework.PreFilterPlugin = &Fluence{}
+var _ framework.FilterPlugin = &Fluence{}
+
+// Name is the name of the plugin used in the Registry and configurations.
+const Name = "Fluence"
+
+func (f *Fluence) Name() string {
+	return Name
+}
+
+// New initializes and returns a new Fluence plugin.
+// Note from vsoch: seems analogous to:
+// https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/coscheduling.go#L63
+func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
+
+	f := &Fluence{handle: handle, podNameToJobId: make(map[string]uint64)}
+	klog.Info("Create plugin")
+	ctx := context.TODO()
+	fcore.Init()
+
+	fluxPodsInformer := handle.SharedInformerFactory().Core().V1().Pods().Informer()
+	fluxPodsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: f.updatePod,
+		DeleteFunc: f.deletePod,
+	})
+
+	go fluxPodsInformer.Run(ctx.Done())
+	klog.Info("Create generic pod informer")
+
+	scheme := runtime.NewScheme()
+	_ = clientscheme.AddToScheme(scheme)
+	_ = v1.AddToScheme(scheme)
+	_ = v1alpha1.AddToScheme(scheme)
+	client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
+	if err != nil {
+		return nil, err
+	}
+
+	fieldSelector, err := fields.ParseSelector(",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
+	if err != nil {
+		klog.ErrorS(err, "ParseSelector failed")
+		os.Exit(1)
+	}
+	informerFactory := informers.NewSharedInformerFactoryWithOptions(handle.ClientSet(), 0, informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
+		opt.FieldSelector = fieldSelector.String()
+	}))
+	podInformer := informerFactory.Core().V1().Pods()
+
+	scheduleTimeDuration := time.Duration(500) * time.Second
+
+	pgMgr := coschedulingcore.NewPodGroupManager(
+		client,
+		handle.SnapshotSharedLister(),
+		&scheduleTimeDuration,
+		podInformer,
+	)
+	f.pgMgr = pgMgr
+
+	// stopCh := make(chan struct{})
+	// defer close(stopCh)
+	// informerFactory.Start(stopCh)
+	informerFactory.Start(ctx.Done())
+
+	if !cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
+		err := fmt.Errorf("WaitForCacheSync failed")
+		klog.ErrorS(err, "Cannot sync caches")
+		return nil, err
+	}
+
+	klog.Info("Fluence starts")
+	return f, nil
+}
+
+// Less is used to sort pods in the scheduling queue in the following order.
+// 1. Compare the priorities of Pods.
+// 2. Compare the initialization timestamps of PodGroups or Pods.
+// 3. Compare the keys of PodGroups/Pods: <namespace>/<podname>.
+func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
+	klog.Info("Ordering pods, reusing the Coscheduling logic")
+	prio1 := corev1helpers.PodPriority(podInfo1.Pod)
+	prio2 := corev1helpers.PodPriority(podInfo2.Pod)
+	if prio1 != prio2 {
+		return prio1 > prio2
+	}
+	creationTime1 := f.pgMgr.GetCreationTimestamp(podInfo1.Pod, *podInfo1.InitialAttemptTimestamp)
+	creationTime2 := f.pgMgr.GetCreationTimestamp(podInfo2.Pod, *podInfo2.InitialAttemptTimestamp)
+	if creationTime1.Equal(creationTime2) {
+		return coschedulingcore.GetNamespacedName(podInfo1.Pod) < coschedulingcore.GetNamespacedName(podInfo2.Pod)
+	}
+	return creationTime1.Before(creationTime2)
+}
+
+// PreFilter asks Flux for a match (once per pod group) and stores the selected
+// node for this pod in the cycle state.
+func (f *Fluence) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
+	klog.Infof("Examining pod %s", pod.Name)
+	var err error
+	var nodename string
+	if pgname, ok := f.isGroup(ctx, pod); ok {
+		if !fcore.HaveList(pgname) {
+			klog.Infof("Getting a pod group")
+			groupSize, _ := f.groupPreFilter(ctx, pod)
+			if _, err = f.AskFlux(ctx, pod, groupSize); err != nil {
+				return nil, framework.NewStatus(framework.Unschedulable, err.Error())
+			}
+		}
+		nodename, err = fcore.GetNextNode(pgname)
+		klog.Infof("Node Selected %s (%s:%s)", nodename, pod.Name, pgname)
+		if err != nil {
+			return nil, framework.NewStatus(framework.Unschedulable, err.Error())
+		}
+	} else {
+		nodename, err = f.AskFlux(ctx, pod, 1)
+		if err != nil {
+			return nil, framework.NewStatus(framework.Unschedulable, err.Error())
+		}
+	}
+
+	klog.Info("Node Selected: ", nodename)
+	state.Write(framework.StateKey(pod.Name), &fcore.FluxStateData{NodeName: nodename})
+	return nil, framework.NewStatus(framework.Success, "")
+}
+
+// isGroup returns the pod group name for the pod and whether it belongs to one.
+func (f *Fluence) isGroup(ctx context.Context, pod *v1.Pod) (string, bool) {
+	pgFullName, pg := f.pgMgr.GetPodGroup(ctx, pod)
+	if pg == nil {
+		klog.InfoS("Not in group", "pod", klog.KObj(pod))
+		return "", false
+	}
+	return pgFullName, true
+}
+
+// groupPreFilter returns the minimum number of pods that must be co-scheduled
+// with this pod, or zero if the pod is not part of a group.
+func (f *Fluence) groupPreFilter(ctx context.Context, pod *v1.Pod) (int, error) {
+	// klog.InfoS("Flux Pre-Filter", "pod", klog.KObj(pod))
+	klog.InfoS("Flux Pre-Filter", "pod labels", pod.Labels)
+	_, pg := f.pgMgr.GetPodGroup(ctx, pod)
+	if pg == nil {
+		klog.InfoS("Not in group", "pod", klog.KObj(pod))
+		return 0, nil
+	}
+
+	klog.Info("pod group members ", pg.Spec.MinMember)
+	return int(pg.Spec.MinMember), nil
+}
+
+// Filter only admits the node that Flux selected for the pod in PreFilter.
+func (f *Fluence) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
+	klog.Info("Filtering input node ", nodeInfo.Node().Name)
+	if v, e := cycleState.Read(framework.StateKey(pod.Name)); e == nil {
+		if value, ok := v.(*fcore.FluxStateData); ok {
+			if value.NodeName != nodeInfo.Node().Name {
+				return framework.NewStatus(framework.Unschedulable, "pod is not permitted")
+			}
+			klog.Info("Filter: node selected by Flux ", value.NodeName)
+		}
+	}
+
+	return framework.NewStatus(framework.Success)
+}
+
+func (f *Fluence) PreFilterExtensions() framework.PreFilterExtensions {
+	return nil
+}
+
+// AskFlux asks the Flux scheduler sidecar for an allocation of count nodes for
+// the pod (or its pod group). For groups, the allocation is recorded in the
+// core package and an empty node name is returned; callers then use
+// fcore.GetNextNode to pop per-pod assignments.
+func (f *Fluence) AskFlux(ctx context.Context, pod *v1.Pod, count int) (string, error) {
+	// clean up previous match if a pod has already allocated previously
+	f.mutex.Lock()
+	_, isPodAllocated := f.podNameToJobId[pod.Name]
+	f.mutex.Unlock()
+
+	if isPodAllocated {
+		klog.Info("Clean up previous allocation")
+		f.mutex.Lock()
+		f.cancelFluxJobForPod(pod.Name)
+		f.mutex.Unlock()
+	}
+
+	jobspec := utils.InspectPodInfo(pod)
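+	// The fluence-sidecar container in deploy.yaml serves the FluxcliService on
+	// localhost inside the same pod, so a plaintext local connection suffices.
+	// The exchange below sends the translated PodSpec in a MatchRequest and
+	// reads the node allocation plus the Flux job id back from the MatchResponse.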
+	conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure())
+	if err != nil {
+		klog.Errorf("[FluxClient] Error connecting to server: %v", err)
+		return "", err
+	}
+	defer conn.Close()
+
+	grpcclient := pb.NewFluxcliServiceClient(conn)
+	timeoutCtx, cancel := context.WithTimeout(ctx, 200*time.Second)
+	defer cancel()
+
+	request := &pb.MatchRequest{
+		Ps:      jobspec,
+		Request: "allocate",
+		Count:   int32(count)}
+
+	r, err2 := grpcclient.Match(timeoutCtx, request)
+	if err2 != nil {
+		klog.Errorf("[FluxClient] did not receive any match response: %v", err2)
+		return "", err2
+	}
+
+	klog.Infof("[FluxClient] response podID %s", r.GetPodID())
+
+	_, ok := f.isGroup(ctx, pod)
+	if count > 1 || ok {
+		pgFullName, _ := f.pgMgr.GetPodGroup(ctx, pod)
+		nodelist := fcore.CreateNodePodsList(r.GetNodelist(), pgFullName)
+		klog.Infof("[FluxClient] response nodeID %v", r.GetNodelist())
+		klog.Info("[FluxClient] Parsed Nodelist ", nodelist)
+		jobid := uint64(r.GetJobID())
+
+		f.mutex.Lock()
+		f.podNameToJobId[pod.Name] = jobid
+		klog.Info("Check job set: ", f.podNameToJobId)
+		f.mutex.Unlock()
+	} else {
+		nodename := r.GetNodelist()[0].GetNodeID()
+		jobid := uint64(r.GetJobID())
+
+		f.mutex.Lock()
+		f.podNameToJobId[pod.Name] = jobid
+		klog.Info("Check job set: ", f.podNameToJobId)
+		f.mutex.Unlock()
+
+		return nodename, nil
+	}
+
+	return "", nil
+}
+
+// cancelFluxJobForPod cancels the Flux job associated with podName and removes
+// it from the podname-jobid map. Callers must hold f.mutex.
+func (f *Fluence) cancelFluxJobForPod(podName string) error {
+	jobid := f.podNameToJobId[podName]
+
+	klog.Infof("Cancel flux job: %v for pod %s", jobid, podName)
+
+	start := time.Now()
+
+	conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure())
+	if err != nil {
+		klog.Errorf("[FluxClient] Error connecting to server: %v", err)
+		return err
+	}
+	defer conn.Close()
+
+	grpcclient := pb.NewFluxcliServiceClient(conn)
+	timeoutCtx, cancel := context.WithTimeout(context.Background(), 200*time.Second)
+	defer cancel()
+
+	request := &pb.CancelRequest{
+		JobID: int64(jobid),
+	}
+
+	res, err := grpcclient.Cancel(timeoutCtx, request)
+	if err != nil {
+		klog.Errorf("[FluxClient] did not receive any cancel response: %v", err)
+		return err
+	}
+
+	if res.Error == 0 {
+		delete(f.podNameToJobId, podName)
+	} else {
+		klog.Warningf("Failed to cancel flux job %v for pod %s", jobid, podName)
+	}
+
+	elapsed := metrics.SinceInSeconds(start)
+	klog.Info("Time elapsed (Cancel Job) :", elapsed)
+
+	klog.Infof("Job cancellation for pod %s result: %d", podName, res.Error)
+	if klog.V(2).Enabled() {
+		klog.Info("Check job set: after delete")
+		klog.Info(f.podNameToJobId)
+	}
+	return nil
+}
+
+// EventHandlers
+func (f *Fluence) updatePod(oldObj, newObj interface{}) {
+	// klog.Info("Update Pod event handler")
+	newPod := newObj.(*v1.Pod)
+	klog.Infof("Processing event for pod %s", newPod.Name)
+	switch newPod.Status.Phase {
+	case v1.PodPending:
+		// in this state we don't know whether the pod will run, so there is
+		// nothing to update in the job map yet
+	case v1.PodRunning:
+		// if a pod starts running, we could add its state to the delta graph
+		// in case it was scheduled by another scheduler
+	case v1.PodSucceeded:
+		klog.Infof("Pod %s succeeded, Fluence needs to free the resources", newPod.Name)
+
+		f.mutex.Lock()
+		defer f.mutex.Unlock()
+
+		if _, ok := f.podNameToJobId[newPod.Name]; ok {
+			f.cancelFluxJobForPod(newPod.Name)
+		} else {
+			klog.Infof("Succeeded pod %s/%s doesn't have a flux jobid", newPod.Namespace, newPod.Name)
+		}
+	case v1.PodFailed:
+		// corner case that still needs testing: the pod exit code is not 0,
+		// which can be reproduced with a segfaulting pi test
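+		// Failed pods release their Flux allocation just like succeeded pods,
+		// keeping the resource graph in the sidecar in sync with the cluster.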
klog.Warningf("Pod %s failed, Fluence needs to free the resources", newPod.Name) + + f.mutex.Lock() + defer f.mutex.Unlock() + + if _, ok := f.podNameToJobId[newPod.Name]; ok { + f.cancelFluxJobForPod(newPod.Name) + } else { + klog.Errorf("Failed pod %s/%s doesn't have flux jobid", newPod.Namespace, newPod.Name) + } + case v1.PodUnknown: + // don't know how to deal with it as it's unknown phase + default: + // shouldn't enter this branch + } +} + +func (f *Fluence) deletePod(podObj interface{}) { + klog.Info("Delete Pod event handler") + + pod := podObj.(*v1.Pod) + klog.Info("Pod status: ", pod.Status.Phase) + switch pod.Status.Phase { + case v1.PodSucceeded: + case v1.PodPending: + klog.Infof("Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name) + + f.mutex.Lock() + defer f.mutex.Unlock() + + if _, ok := f.podNameToJobId[pod.Name]; ok { + f.cancelFluxJobForPod(pod.Name) + } else { + klog.Infof("Terminating pod %s/%s doesn't have flux jobid", pod.Namespace, pod.Name) + } + case v1.PodRunning: + f.mutex.Lock() + defer f.mutex.Unlock() + + if _, ok := f.podNameToJobId[pod.Name]; ok { + f.cancelFluxJobForPod(pod.Name) + } else { + klog.Infof("Deleted pod %s/%s doesn't have flux jobid", pod.Namespace, pod.Name) + } + } +} diff --git a/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli.pb.go b/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli.pb.go new file mode 100644 index 0000000..e317af2 --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli.pb.go @@ -0,0 +1,838 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.15.8 +// source: fluence/fluxcli-grpc/fluxcli.proto + +package fluxcli + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PodSpec struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Container string `protobuf:"bytes,2,opt,name=container,proto3" json:"container,omitempty"` + Cpu int32 `protobuf:"varint,3,opt,name=cpu,proto3" json:"cpu,omitempty"` + Memory int64 `protobuf:"varint,4,opt,name=memory,proto3" json:"memory,omitempty"` + Gpu int64 `protobuf:"varint,5,opt,name=gpu,proto3" json:"gpu,omitempty"` + Storage int64 `protobuf:"varint,6,opt,name=storage,proto3" json:"storage,omitempty"` + Labels []string `protobuf:"bytes,7,rep,name=labels,proto3" json:"labels,omitempty"` +} + +func (x *PodSpec) Reset() { + *x = PodSpec{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PodSpec) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PodSpec) ProtoMessage() {} + +func (x *PodSpec) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PodSpec.ProtoReflect.Descriptor instead. +func (*PodSpec) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{0} +} + +func (x *PodSpec) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *PodSpec) GetContainer() string { + if x != nil { + return x.Container + } + return "" +} + +func (x *PodSpec) GetCpu() int32 { + if x != nil { + return x.Cpu + } + return 0 +} + +func (x *PodSpec) GetMemory() int64 { + if x != nil { + return x.Memory + } + return 0 +} + +func (x *PodSpec) GetGpu() int64 { + if x != nil { + return x.Gpu + } + return 0 +} + +func (x *PodSpec) GetStorage() int64 { + if x != nil { + return x.Storage + } + return 0 +} + +func (x *PodSpec) GetLabels() []string { + if x != nil { + return x.Labels + } + return nil +} + +// The Match request message (allocate, allocate_orelse_reserve) +type MatchRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ps *PodSpec `protobuf:"bytes,1,opt,name=ps,proto3" json:"ps,omitempty"` + Request string `protobuf:"bytes,2,opt,name=request,proto3" json:"request,omitempty"` + Count int32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` +} + +func (x *MatchRequest) Reset() { + *x = MatchRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MatchRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MatchRequest) ProtoMessage() {} + +func (x *MatchRequest) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MatchRequest.ProtoReflect.Descriptor instead. 
+func (*MatchRequest) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{1} +} + +func (x *MatchRequest) GetPs() *PodSpec { + if x != nil { + return x.Ps + } + return nil +} + +func (x *MatchRequest) GetRequest() string { + if x != nil { + return x.Request + } + return "" +} + +func (x *MatchRequest) GetCount() int32 { + if x != nil { + return x.Count + } + return 0 +} + +// The Nodes/Cluster Update Status +type NodeAlloc struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + NodeID string `protobuf:"bytes,1,opt,name=nodeID,proto3" json:"nodeID,omitempty"` + Tasks int32 `protobuf:"varint,2,opt,name=tasks,proto3" json:"tasks,omitempty"` +} + +func (x *NodeAlloc) Reset() { + *x = NodeAlloc{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NodeAlloc) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NodeAlloc) ProtoMessage() {} + +func (x *NodeAlloc) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NodeAlloc.ProtoReflect.Descriptor instead. +func (*NodeAlloc) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{2} +} + +func (x *NodeAlloc) GetNodeID() string { + if x != nil { + return x.NodeID + } + return "" +} + +func (x *NodeAlloc) GetTasks() int32 { + if x != nil { + return x.Tasks + } + return 0 +} + +// The Match response message +type MatchResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PodID string `protobuf:"bytes,1,opt,name=podID,proto3" json:"podID,omitempty"` + Nodelist []*NodeAlloc `protobuf:"bytes,2,rep,name=nodelist,proto3" json:"nodelist,omitempty"` + JobID int64 `protobuf:"varint,3,opt,name=jobID,proto3" json:"jobID,omitempty"` +} + +func (x *MatchResponse) Reset() { + *x = MatchResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MatchResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MatchResponse) ProtoMessage() {} + +func (x *MatchResponse) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MatchResponse.ProtoReflect.Descriptor instead. 
+func (*MatchResponse) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{3} +} + +func (x *MatchResponse) GetPodID() string { + if x != nil { + return x.PodID + } + return "" +} + +func (x *MatchResponse) GetNodelist() []*NodeAlloc { + if x != nil { + return x.Nodelist + } + return nil +} + +func (x *MatchResponse) GetJobID() int64 { + if x != nil { + return x.JobID + } + return 0 +} + +type CancelRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + JobID int64 `protobuf:"varint,2,opt,name=jobID,proto3" json:"jobID,omitempty"` +} + +func (x *CancelRequest) Reset() { + *x = CancelRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CancelRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CancelRequest) ProtoMessage() {} + +func (x *CancelRequest) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CancelRequest.ProtoReflect.Descriptor instead. +func (*CancelRequest) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{4} +} + +func (x *CancelRequest) GetJobID() int64 { + if x != nil { + return x.JobID + } + return 0 +} + +// The Match response message +type CancelResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + JobID int64 `protobuf:"varint,1,opt,name=jobID,proto3" json:"jobID,omitempty"` + Error int32 `protobuf:"varint,2,opt,name=error,proto3" json:"error,omitempty"` +} + +func (x *CancelResponse) Reset() { + *x = CancelResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CancelResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CancelResponse) ProtoMessage() {} + +func (x *CancelResponse) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CancelResponse.ProtoReflect.Descriptor instead. 
+func (*CancelResponse) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{5} +} + +func (x *CancelResponse) GetJobID() int64 { + if x != nil { + return x.JobID + } + return 0 +} + +func (x *CancelResponse) GetError() int32 { + if x != nil { + return x.Error + } + return 0 +} + +// The Nodes/Cluster Update Status +type NodeStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CpuAvail int32 `protobuf:"varint,1,opt,name=cpuAvail,proto3" json:"cpuAvail,omitempty"` + GpuAvail int32 `protobuf:"varint,2,opt,name=gpuAvail,proto3" json:"gpuAvail,omitempty"` + StorageAvail int64 `protobuf:"varint,3,opt,name=storageAvail,proto3" json:"storageAvail,omitempty"` + MemoryAvail int64 `protobuf:"varint,4,opt,name=memoryAvail,proto3" json:"memoryAvail,omitempty"` + AllowedPods int64 `protobuf:"varint,5,opt,name=allowedPods,proto3" json:"allowedPods,omitempty"` + NodeIP string `protobuf:"bytes,6,opt,name=nodeIP,proto3" json:"nodeIP,omitempty"` + Replication int32 `protobuf:"varint,7,opt,name=replication,proto3" json:"replication,omitempty"` +} + +func (x *NodeStatus) Reset() { + *x = NodeStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NodeStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NodeStatus) ProtoMessage() {} + +func (x *NodeStatus) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NodeStatus.ProtoReflect.Descriptor instead. 
+func (*NodeStatus) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{6} +} + +func (x *NodeStatus) GetCpuAvail() int32 { + if x != nil { + return x.CpuAvail + } + return 0 +} + +func (x *NodeStatus) GetGpuAvail() int32 { + if x != nil { + return x.GpuAvail + } + return 0 +} + +func (x *NodeStatus) GetStorageAvail() int64 { + if x != nil { + return x.StorageAvail + } + return 0 +} + +func (x *NodeStatus) GetMemoryAvail() int64 { + if x != nil { + return x.MemoryAvail + } + return 0 +} + +func (x *NodeStatus) GetAllowedPods() int64 { + if x != nil { + return x.AllowedPods + } + return 0 +} + +func (x *NodeStatus) GetNodeIP() string { + if x != nil { + return x.NodeIP + } + return "" +} + +func (x *NodeStatus) GetReplication() int32 { + if x != nil { + return x.Replication + } + return 0 +} + +// The JGF response message +type JGFRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Jgf string `protobuf:"bytes,1,opt,name=jgf,proto3" json:"jgf,omitempty"` +} + +func (x *JGFRequest) Reset() { + *x = JGFRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *JGFRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JGFRequest) ProtoMessage() {} + +func (x *JGFRequest) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JGFRequest.ProtoReflect.Descriptor instead. +func (*JGFRequest) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{7} +} + +func (x *JGFRequest) GetJgf() string { + if x != nil { + return x.Jgf + } + return "" +} + +// The JGF response message +type JGFResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Jgf string `protobuf:"bytes,1,opt,name=jgf,proto3" json:"jgf,omitempty"` +} + +func (x *JGFResponse) Reset() { + *x = JGFResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *JGFResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JGFResponse) ProtoMessage() {} + +func (x *JGFResponse) ProtoReflect() protoreflect.Message { + mi := &file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JGFResponse.ProtoReflect.Descriptor instead. 
+func (*JGFResponse) Descriptor() ([]byte, []int) { + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP(), []int{8} +} + +func (x *JGFResponse) GetJgf() string { + if x != nil { + return x.Jgf + } + return "" +} + +var File_fluence_fluxcli_grpc_fluxcli_proto protoreflect.FileDescriptor + +var file_fluence_fluxcli_grpc_fluxcli_proto_rawDesc = []byte{ + 0x0a, 0x22, 0x66, 0x6c, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x2f, 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, + 0x69, 0x2d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x22, 0xa5, 0x01, + 0x0a, 0x07, 0x50, 0x6f, 0x64, 0x53, 0x70, 0x65, 0x63, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6f, + 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x63, 0x70, 0x75, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x63, 0x70, 0x75, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x6d, + 0x6f, 0x72, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6d, 0x65, 0x6d, 0x6f, 0x72, + 0x79, 0x12, 0x10, 0x0a, 0x03, 0x67, 0x70, 0x75, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, + 0x67, 0x70, 0x75, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x6c, + 0x61, 0x62, 0x65, 0x6c, 0x73, 0x22, 0x60, 0x0a, 0x0c, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x02, 0x70, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x10, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x2e, 0x50, 0x6f, 0x64, 0x53, + 0x70, 0x65, 0x63, 0x52, 0x02, 0x70, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x39, 0x0a, 0x09, 0x4e, 0x6f, 0x64, 0x65, 0x41, + 0x6c, 0x6c, 0x6f, 0x63, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, + 0x74, 0x61, 0x73, 0x6b, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x74, 0x61, 0x73, + 0x6b, 0x73, 0x22, 0x6b, 0x0a, 0x0d, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x6f, 0x64, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x70, 0x6f, 0x64, 0x49, 0x44, 0x12, 0x2e, 0x0a, 0x08, 0x6e, 0x6f, 0x64, + 0x65, 0x6c, 0x69, 0x73, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x66, 0x6c, + 0x75, 0x78, 0x63, 0x6c, 0x69, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x52, + 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x6c, 0x69, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6a, 0x6f, 0x62, + 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x44, 0x22, + 0x25, 0x0a, 0x0d, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x44, 0x22, 0x3c, 0x0a, 
0x0e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6a, 0x6f, 0x62, 0x49, + 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x44, 0x12, 0x14, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x22, 0xe6, 0x01, 0x0a, 0x0a, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x70, 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x63, 0x70, 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x12, + 0x1a, 0x0a, 0x08, 0x67, 0x70, 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x08, 0x67, 0x70, 0x75, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x12, 0x22, 0x0a, 0x0c, 0x73, + 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x12, + 0x20, 0x0a, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x41, 0x76, 0x61, 0x69, + 0x6c, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x50, 0x6f, 0x64, 0x73, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x50, + 0x6f, 0x64, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x50, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x50, 0x12, 0x20, 0x0a, 0x0b, 0x72, + 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1e, 0x0a, + 0x0a, 0x4a, 0x47, 0x46, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6a, + 0x67, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x67, 0x66, 0x22, 0x1f, 0x0a, + 0x0b, 0x4a, 0x47, 0x46, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, + 0x6a, 0x67, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6a, 0x67, 0x66, 0x32, 0x87, + 0x01, 0x0a, 0x0e, 0x46, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x12, 0x38, 0x0a, 0x05, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x12, 0x15, 0x2e, 0x66, 0x6c, 0x75, + 0x78, 0x63, 0x6c, 0x69, 0x2e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x2e, 0x4d, 0x61, 0x74, 0x63, + 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x06, 0x43, + 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x12, 0x16, 0x2e, 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x2e, + 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, + 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0e, 0x5a, 0x0c, 0x67, 0x72, 0x70, 0x63, + 0x2f, 0x66, 0x6c, 0x75, 0x78, 0x63, 0x6c, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_fluence_fluxcli_grpc_fluxcli_proto_rawDescOnce sync.Once + file_fluence_fluxcli_grpc_fluxcli_proto_rawDescData = file_fluence_fluxcli_grpc_fluxcli_proto_rawDesc +) + +func file_fluence_fluxcli_grpc_fluxcli_proto_rawDescGZIP() []byte { + file_fluence_fluxcli_grpc_fluxcli_proto_rawDescOnce.Do(func() { + file_fluence_fluxcli_grpc_fluxcli_proto_rawDescData = 
protoimpl.X.CompressGZIP(file_fluence_fluxcli_grpc_fluxcli_proto_rawDescData) + }) + return file_fluence_fluxcli_grpc_fluxcli_proto_rawDescData +} + +var file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_fluence_fluxcli_grpc_fluxcli_proto_goTypes = []interface{}{ + (*PodSpec)(nil), // 0: fluxcli.PodSpec + (*MatchRequest)(nil), // 1: fluxcli.MatchRequest + (*NodeAlloc)(nil), // 2: fluxcli.NodeAlloc + (*MatchResponse)(nil), // 3: fluxcli.MatchResponse + (*CancelRequest)(nil), // 4: fluxcli.CancelRequest + (*CancelResponse)(nil), // 5: fluxcli.CancelResponse + (*NodeStatus)(nil), // 6: fluxcli.NodeStatus + (*JGFRequest)(nil), // 7: fluxcli.JGFRequest + (*JGFResponse)(nil), // 8: fluxcli.JGFResponse +} +var file_fluence_fluxcli_grpc_fluxcli_proto_depIdxs = []int32{ + 0, // 0: fluxcli.MatchRequest.ps:type_name -> fluxcli.PodSpec + 2, // 1: fluxcli.MatchResponse.nodelist:type_name -> fluxcli.NodeAlloc + 1, // 2: fluxcli.FluxcliService.Match:input_type -> fluxcli.MatchRequest + 4, // 3: fluxcli.FluxcliService.Cancel:input_type -> fluxcli.CancelRequest + 3, // 4: fluxcli.FluxcliService.Match:output_type -> fluxcli.MatchResponse + 5, // 5: fluxcli.FluxcliService.Cancel:output_type -> fluxcli.CancelResponse + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_fluence_fluxcli_grpc_fluxcli_proto_init() } +func file_fluence_fluxcli_grpc_fluxcli_proto_init() { + if File_fluence_fluxcli_grpc_fluxcli_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PodSpec); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MatchRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NodeAlloc); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MatchResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CancelRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CancelResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NodeStatus); i { + case 0: + return &v.state + case 1: + 
return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JGFRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JGFResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_fluence_fluxcli_grpc_fluxcli_proto_rawDesc, + NumEnums: 0, + NumMessages: 9, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_fluence_fluxcli_grpc_fluxcli_proto_goTypes, + DependencyIndexes: file_fluence_fluxcli_grpc_fluxcli_proto_depIdxs, + MessageInfos: file_fluence_fluxcli_grpc_fluxcli_proto_msgTypes, + }.Build() + File_fluence_fluxcli_grpc_fluxcli_proto = out.File + file_fluence_fluxcli_grpc_fluxcli_proto_rawDesc = nil + file_fluence_fluxcli_grpc_fluxcli_proto_goTypes = nil + file_fluence_fluxcli_grpc_fluxcli_proto_depIdxs = nil +} diff --git a/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli.proto b/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli.proto new file mode 100644 index 0000000..f47d35b --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli.proto @@ -0,0 +1,76 @@ +syntax = "proto3"; +option go_package = "grpc/fluxcli"; + +package fluxcli; + + +// Service definition +service FluxcliService { + // Sends a Match command + rpc Match(MatchRequest) returns (MatchResponse) {} + rpc Cancel(CancelRequest) returns (CancelResponse) {} +} + +message PodSpec { + string id = 1; + string container = 2; + int32 cpu = 3; + int64 memory = 4; + int64 gpu = 5; + int64 storage = 6; + repeated string labels = 7; +} + +// The Match request message (allocate, allocate_orelse_reserve) +message MatchRequest { + PodSpec ps = 1; + string request = 2; + int32 count = 3; +} + +// The Nodes/Cluster Update Status +message NodeAlloc { + string nodeID = 1; + int32 tasks = 2; +} + +// The Match response message +message MatchResponse { + string podID = 1; + repeated NodeAlloc nodelist = 2; + int64 jobID = 3; +} + +message CancelRequest { + int64 jobID = 2; +} + +// The Match response message +message CancelResponse { + int64 jobID = 1; + int32 error = 2; +} + + + +// The Nodes/Cluster Update Status +message NodeStatus { + int32 cpuAvail = 1; + int32 gpuAvail = 2; + int64 storageAvail = 3; + int64 memoryAvail = 4; + int64 allowedPods = 5; + string nodeIP = 6; + int32 replication = 7; +} + +// The JGF response message +message JGFRequest { + string jgf = 1; +} + + +// The JGF response message +message JGFResponse { + string jgf = 1; +} diff --git a/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli_grpc.pb.go b/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli_grpc.pb.go new file mode 100644 index 0000000..7bd905a --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/fluxcli-grpc/fluxcli_grpc.pb.go @@ -0,0 +1,139 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
+ +package fluxcli + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// FluxcliServiceClient is the client API for FluxcliService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type FluxcliServiceClient interface { + // Sends a Match command + Match(ctx context.Context, in *MatchRequest, opts ...grpc.CallOption) (*MatchResponse, error) + Cancel(ctx context.Context, in *CancelRequest, opts ...grpc.CallOption) (*CancelResponse, error) +} + +type fluxcliServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewFluxcliServiceClient(cc grpc.ClientConnInterface) FluxcliServiceClient { + return &fluxcliServiceClient{cc} +} + +func (c *fluxcliServiceClient) Match(ctx context.Context, in *MatchRequest, opts ...grpc.CallOption) (*MatchResponse, error) { + out := new(MatchResponse) + err := c.cc.Invoke(ctx, "/fluxcli.FluxcliService/Match", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *fluxcliServiceClient) Cancel(ctx context.Context, in *CancelRequest, opts ...grpc.CallOption) (*CancelResponse, error) { + out := new(CancelResponse) + err := c.cc.Invoke(ctx, "/fluxcli.FluxcliService/Cancel", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// FluxcliServiceServer is the server API for FluxcliService service. +// All implementations must embed UnimplementedFluxcliServiceServer +// for forward compatibility +type FluxcliServiceServer interface { + // Sends a Match command + Match(context.Context, *MatchRequest) (*MatchResponse, error) + Cancel(context.Context, *CancelRequest) (*CancelResponse, error) + mustEmbedUnimplementedFluxcliServiceServer() +} + +// UnimplementedFluxcliServiceServer must be embedded to have forward compatible implementations. +type UnimplementedFluxcliServiceServer struct { +} + +func (UnimplementedFluxcliServiceServer) Match(context.Context, *MatchRequest) (*MatchResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Match not implemented") +} +func (UnimplementedFluxcliServiceServer) Cancel(context.Context, *CancelRequest) (*CancelResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Cancel not implemented") +} +func (UnimplementedFluxcliServiceServer) mustEmbedUnimplementedFluxcliServiceServer() {} + +// UnsafeFluxcliServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to FluxcliServiceServer will +// result in compilation errors. 
+type UnsafeFluxcliServiceServer interface { + mustEmbedUnimplementedFluxcliServiceServer() +} + +func RegisterFluxcliServiceServer(s grpc.ServiceRegistrar, srv FluxcliServiceServer) { + s.RegisterService(&FluxcliService_ServiceDesc, srv) +} + +func _FluxcliService_Match_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MatchRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FluxcliServiceServer).Match(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fluxcli.FluxcliService/Match", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FluxcliServiceServer).Match(ctx, req.(*MatchRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _FluxcliService_Cancel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CancelRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FluxcliServiceServer).Cancel(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fluxcli.FluxcliService/Cancel", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FluxcliServiceServer).Cancel(ctx, req.(*CancelRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// FluxcliService_ServiceDesc is the grpc.ServiceDesc for FluxcliService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var FluxcliService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "fluxcli.FluxcliService", + HandlerType: (*FluxcliServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Match", + Handler: _FluxcliService_Match_Handler, + }, + { + MethodName: "Cancel", + Handler: _FluxcliService_Cancel_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "fluence/fluxcli-grpc/fluxcli.proto", +} diff --git a/sig-scheduler-plugins/pkg/fluence/utils/utils.go b/sig-scheduler-plugins/pkg/fluence/utils/utils.go new file mode 100644 index 0000000..cfb857d --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/utils/utils.go @@ -0,0 +1,76 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package utils + +import ( + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" +) + +type NoopStateData struct{} + +func NewNoopStateData() framework.StateData { + return &NoopStateData{} +} + +func (d *NoopStateData) Clone() framework.StateData { + return d +} + +// InspectPodInfo takes a pod object and returns the pod.spec +func InspectPodInfo(pod *v1.Pod) *pb.PodSpec { + ps := new(pb.PodSpec) + ps.Id = pod.Name + cont := pod.Spec.Containers[0] + + //This will need to be done here AND at client level + if len(pod.Labels) > 0 { + r := make([]string, 0) + for key, val := range pod.Labels { + if strings.Contains(key, "jobspec") { + r = append(r, val) + } + } + if len(r) > 0 { + ps.Labels = r + } + } + + specRequests := cont.Resources.Requests + specLimits := cont.Resources.Limits + + if specRequests.Cpu().Value() == 0 { + ps.Cpu = 1 + } else { + ps.Cpu = int32(specRequests.Cpu().Value()) + } + + if specRequests.Memory().Value() > 0 { + ps.Memory = specRequests.Memory().Value() + } + gpu := specLimits["nvidia.com/gpu"] + ps.Gpu = gpu.Value() + ps.Storage = specRequests.StorageEphemeral().Value() + + klog.Infof("[Jobspec] Pod spec: CPU %v/%v-milli, memory %v, GPU %v, storage %v", ps.Cpu, specRequests.Cpu().MilliValue(), specRequests.Memory().Value(), ps.Gpu, ps.Storage) + + return ps +}