diff --git a/Makefile b/Makefile
index 91789d8..d051a0e 100644
--- a/Makefile
+++ b/Makefile
@@ -26,9 +26,11 @@ update: clone
prepare: clone
	# These are entirely new directory structures
	rm -rf $(CLONE_UPSTREAM)/pkg/fluence
+	rm -rf $(CLONE_UPSTREAM)/pkg/logger
	# rm -rf $(CLONE_UPSTREAM)/cmd/app
	rm -rf $(CLONE_UPSTREAM)/pkg/controllers/podgroup_controller.go
	rm -rf $(CLONE_UPSTREAM)/cmd/controller/app/server.go
+	cp -R sig-scheduler-plugins/pkg/logger $(CLONE_UPSTREAM)/pkg/logger
	cp -R sig-scheduler-plugins/pkg/fluence $(CLONE_UPSTREAM)/pkg/fluence
	cp -R sig-scheduler-plugins/pkg/controllers/* $(CLONE_UPSTREAM)/pkg/controllers/
	# This is the one exception not from sig-scheduler-plugins because it is needed in both spots
diff --git a/README.md b/README.md
index 8c51de7..e3e1214 100644
--- a/README.md
+++ b/README.md
@@ -509,6 +509,14 @@ The last step ensures we use the images we loaded! You can basically just do:
This sped up my development time immensely. If you want to manually do the steps, see that script for instructions.

+#### Logging
+
+For easier viewing of what fluence is doing (in the sig-scheduler-plugins code), we have a file logger whose output can be viewed in the scheduler container:
+
+```bash
+$ kubectl exec -it fluence-68c4c586c6-nktdl -c scheduler-plugins-scheduler -- cat /tmp/fluence.log
+```
+
##### kubectl plugin

Note that if you want to enable extra endpoints for the fluence kubectl plugin and expose the GRPC as a service, you can do:
diff --git a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
index 27c31cb..a2fd4a6 100644
--- a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
+++ b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
@@ -405,6 +405,8 @@ func (r *PodGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
		Complete(r)
}

+// ensurePodGroup ensures we create the pod group (or delete it when the pod is deleted).
+// For deletion, this would be better done with an owner reference, but I haven't gotten that working.
func (r *PodGroupReconciler) ensurePodGroup(ctx context.Context, obj client.Object) []ctrl.Request {
	pod, ok := obj.(*v1.Pod)
	if !ok {
@@ -418,6 +420,18 @@ func (r *PodGroupReconciler) ensurePodGroup(ctx context.Context, obj client.Obje
		return nil
	}

+	// If we deleted the pod... assume we delete the group too
+	if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
+		r.log.Info("Pod: ", "Name", pod.Name, "Status", pod.Status.Phase, "Action", "Deleted")
+
+		pg := &schedv1alpha1.PodGroup{}
+		err := r.Get(ctx, types.NamespacedName{Name: groupName, Namespace: pod.Namespace}, pg)
+		if err == nil {
+			r.Delete(ctx, pg)
+		}
+		return nil
+	}
+
	// If we are watching the Pod and it's beyond pending, we hopefully already made a group
	// and that group should be in the reconcile process.
	if pod.Status.Phase != v1.PodPending {
diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go
index eed9536..8b08468 100644
--- a/sig-scheduler-plugins/pkg/fluence/core/core.go
+++ b/sig-scheduler-plugins/pkg/fluence/core/core.go
@@ -35,6 +35,7 @@ import (
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
+	"sigs.k8s.io/scheduler-plugins/pkg/logger"
	"sigs.k8s.io/scheduler-plugins/pkg/util"
)

@@ -84,10 +85,17 @@ type PodGroupManager struct {
	// Probably should just choose one... oh well
	sync.RWMutex
	mutex sync.Mutex
+	log   *logger.DebugLogger
}

// NewPodGroupManager creates a new operation object.
-func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager { +func NewPodGroupManager( + client client.Client, + snapshotSharedLister framework.SharedLister, + scheduleTimeout *time.Duration, + podInformer informerv1.PodInformer, + log *logger.DebugLogger, +) *PodGroupManager { pgMgr := &PodGroupManager{ client: client, snapshotSharedLister: snapshotSharedLister, @@ -97,6 +105,7 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha backedOffPG: gochache.New(10*time.Second, 10*time.Second), groupToJobId: map[string]uint64{}, podToNode: map[string]string{}, + log: log, } return pgMgr } @@ -126,13 +135,14 @@ func (pgMgr *PodGroupManager) PreFilter( state *framework.CycleState, ) error { - klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod)) + pgMgr.log.Info("[PodGroup PreFilter] pod %s", klog.KObj(pod)) pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) if pg == nil { return nil } - if _, exist := pgMgr.backedOffPG.Get(pgFullName); exist { + _, exist := pgMgr.backedOffPG.Get(pgFullName) + if exist { return fmt.Errorf("podGroup %v failed recently", pgFullName) } @@ -147,7 +157,7 @@ func (pgMgr *PodGroupManager) PreFilter( statuses := pgMgr.GetStatuses(pods) // This shows us the number of pods we have in the set and their states - klog.Infof("Fluence Pre-filter", "group", pgFullName, "pods", statuses, "MinMember", pg.Spec.MinMember, "Size", len(pods)) + pgMgr.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", pgFullName, statuses, pg.Spec.MinMember, len(pods)) if len(pods) < int(pg.Spec.MinMember) { return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+ "current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember) @@ -164,7 +174,8 @@ func (pgMgr *PodGroupManager) PreFilter( // TODO(cwdsuzhou): This resource check may not always pre-catch unschedulable pod group. // It only tries to PreFilter resource constraints so even if a PodGroup passed here, // it may not necessarily pass Filter due to other constraints such as affinity/taints. - if _, ok := pgMgr.permittedPG.Get(pgFullName); ok { + _, ok := pgMgr.permittedPG.Get(pgFullName) + if ok { return nil } @@ -173,14 +184,14 @@ func (pgMgr *PodGroupManager) PreFilter( repPod := pods[0] nodes, err := pgMgr.AskFlux(ctx, *repPod, pg, pgFullName) if err != nil { - klog.Infof("[Fluence] Fluxion returned an error %s, not schedulable", err.Error()) + pgMgr.log.Info("[PodGroup PreFilter] Fluxion returned an error %s, not schedulable", err.Error()) return err } - klog.Infof("Node Selected %s (pod group %s)", nodes, pgFullName) + pgMgr.log.Info("Node Selected %s (pod group %s)", nodes, pgFullName) // Some reason fluxion gave us the wrong size? 
if len(nodes) != len(pods) { - klog.Info("Warning - group %s needs %d nodes but Fluxion returned the wrong number nodes %d.", pgFullName, len(pods), len(nodes)) + pgMgr.log.Warning("[PodGroup PreFilter] group %s needs %d nodes but Fluxion returned the wrong number nodes %d.", pgFullName, len(pods), len(nodes)) pgMgr.mutex.Lock() pgMgr.cancelFluxJob(pgFullName, repPod) pgMgr.mutex.Unlock() @@ -236,38 +247,3 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) func GetNamespacedName(obj metav1.Object) string { return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName()) } - -func getNodeResource(info *framework.NodeInfo, desiredPodGroupName string) *framework.Resource { - nodeClone := info.Clone() - for _, podInfo := range info.Pods { - if podInfo == nil || podInfo.Pod == nil { - continue - } - if util.GetPodGroupFullName(podInfo.Pod) != desiredPodGroupName { - continue - } - nodeClone.RemovePod(podInfo.Pod) - } - - leftResource := framework.Resource{ - ScalarResources: make(map[corev1.ResourceName]int64), - } - allocatable := nodeClone.Allocatable - requested := nodeClone.Requested - - leftResource.AllowedPodNumber = allocatable.AllowedPodNumber - len(nodeClone.Pods) - leftResource.MilliCPU = allocatable.MilliCPU - requested.MilliCPU - leftResource.Memory = allocatable.Memory - requested.Memory - leftResource.EphemeralStorage = allocatable.EphemeralStorage - requested.EphemeralStorage - - for k, allocatableEx := range allocatable.ScalarResources { - requestEx, ok := requested.ScalarResources[k] - if !ok { - leftResource.ScalarResources[k] = allocatableEx - } else { - leftResource.ScalarResources[k] = allocatableEx - requestEx - } - } - klog.V(4).InfoS("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource) - return &leftResource -} diff --git a/sig-scheduler-plugins/pkg/fluence/core/flux.go b/sig-scheduler-plugins/pkg/fluence/core/flux.go index def239f..48e1500 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/flux.go +++ b/sig-scheduler-plugins/pkg/fluence/core/flux.go @@ -6,7 +6,6 @@ import ( "google.golang.org/grpc" "k8s.io/apimachinery/pkg/labels" - klog "k8s.io/klog/v2" pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" @@ -38,7 +37,7 @@ func (pgMgr *PodGroupManager) AskFlux( // cancel in fluence. What we can do here is assume the previous pods are no longer running // and cancel the flux job to create again. if isAllocated { - klog.Info("Warning - group %s was previously allocated and is requesting again, so must have completed.", groupName) + pgMgr.log.Warning("[PodGroup AskFlux] group %s was previously allocated and is requesting again, so must have completed.", groupName) pgMgr.mutex.Lock() pgMgr.cancelFluxJob(groupName, &pod) pgMgr.mutex.Unlock() @@ -49,12 +48,12 @@ func (pgMgr *PodGroupManager) AskFlux( // This obviously may not be true if we have a hetereogenous PodGroup. 
// We name it based on the group, since it will represent the group jobspec := utils.PreparePodJobSpec(&pod, groupName) - klog.Infof("[Fluence] Inspect pod info, jobspec: %s\n", jobspec) + pgMgr.log.Info("[PodGroup AskFlux] Inspect pod info, jobspec: %s\n", jobspec) conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) // TODO change this to just return fmt.Errorf if err != nil { - klog.Errorf("[Fluence] Error connecting to server: %v\n", err) + pgMgr.log.Error("[PodGroup AskFlux] Error connecting to server: %v\n", err) return nodes, err } defer conn.Close() @@ -72,12 +71,12 @@ func (pgMgr *PodGroupManager) AskFlux( // An error here is an error with making the request r, err := grpcclient.Match(context.Background(), request) if err != nil { - klog.Errorf("[Fluence] did not receive any match response: %v\n", err) + pgMgr.log.Warning("[PodGroup AskFlux] did not receive any match response: %v\n", err) return nodes, err } // TODO GetPodID should be renamed, because it will reflect the group - klog.Infof("[Fluence] Match response ID %s\n", r.GetPodID()) + pgMgr.log.Info("[PodGroup AskFlux] Match response ID %s\n", r.GetPodID()) // Get the nodelist and inspect nodelist := r.GetNodelist() @@ -85,7 +84,7 @@ func (pgMgr *PodGroupManager) AskFlux( nodes = append(nodes, node.NodeID) } jobid := uint64(r.GetJobID()) - klog.Infof("[Fluence] parsed node pods list %s for job id %d\n", nodes, jobid) + pgMgr.log.Info("[PodGroup AskFlux] parsed node pods list %s for job id %d\n", nodes, jobid) // TODO would be nice to actually be able to ask flux jobs -a to fluence // That way we can verify assignments, etc. @@ -103,15 +102,15 @@ func (pgMgr *PodGroupManager) cancelFluxJob(groupName string, pod *corev1.Pod) e // The job was already cancelled by another pod if !ok { - klog.Infof("[Fluence] Request for cancel of group %s is already complete.", groupName) + pgMgr.log.Info("[PodGroup cancelFluxJob] Request for cancel of group %s is already complete.", groupName) return nil } - klog.Infof("[Fluence] Cancel flux job: %v for group %s", jobid, groupName) + pgMgr.log.Info("[PodGroup cancelFluxJob] Cancel flux job: %v for group %s", jobid, groupName) // This first error is about connecting to the server conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) if err != nil { - klog.Errorf("[Fluence] Error connecting to server: %v", err) + pgMgr.log.Error("[PodGroup cancelFluxJob] Error connecting to server: %v", err) return err } defer conn.Close() @@ -124,17 +123,17 @@ func (pgMgr *PodGroupManager) cancelFluxJob(groupName string, pod *corev1.Pod) e request := &pb.CancelRequest{JobID: int64(jobid)} res, err := grpcclient.Cancel(context.Background(), request) if err != nil { - klog.Errorf("[Fluence] did not receive any cancel response: %v", err) + pgMgr.log.Error("[PodGroup cancelFluxJob] did not receive any cancel response: %v", err) return err } - klog.Infof("[Fluence] Job cancellation for group %s result: %d", groupName, res.Error) + pgMgr.log.Info("[PodGroup cancelFluxJob] Job cancellation for group %s result: %d", groupName, res.Error) // And this error is if the cancel was successful or not if res.Error == 0 { - klog.Infof("[Fluence] Successful cancel of flux job: %d for group %s", jobid, groupName) + pgMgr.log.Info("[PodGroup cancelFluxJob] Successful cancel of flux job: %d for group %s", jobid, groupName) pgMgr.cleanup(pod, groupName) } else { - klog.Warningf("[Fluence] Failed to cancel flux job %d for group %s", jobid, groupName) + pgMgr.log.Warning("[PodGroup cancelFluxJob] Failed to cancel flux 
job %d for group %s", jobid, groupName) } return nil } @@ -174,7 +173,7 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { groupName = pg.Name } - klog.Infof("[Fluence] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, oldPod.Status.Phase, newPod.Status.Phase) + pgMgr.log.Verbose("[PodGroup UpdatePod] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, oldPod.Status.Phase, newPod.Status.Phase) switch newPod.Status.Phase { case corev1.PodPending: @@ -182,7 +181,7 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { case corev1.PodRunning: // if a pod is start running, we can add it state to the delta graph if it is scheduled by other scheduler case corev1.PodSucceeded: - klog.Infof("[Fluence] Pod %s succeeded, Fluence needs to free the resources", newPod.Name) + pgMgr.log.Info("[PodGroup UpdatePod] Pod %s succeeded, Fluence needs to free the resources", newPod.Name) pgMgr.mutex.Lock() defer pgMgr.mutex.Unlock() @@ -194,13 +193,13 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { if ok { pgMgr.cancelFluxJob(groupName, oldPod) } else { - klog.Infof("[Fluence] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) + pgMgr.log.Verbose("[PodGroup UpdatePod] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) } case corev1.PodFailed: // a corner case need to be tested, the pod exit code is not 0, can be created with segmentation fault pi test - klog.Warningf("[Fluence] Pod %s in group %s failed, Fluence needs to free the resources", newPod.Name, groupName) + pgMgr.log.Warning("[PodGroup UpdatePod] Pod %s in group %s failed, Fluence needs to free the resources", newPod.Name, groupName) pgMgr.mutex.Lock() defer pgMgr.mutex.Unlock() @@ -209,7 +208,7 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { if ok { pgMgr.cancelFluxJob(groupName, oldPod) } else { - klog.Errorf("[Fluence] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) + pgMgr.log.Error("[PodGroup UpdatePod] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) } case corev1.PodUnknown: // don't know how to deal with it as it's unknown phase @@ -220,7 +219,6 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { // DeletePod handles the delete event handler func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) { - klog.Info("[Fluence] Delete Pod event handler") pod := podObj.(*corev1.Pod) groupName, pg := pgMgr.GetPodGroup(context.TODO(), pod) @@ -230,11 +228,11 @@ func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) { groupName = pg.Name } - klog.Infof("[Fluence] Delete pod %s in group %s has status %s", pod.Status.Phase, pod.Name, groupName) + pgMgr.log.Verbose("[PodGroup DeletePod] Delete pod %s in group %s has status %s", pod.Status.Phase, pod.Name, groupName) switch pod.Status.Phase { case corev1.PodSucceeded: case corev1.PodPending: - klog.Infof("[Fluence] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name) + pgMgr.log.Verbose("[PodGroup DeletePod] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name) pgMgr.mutex.Lock() defer pgMgr.mutex.Unlock() @@ -243,7 +241,7 @@ func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) { if ok { pgMgr.cancelFluxJob(groupName, pod) } else { - klog.Infof("[Fluence] 
Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) + pgMgr.log.Info("[PodGroup DeletePod] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) } case corev1.PodRunning: pgMgr.mutex.Lock() @@ -253,7 +251,7 @@ func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) { if ok { pgMgr.cancelFluxJob(groupName, pod) } else { - klog.Infof("[Fluence] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) + pgMgr.log.Info("[PodGroup DeletePod] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) } } } diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go index 5f9f635..84f3e95 100644 --- a/sig-scheduler-plugins/pkg/fluence/fluence.go +++ b/sig-scheduler-plugins/pkg/fluence/fluence.go @@ -25,6 +25,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" klog "k8s.io/klog/v2" + "sigs.k8s.io/scheduler-plugins/pkg/logger" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" clientscheme "k8s.io/client-go/kubernetes/scheme" @@ -50,6 +52,7 @@ type Fluence struct { frameworkHandler framework.Handle pgMgr fcore.Manager scheduleTimeout *time.Duration + log *logger.DebugLogger } var ( @@ -70,6 +73,11 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) args := config.CoschedulingArgs{} ctx := context.TODO() + // Make fluence his own little logger! + // This can eventually be a flag, but just going to set for now + // It shall be a very chonky file. Oh lawd he comin! + l := logger.NewDebugLogger(logger.LevelError, "/tmp/fluence.log") + scheme := runtime.NewScheme() _ = clientscheme.AddToScheme(scheme) _ = corev1.AddToScheme(scheme) @@ -92,6 +100,7 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) &scheduleTimeDuration, // Keep the podInformer (from frameworkHandle) as the single source of Pods. handle.SharedInformerFactory().Core().V1().Pods(), + l, ) // Event handlers to call on pgMgr @@ -105,8 +114,13 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) frameworkHandler: handle, pgMgr: pgMgr, scheduleTimeout: &scheduleTimeDuration, + log: l, } - return plugin, nil + + // TODO this is not supported yet + // Account for resources in running cluster + err = plugin.RegisterExisting(ctx) + return plugin, err } func (f *Fluence) Name() string { @@ -134,7 +148,7 @@ func (f *Fluence) Filter( nodeInfo *framework.NodeInfo, ) *framework.Status { - klog.Info("Filtering input node ", nodeInfo.Node().Name) + f.log.Verbose("[Fluence Filter] Filtering input node %s", nodeInfo.Node().Name) state, err := cycleState.Read(framework.StateKey(pod.Name)) // No error means we retrieved the state @@ -147,7 +161,7 @@ func (f *Fluence) Filter( if ok && value.NodeName != nodeInfo.Node().Name { return framework.NewStatus(framework.Unschedulable, "pod is not permitted") } else { - klog.Infof("Filter: node %s selected for %s\n", value.NodeName, pod.Name) + f.log.Info("[Fluence Filter] node %s selected for %s\n", value.NodeName, pod.Name) } } return framework.NewStatus(framework.Success) @@ -158,7 +172,6 @@ func (f *Fluence) Filter( // 2. Compare the initialization timestamps of PodGroups or Pods. // 3. Compare the keys of PodGroups/Pods: /. 
func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { - klog.Infof("ordering pods in fluence scheduler plugin") prio1 := corev1helpers.PodPriority(podInfo1.Pod) prio2 := corev1helpers.PodPriority(podInfo2.Pod) if prio1 != prio2 { @@ -212,7 +225,7 @@ func (f *Fluence) PreFilter( // This will populate the node name into the pod group manager err := f.pgMgr.PreFilter(ctx, pod, state) if err != nil { - klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod)) + f.log.Error("[Fluence PreFilter] failed pod %s: %s", klog.KObj(pod), err.Error()) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } node = f.pgMgr.GetPodNode(pod) diff --git a/sig-scheduler-plugins/pkg/fluence/register.go b/sig-scheduler-plugins/pkg/fluence/register.go new file mode 100644 index 0000000..8f39f09 --- /dev/null +++ b/sig-scheduler-plugins/pkg/fluence/register.go @@ -0,0 +1,55 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fluence + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +// RegisterExisting uses the in cluster API to ensure existing pods +// are known to fluence, This is a one-time, static approach, so if a resource +// here goes away we cannot remove it from being known. But it's better than +// not having it, and having fluxion assume more resources than the +// cluster has available. This is a TODO as fluxion does not support it +func (f *Fluence) RegisterExisting(ctx context.Context) error { + + // creates an in-cluster config and client + config, err := rest.InClusterConfig() + if err != nil { + f.log.Error("[Fluence RegisterExisting] Error creating in-cluster config: %s\n", err) + return err + } + // creates the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + f.log.Error("[Fluence RegisterExisting] Error creating client for config: %s\n", err) + return err + } + // get pods in all the namespaces by omitting namespace + // Or specify namespace to get pods in particular namespace + pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{}) + if err != nil { + f.log.Info("[Fluence RegisterExisting] Error listing pods: %s\n", err) + return err + } + f.log.Info("[Fluence RegisterExisting] Found %d existing pods in the cluster\n", len(pods.Items)) + return nil +} diff --git a/sig-scheduler-plugins/pkg/logger/logger.go b/sig-scheduler-plugins/pkg/logger/logger.go new file mode 100644 index 0000000..522be61 --- /dev/null +++ b/sig-scheduler-plugins/pkg/logger/logger.go @@ -0,0 +1,88 @@ +package logger + +// A small debug logger to write to file instead of klog +// I don't know where to close, so I'm opening and appending each time +// It's a bad design, but will work for debugging. 
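+//
+// A rough usage sketch (the level and messages below are illustrative only;
+// fluence.go currently constructs this with LevelError and /tmp/fluence.log):
+//
+//	log := NewDebugLogger(LevelInfo, "/tmp/fluence.log")
+//	log.Info("[Example] scheduling group %s with %d pods", "default/lammps", 4)
+//	log.Warning("[Example] fluxion returned %d nodes but expected %d", 3, 4)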
+
+import (
+	"fmt"
+	"log"
+	"os"
+)
+
+const (
+	LevelNone = iota
+	LevelInfo
+	LevelWarning
+	LevelError
+	LevelVerbose
+	LevelDebug
+)
+
+// TODO try saving state here when we can close
+type DebugLogger struct {
+	level    int
+	Filename string
+	handle   *os.File
+}
+
+func NewDebugLogger(level int, filename string) *DebugLogger {
+	return &DebugLogger{
+		level:    LevelNone, // note: the level argument is ignored for now; LevelNone means everything gets logged
+		Filename: filename,
+	}
+}
+
+func (l *DebugLogger) Start() (*log.Logger, error) {
+	f, err := os.OpenFile(l.Filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
+	if err != nil {
+		return nil, err
+	}
+	logger := log.New(f, "", 0)
+	l.handle = f
+	return logger, nil
+}
+func (l *DebugLogger) Stop() error {
+	if l.handle != nil {
+		return l.handle.Close()
+	}
+	return nil
+}
+
+// Logging functions you should use!
+func (l *DebugLogger) Info(message ...any) error {
+	return l.log(LevelInfo, " INFO: ", message...)
+}
+func (l *DebugLogger) Error(message ...any) error {
+	return l.log(LevelError, " ERROR: ", message...)
+}
+func (l *DebugLogger) Debug(message ...any) error {
+	return l.log(LevelDebug, " DEBUG: ", message...)
+}
+func (l *DebugLogger) Verbose(message ...any) error {
+	return l.log(LevelVerbose, "VERBOSE: ", message...)
+}
+func (l *DebugLogger) Warning(message ...any) error {
+	return l.log(LevelWarning, "WARNING: ", message...)
+}
+
+// log is the shared class function for actually printing to the log
+func (l *DebugLogger) log(level int, prefix string, message ...any) error {
+	logger, err := l.Start()
+	if err != nil {
+		return err
+	}
+	// Assume the prolog (to be formatted) is at index 0
+	prolog := message[0].(string)
+	if prefix != "" {
+		prolog = prefix + " " + prolog
+	}
+	rest := message[1:]
+
+	// msg := fmt.Sprintf(message...)
+	fmt.Printf("Comparing level %d >= %d\n", level, l.level)
+	if level >= l.level {
+		logger.Printf(prolog, rest...)
+	}
+	return l.Stop()
+}
diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go
index f81f81c..e429056 100644
--- a/src/fluence/utils/utils.go
+++ b/src/fluence/utils/utils.go
@@ -4,6 +4,8 @@ import (
	"context"
	"fmt"

+	klog "k8s.io/klog/v2"
+
	"encoding/json"

	"github.com/flux-framework/flux-k8s/flux-plugin/fluence/jgf"
@@ -20,7 +22,56 @@ var (
	controlPlaneLabel = "node-role.kubernetes.io/control-plane"
)

+// RegisterExisting uses the in-cluster API to get existing pods
+// This is actually the same as computeTotalRequests but I wanted to compare the two
+// It is currently not being used. The main difference is that below, we are essentially
+// rounding the cpu to the smaller unit (logically for the graph) but losing some
+func RegisterExisting(clientset *kubernetes.Clientset, ctx context.Context) (map[string]PodSpec, error) { + + // We are using PodSpec as a holder for a *summary* of cpu/memory being used + // by the node, it is a summation across pods we find on each one + nodes := map[string]PodSpec{} + + // get pods in all the namespaces by omitting namespace + // Or specify namespace to get pods in particular namespace + pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{}) + if err != nil { + klog.Infof("Error listing pods: %s\n", err) + return nodes, err + } + klog.Infof("Found %d existing pods in the cluster\n", len(pods.Items)) + + // Create a new PodSpec for each + for _, pod := range pods.Items { + + // Add the node to our lookup if we don't have it yet + _, ok := nodes[pod.Spec.NodeName] + if !ok { + nodes[pod.Spec.NodeName] = PodSpec{} + } + ps := nodes[pod.Spec.NodeName] + + for _, container := range pod.Spec.Containers { + specRequests := container.Resources.Requests + ps.Cpu += int32(specRequests.Cpu().Value()) + ps.Memory += specRequests.Memory().Value() + ps.Storage += specRequests.StorageEphemeral().Value() + + specLimits := container.Resources.Limits + gpuSpec := specLimits["nvidia.com/gpu"] + ps.Gpu += gpuSpec.Value() + } + nodes[pod.Spec.NodeName] = ps + } + return nodes, nil +} + // CreateJGF creates the Json Graph Format +// We currently don't have support in fluxion to allocate jobs for existing pods, +// so instead we create the graph with fewer resources. When that support is +// added (see sig-scheduler-plugins/pkg/fluence/register.go) we can +// remove the adjustment here, which is more of a hack func CreateJGF(filename string, skipLabel *string) error { ctx := context.Background() config, err := rest.InClusterConfig() @@ -28,16 +79,19 @@ func CreateJGF(filename string, skipLabel *string) error { fmt.Println("Error getting InClusterConfig") return err } - // creates the clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { - fmt.Println("Error getting ClientSet") + fmt.Printf("Error getting ClientSet: %s", err) return err } nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + fmt.Printf("Error listing nodes: %s", err) + return err + } - var fluxgraph jgf.Fluxjgf - fluxgraph = jgf.InitJGF() + // Create a Flux Json Graph Format (JGF) with all cluster nodes + fluxgraph := jgf.InitJGF() // TODO it looks like we can add more to the graph here - // let's remember to consider what else we can. 
@@ -53,11 +107,11 @@ func CreateJGF(filename string, skipLabel *string) error { vcores := 0 fmt.Println("Number nodes ", len(nodes.Items)) - var totalAllocCpu, totalmem int64 + var totalAllocCpu int64 totalAllocCpu = 0 sdnCount := 0 - for node_index, node := range nodes.Items { + for nodeIndex, node := range nodes.Items { // We should not be scheduling to the control plane _, ok := node.Labels[controlPlaneLabel] @@ -71,107 +125,121 @@ func CreateJGF(filename string, skipLabel *string) error { if *skipLabel != "" { _, ok := node.Labels[*skipLabel] if ok { - fmt.Println("Skipping node ", node.GetName()) + fmt.Printf("Skipping node %s\n", node.GetName()) continue } } - fmt.Println("node in flux group ", node.GetName()) - if !node.Spec.Unschedulable { - fieldselector, err := fields.ParseSelector("spec.nodeName=" + node.GetName() + ",status.phase!=" + string(corev1.PodSucceeded) + ",status.phase!=" + string(corev1.PodFailed)) - if err != nil { - return err - } - pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ - FieldSelector: fieldselector.String(), - }) - if err != nil { - return err - } + if node.Spec.Unschedulable { + fmt.Printf("Skipping node %s, unschedulable\n", node.GetName()) + continue + } - // fmt.Println("Node ", node.GetName(), " has pods ", pods) - // Check if subnet already exists - // Here we build subnets according to topology.kubernetes.io/zone label - subnetName := node.Labels["topology.kubernetes.io/zone"] - subnet := fluxgraph.MakeSubnet(sdnCount, subnetName) - sdnCount = sdnCount + 1 - fluxgraph.MakeEdge(cluster, subnet, "contains") - fluxgraph.MakeEdge(subnet, cluster, "in") - - reqs := computeTotalRequests(pods) - cpuReqs := reqs[corev1.ResourceCPU] - memReqs := reqs[corev1.ResourceMemory] - - avail := node.Status.Allocatable.Cpu().MilliValue() - totalcpu := int64((avail - cpuReqs.MilliValue()) / 1000) //- 1 - fmt.Println("Node ", node.GetName(), " flux cpu ", totalcpu) - totalAllocCpu = totalAllocCpu + totalcpu - totalmem = node.Status.Allocatable.Memory().Value() - memReqs.Value() - fmt.Println("Node ", node.GetName(), " total mem ", totalmem) - gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable["nvidia.com/gpu"] - - // reslist := node.Status.Allocatable - // resources := make([]corev1.ResourceName, 0, len(reslist)) - // for resource := range reslist { - // fmt.Println("resource ", resource) - // resources = append(resources, resource) - // } - // for _, resource := range resources { - // value := reslist[resource] - - // fmt.Printf(" %s:\t%s\n", resource, value.String()) - // } - - workernode := fluxgraph.MakeNode(node_index, false, node.Name) - fluxgraph.MakeEdge(subnet, workernode, "contains") // this is rack otherwise - fluxgraph.MakeEdge(workernode, subnet, "in") // this is rack otherwise - - // socket := fluxgraph.MakeSocket(0, "socket") - // fluxgraph.MakeEdge(workernode, socket, "contains") - // fluxgraph.MakeEdge(socket, workernode, "in") - - if hasGpuAllocatable { - fmt.Println("GPU Resource quantity ", gpuAllocatable.Value()) - //MakeGPU(index int, name string, size int) string { - for index := 0; index < int(gpuAllocatable.Value()); index++ { - gpu := fluxgraph.MakeGPU(index, "nvidiagpu", 1) - fluxgraph.MakeEdge(workernode, gpu, "contains") // workernode was socket - fluxgraph.MakeEdge(gpu, workernode, "in") - } + fieldselector, err := fields.ParseSelector("spec.nodeName=" + node.GetName() + ",status.phase!=" + string(corev1.PodSucceeded) + ",status.phase!=" + string(corev1.PodFailed)) + if err != nil { + return err + } + 
pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + FieldSelector: fieldselector.String(), + }) + if err != nil { + return err + } + // Check if subnet already exists + // Here we build subnets according to topology.kubernetes.io/zone label + subnetName := node.Labels["topology.kubernetes.io/zone"] + subnet := fluxgraph.MakeSubnet(sdnCount, subnetName) + sdnCount = sdnCount + 1 + fluxgraph.MakeEdge(cluster, subnet, "contains") + fluxgraph.MakeEdge(subnet, cluster, "in") + + // These are requests for existing pods, for cpu and memory + reqs := computeTotalRequests(pods) + cpuReqs := reqs[corev1.ResourceCPU] + memReqs := reqs[corev1.ResourceMemory] + + // Actual values that we have available (minus requests) + totalCpu := node.Status.Allocatable.Cpu().MilliValue() + totalMem := node.Status.Allocatable.Memory().Value() + + // Values accounting for requests + availCpu := int64((totalCpu - cpuReqs.MilliValue()) / 1000) + availMem := totalMem - memReqs.Value() + + // Show existing to compare to + fmt.Printf("\n📦️ %s\n", node.GetName()) + fmt.Printf(" allocated cpu: %d\n", cpuReqs.Value()) + fmt.Printf(" allocated mem: %d\n", memReqs.Value()) + fmt.Printf(" available cpu: %d\n", availCpu) + fmt.Printf(" running pods: %d\n", len(pods.Items)) + + // keep track of overall total + totalAllocCpu += availCpu + fmt.Printf(" available mem: %d\n", availMem) + gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable["nvidia.com/gpu"] + + // reslist := node.Status.Allocatable + // resources := make([]corev1.ResourceName, 0, len(reslist)) + // for resource := range reslist { + // fmt.Println("resource ", resource) + // resources = append(resources, resource) + // } + // for _, resource := range resources { + // value := reslist[resource] + + // fmt.Printf(" %s:\t%s\n", resource, value.String()) + // } + + workernode := fluxgraph.MakeNode(nodeIndex, false, node.Name) + fluxgraph.MakeEdge(subnet, workernode, "contains") // this is rack otherwise + fluxgraph.MakeEdge(workernode, subnet, "in") // this is rack otherwise + + // socket := fluxgraph.MakeSocket(0, "socket") + // fluxgraph.MakeEdge(workernode, socket, "contains") + // fluxgraph.MakeEdge(socket, workernode, "in") + + if hasGpuAllocatable { + fmt.Println("GPU Resource quantity ", gpuAllocatable.Value()) + //MakeGPU(index int, name string, size int) string { + for index := 0; index < int(gpuAllocatable.Value()); index++ { + gpu := fluxgraph.MakeGPU(index, "nvidiagpu", 1) + fluxgraph.MakeEdge(workernode, gpu, "contains") // workernode was socket + fluxgraph.MakeEdge(gpu, workernode, "in") } - for index := 0; index < int(totalcpu); index++ { - // MakeCore(index int, name string) - core := fluxgraph.MakeCore(index, "core") - fluxgraph.MakeEdge(workernode, core, "contains") // workernode was socket - fluxgraph.MakeEdge(core, workernode, "in") - - // Question from Vanessa: - // How can we get here and have vcores ever not equal to zero? 
- if vcores == 0 { - fluxgraph.MakeNFDProperties(core, index, "cpu-", &node.Labels) - // fluxgraph.MakeNFDProperties(core, index, "netmark-", &node.Labels) - } else { - for vc := 0; vc < vcores; vc++ { - vcore := fluxgraph.MakeVCore(core, vc, "vcore") - fluxgraph.MakeNFDProperties(vcore, index, "cpu-", &node.Labels) - } + } + + for index := 0; index < int(availCpu); index++ { + // MakeCore(index int, name string) + core := fluxgraph.MakeCore(index, "core") + fluxgraph.MakeEdge(workernode, core, "contains") // workernode was socket + fluxgraph.MakeEdge(core, workernode, "in") + + // Question from Vanessa: + // How can we get here and have vcores ever not equal to zero? + if vcores == 0 { + fluxgraph.MakeNFDProperties(core, index, "cpu-", &node.Labels) + // fluxgraph.MakeNFDProperties(core, index, "netmark-", &node.Labels) + } else { + for vc := 0; vc < vcores; vc++ { + vcore := fluxgraph.MakeVCore(core, vc, "vcore") + fluxgraph.MakeNFDProperties(vcore, index, "cpu-", &node.Labels) } } + } - // MakeMemory(index int, name string, unit string, size int) - fractionmem := totalmem >> 30 - // fractionmem := (totalmem/totalcpu) >> 20 - // fmt.Println("Creating ", fractionmem, " vertices with ", 1<<10, " MB of mem") - for i := 0; i < /*int(totalcpu)*/ int(fractionmem); i++ { - mem := fluxgraph.MakeMemory(i, "memory", "MB", int(1<<10)) - fluxgraph.MakeEdge(workernode, mem, "contains") - fluxgraph.MakeEdge(mem, workernode, "in") - } + // MakeMemory(index int, name string, unit string, size int) + fractionMem := availMem >> 30 + // fractionmem := (totalmem/totalcpu) >> 20 + // fmt.Println("Creating ", fractionmem, " vertices with ", 1<<10, " MB of mem") + for i := 0; i < /*int(totalcpu)*/ int(fractionMem); i++ { + mem := fluxgraph.MakeMemory(i, "memory", "MB", int(1<<10)) + fluxgraph.MakeEdge(workernode, mem, "contains") + fluxgraph.MakeEdge(mem, workernode, "in") } } - fmt.Println("Can request at most ", totalAllocCpu, " exclusive cpu") + fmt.Printf("\nCan request at most %d exclusive cpu", totalAllocCpu) err = fluxgraph.WriteJGF(filename) if err != nil { return err