feat: add small logger just for fluence
Problem: it is really hard to use klog and parse through messy multi-threaded logs
Solution: make a little (likely temporary) filesystem logger for a single source
of truth!

Signed-off-by: vsoch <[email protected]>
vsoch committed Mar 15, 2024
1 parent cbc66f7 commit e78973b
Showing 9 changed files with 388 additions and 166 deletions.
2 changes: 2 additions & 0 deletions Makefile
@@ -26,9 +26,11 @@ update: clone
prepare: clone
# These are entirely new directory structures
rm -rf $(CLONE_UPSTREAM)/pkg/fluence
rm -rf $(CLONE_UPSTREAM)/pkg/logger
# rm -rf $(CLONE_UPSTREAM)/cmd/app
rm -rf $(CLONE_UPSTREAM)/pkg/controllers/podgroup_controller.go
rm -rf $(CLONE_UPSTREAM)/cmd/controller/app/server.go
cp -R sig-scheduler-plugins/pkg/logger $(CLONE_UPSTREAM)/pkg/logger
cp -R sig-scheduler-plugins/pkg/fluence $(CLONE_UPSTREAM)/pkg/fluence
cp -R sig-scheduler-plugins/pkg/controllers/* $(CLONE_UPSTREAM)/pkg/controllers/
# This is the one exception not from sig-scheduler-plugins because it is needed in both spots
8 changes: 8 additions & 0 deletions README.md
@@ -509,6 +509,14 @@ The last step ensures we use the images we loaded! You can basically just do:

This sped up my development time immensely. If you want to manually do the steps, see that script for instructions.

#### Logging

To make it easier to see what fluence is doing (in the sig-scheduler-plugins build), we have a file logger whose output can be viewed in the container:

```bash
$ kubectl exec -it fluence-68c4c586c6-nktdl -c scheduler-plugins-scheduler -- cat /tmp/fluence.log
```
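
The `pkg/logger` package itself is not shown in this diff, but based on how it is used in `pkg/fluence` below (a `*logger.DebugLogger` with printf-style `Info`, `Warning`, `Error`, and `Verbose` methods writing to `/tmp/fluence.log`), a minimal sketch of such a filesystem logger might look like the following. The constructor name, level constants, and file-handling details are assumptions for illustration, not the actual implementation:

```go
package logger

import (
	"fmt"
	"os"
	"sync"
)

// Verbosity levels, from most to least severe (assumed names).
const (
	LevelError = iota
	LevelWarning
	LevelInfo
	LevelVerbose
)

// DebugLogger writes plain-text messages to a single file so fluence output
// can be read in one place instead of klog's interleaved, multi-threaded stream.
type DebugLogger struct {
	mu       sync.Mutex
	level    int
	filename string
}

// NewDebugLogger is a hypothetical constructor; the real package may use a
// different name, signature, or default path.
func NewDebugLogger(level int, filename string) *DebugLogger {
	return &DebugLogger{level: level, filename: filename}
}

// write appends one formatted line to the log file, serialized by a mutex
// because plugin callbacks can run concurrently.
func (l *DebugLogger) write(prefix string, format string, args ...interface{}) {
	l.mu.Lock()
	defer l.mu.Unlock()
	f, err := os.OpenFile(l.filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return
	}
	defer f.Close()
	fmt.Fprintf(f, prefix+" "+format+"\n", args...)
}

func (l *DebugLogger) Error(format string, args ...interface{}) {
	if l.level >= LevelError {
		l.write("ERROR", format, args...)
	}
}

func (l *DebugLogger) Warning(format string, args ...interface{}) {
	if l.level >= LevelWarning {
		l.write("WARNING", format, args...)
	}
}

func (l *DebugLogger) Info(format string, args ...interface{}) {
	if l.level >= LevelInfo {
		l.write("INFO", format, args...)
	}
}

func (l *DebugLogger) Verbose(format string, args ...interface{}) {
	if l.level >= LevelVerbose {
		l.write("VERBOSE", format, args...)
	}
}
```

Writing to one flat file trades klog's structured output for a single ordered stream, which is exactly the "single source of truth" the commit message asks for.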

##### kubectl plugin

Note that if you want to enable extra endpoints for the fluence kubectl plugin and expose the GRPC as a service, you can do:
14 changes: 14 additions & 0 deletions sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
@@ -405,6 +405,8 @@ func (r *PodGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

// ensurePodGroup ensures we create the pod group (or delete it when the pod is deleted).
// Deletion would be better done with an owner reference, but I haven't gotten that working yet.
func (r *PodGroupReconciler) ensurePodGroup(ctx context.Context, obj client.Object) []ctrl.Request {
pod, ok := obj.(*v1.Pod)
if !ok {
@@ -418,6 +420,18 @@ func (r *PodGroupReconciler) ensurePodGroup(ctx context.Context, obj client.Obje
return nil
}

// If we deleted the pod... assume we delete the group too
if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
r.log.Info("Pod: ", "Name", pod.Name, "Status", pod.Status.Phase, "Action", "Deleted")

pg := &schedv1alpha1.PodGroup{}
err := r.Get(ctx, types.NamespacedName{Name: groupName, Namespace: pod.Namespace}, pg)
// Only delete the group if we actually found it
if err == nil {
r.Delete(ctx, pg)
}
return nil
}

// If we are watching the Pod and it's beyond pending, we hopefully already made a group
// and that group should be in the reconcile process.
if pod.Status.Phase != v1.PodPending {
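
The comment on `ensurePodGroup` above notes that the delete path would be better handled with an owner reference. As a rough sketch of that alternative (not part of this commit), the reconciler could record a pod of the group as an owner of the PodGroup when it creates the group, letting the Kubernetes garbage collector remove the group once its owning pods are gone. The helper name below is hypothetical, and it assumes the reconciler has access to a `*runtime.Scheme` and that pod and PodGroup share a namespace:

```go
// Hypothetical helper (not in this commit): instead of deleting the PodGroup
// by hand, add a pod to the group's metadata.ownerReferences so Kubernetes
// garbage collection removes the group when its owning pods are deleted.
package controllers

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
)

// ownGroupByPod appends a (non-controller) owner reference for pod to pg.
// The caller must persist the change afterwards, e.g. with r.Update(ctx, pg).
func ownGroupByPod(pod *corev1.Pod, pg *schedv1alpha1.PodGroup, scheme *runtime.Scheme) error {
	return controllerutil.SetOwnerReference(pod, pg, scheme)
}
```

Listing every pod of the group as an owner would keep the PodGroup around until the last pod is deleted, which may be closer to the behavior the hand-rolled delete above is trying to approximate.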
62 changes: 19 additions & 43 deletions sig-scheduler-plugins/pkg/fluence/core/core.go
@@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/logger"
"sigs.k8s.io/scheduler-plugins/pkg/util"
)

@@ -84,10 +85,17 @@ type PodGroupManager struct {
// Probably should just choose one... oh well
sync.RWMutex
mutex sync.Mutex
log *logger.DebugLogger
}

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
func NewPodGroupManager(
client client.Client,
snapshotSharedLister framework.SharedLister,
scheduleTimeout *time.Duration,
podInformer informerv1.PodInformer,
log *logger.DebugLogger,
) *PodGroupManager {
pgMgr := &PodGroupManager{
client: client,
snapshotSharedLister: snapshotSharedLister,
@@ -97,6 +105,7 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
backedOffPG: gochache.New(10*time.Second, 10*time.Second),
groupToJobId: map[string]uint64{},
podToNode: map[string]string{},
log: log,
}
return pgMgr
}
@@ -126,13 +135,14 @@ func (pgMgr *PodGroupManager) PreFilter(
state *framework.CycleState,
) error {

klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
pgMgr.log.Info("[PodGroup PreFilter] pod %s", klog.KObj(pod))
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pg == nil {
return nil
}

if _, exist := pgMgr.backedOffPG.Get(pgFullName); exist {
_, exist := pgMgr.backedOffPG.Get(pgFullName)
if exist {
return fmt.Errorf("podGroup %v failed recently", pgFullName)
}

@@ -147,7 +157,7 @@ func (pgMgr *PodGroupManager) PreFilter(
statuses := pgMgr.GetStatuses(pods)

// This shows us the number of pods we have in the set and their states
klog.Infof("Fluence Pre-filter", "group", pgFullName, "pods", statuses, "MinMember", pg.Spec.MinMember, "Size", len(pods))
pgMgr.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", pgFullName, statuses, pg.Spec.MinMember, len(pods))
if len(pods) < int(pg.Spec.MinMember) {
return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+
"current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember)
@@ -164,7 +174,8 @@ func (pgMgr *PodGroupManager) PreFilter(
// TODO(cwdsuzhou): This resource check may not always pre-catch unschedulable pod group.
// It only tries to PreFilter resource constraints so even if a PodGroup passed here,
// it may not necessarily pass Filter due to other constraints such as affinity/taints.
if _, ok := pgMgr.permittedPG.Get(pgFullName); ok {
_, ok := pgMgr.permittedPG.Get(pgFullName)
if ok {
return nil
}

@@ -173,14 +184,14 @@ func (pgMgr *PodGroupManager) PreFilter(
repPod := pods[0]
nodes, err := pgMgr.AskFlux(ctx, *repPod, pg, pgFullName)
if err != nil {
klog.Infof("[Fluence] Fluxion returned an error %s, not schedulable", err.Error())
pgMgr.log.Info("[PodGroup PreFilter] Fluxion returned an error %s, not schedulable", err.Error())
return err
}
klog.Infof("Node Selected %s (pod group %s)", nodes, pgFullName)
pgMgr.log.Info("Node Selected %s (pod group %s)", nodes, pgFullName)

// Some reason fluxion gave us the wrong size?
if len(nodes) != len(pods) {
klog.Info("Warning - group %s needs %d nodes but Fluxion returned the wrong number nodes %d.", pgFullName, len(pods), len(nodes))
pgMgr.log.Warning("[PodGroup PreFilter] group %s needs %d nodes but Fluxion returned the wrong number of nodes %d.", pgFullName, len(pods), len(nodes))
pgMgr.mutex.Lock()
pgMgr.cancelFluxJob(pgFullName, repPod)
pgMgr.mutex.Unlock()
@@ -236,38 +247,3 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod)
func GetNamespacedName(obj metav1.Object) string {
return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName())
}

func getNodeResource(info *framework.NodeInfo, desiredPodGroupName string) *framework.Resource {
nodeClone := info.Clone()
for _, podInfo := range info.Pods {
if podInfo == nil || podInfo.Pod == nil {
continue
}
if util.GetPodGroupFullName(podInfo.Pod) != desiredPodGroupName {
continue
}
nodeClone.RemovePod(podInfo.Pod)
}

leftResource := framework.Resource{
ScalarResources: make(map[corev1.ResourceName]int64),
}
allocatable := nodeClone.Allocatable
requested := nodeClone.Requested

leftResource.AllowedPodNumber = allocatable.AllowedPodNumber - len(nodeClone.Pods)
leftResource.MilliCPU = allocatable.MilliCPU - requested.MilliCPU
leftResource.Memory = allocatable.Memory - requested.Memory
leftResource.EphemeralStorage = allocatable.EphemeralStorage - requested.EphemeralStorage

for k, allocatableEx := range allocatable.ScalarResources {
requestEx, ok := requested.ScalarResources[k]
if !ok {
leftResource.ScalarResources[k] = allocatableEx
} else {
leftResource.ScalarResources[k] = allocatableEx - requestEx
}
}
klog.V(4).InfoS("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource)
return &leftResource
}
46 changes: 22 additions & 24 deletions sig-scheduler-plugins/pkg/fluence/core/flux.go
@@ -6,7 +6,6 @@ import (

"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/labels"
klog "k8s.io/klog/v2"
pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc"
fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group"

@@ -38,7 +37,7 @@ func (pgMgr *PodGroupManager) AskFlux(
// cancel in fluence. What we can do here is assume the previous pods are no longer running
// and cancel the flux job to create again.
if isAllocated {
klog.Info("Warning - group %s was previously allocated and is requesting again, so must have completed.", groupName)
pgMgr.log.Warning("[PodGroup AskFlux] group %s was previously allocated and is requesting again, so must have completed.", groupName)
pgMgr.mutex.Lock()
pgMgr.cancelFluxJob(groupName, &pod)
pgMgr.mutex.Unlock()
@@ -49,12 +48,12 @@
// This obviously may not be true if we have a heterogeneous PodGroup.
// We name it based on the group, since it will represent the group
jobspec := utils.PreparePodJobSpec(&pod, groupName)
klog.Infof("[Fluence] Inspect pod info, jobspec: %s\n", jobspec)
pgMgr.log.Info("[PodGroup AskFlux] Inspect pod info, jobspec: %s\n", jobspec)
conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure())

// TODO change this to just return fmt.Errorf
if err != nil {
klog.Errorf("[Fluence] Error connecting to server: %v\n", err)
pgMgr.log.Error("[PodGroup AskFlux] Error connecting to server: %v\n", err)
return nodes, err
}
defer conn.Close()
@@ -72,20 +71,20 @@
// An error here is an error with making the request
r, err := grpcclient.Match(context.Background(), request)
if err != nil {
klog.Errorf("[Fluence] did not receive any match response: %v\n", err)
pgMgr.log.Warning("[PodGroup AskFlux] did not receive any match response: %v\n", err)
return nodes, err
}

// TODO GetPodID should be renamed, because it will reflect the group
klog.Infof("[Fluence] Match response ID %s\n", r.GetPodID())
pgMgr.log.Info("[PodGroup AskFlux] Match response ID %s\n", r.GetPodID())

// Get the nodelist and inspect
nodelist := r.GetNodelist()
for _, node := range nodelist {
nodes = append(nodes, node.NodeID)
}
jobid := uint64(r.GetJobID())
klog.Infof("[Fluence] parsed node pods list %s for job id %d\n", nodes, jobid)
pgMgr.log.Info("[PodGroup AskFlux] parsed node pods list %s for job id %d\n", nodes, jobid)

// TODO would be nice to actually be able to ask flux jobs -a to fluence
// That way we can verify assignments, etc.
@@ -103,15 +102,15 @@ func (pgMgr *PodGroupManager) cancelFluxJob(groupName string, pod *corev1.Pod) e

// The job was already cancelled by another pod
if !ok {
klog.Infof("[Fluence] Request for cancel of group %s is already complete.", groupName)
pgMgr.log.Info("[PodGroup cancelFluxJob] Request for cancel of group %s is already complete.", groupName)
return nil
}
klog.Infof("[Fluence] Cancel flux job: %v for group %s", jobid, groupName)
pgMgr.log.Info("[PodGroup cancelFluxJob] Cancel flux job: %v for group %s", jobid, groupName)

// This first error is about connecting to the server
conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure())
if err != nil {
klog.Errorf("[Fluence] Error connecting to server: %v", err)
pgMgr.log.Error("[PodGroup cancelFluxJob] Error connecting to server: %v", err)
return err
}
defer conn.Close()
@@ -124,17 +123,17 @@
request := &pb.CancelRequest{JobID: int64(jobid)}
res, err := grpcclient.Cancel(context.Background(), request)
if err != nil {
klog.Errorf("[Fluence] did not receive any cancel response: %v", err)
pgMgr.log.Error("[PodGroup cancelFluxJob] did not receive any cancel response: %v", err)
return err
}
klog.Infof("[Fluence] Job cancellation for group %s result: %d", groupName, res.Error)
pgMgr.log.Info("[PodGroup cancelFluxJob] Job cancellation for group %s result: %d", groupName, res.Error)

// And this error is if the cancel was successful or not
if res.Error == 0 {
klog.Infof("[Fluence] Successful cancel of flux job: %d for group %s", jobid, groupName)
pgMgr.log.Info("[PodGroup cancelFluxJob] Successful cancel of flux job: %d for group %s", jobid, groupName)
pgMgr.cleanup(pod, groupName)
} else {
klog.Warningf("[Fluence] Failed to cancel flux job %d for group %s", jobid, groupName)
pgMgr.log.Warning("[PodGroup cancelFluxJob] Failed to cancel flux job %d for group %s", jobid, groupName)
}
return nil
}
@@ -174,15 +173,15 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) {
groupName = pg.Name
}

klog.Infof("[Fluence] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, oldPod.Status.Phase, newPod.Status.Phase)
pgMgr.log.Verbose("[PodGroup UpdatePod] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, oldPod.Status.Phase, newPod.Status.Phase)

switch newPod.Status.Phase {
case corev1.PodPending:
// in this state we don't know if a pod is going to be running, thus we don't need to update job map
case corev1.PodRunning:
// if a pod starts running, we can add its state to the delta graph if it is scheduled by another scheduler
case corev1.PodSucceeded:
klog.Infof("[Fluence] Pod %s succeeded, Fluence needs to free the resources", newPod.Name)
pgMgr.log.Info("[PodGroup UpdatePod] Pod %s succeeded, Fluence needs to free the resources", newPod.Name)

pgMgr.mutex.Lock()
defer pgMgr.mutex.Unlock()
@@ -194,13 +193,13 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) {
if ok {
pgMgr.cancelFluxJob(groupName, oldPod)
} else {
klog.Infof("[Fluence] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName)
pgMgr.log.Verbose("[PodGroup UpdatePod] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName)
}

case corev1.PodFailed:

// a corner case that needs to be tested: the pod exit code is not 0, which can be reproduced with a segmentation-fault pi test
klog.Warningf("[Fluence] Pod %s in group %s failed, Fluence needs to free the resources", newPod.Name, groupName)
pgMgr.log.Warning("[PodGroup UpdatePod] Pod %s in group %s failed, Fluence needs to free the resources", newPod.Name, groupName)

pgMgr.mutex.Lock()
defer pgMgr.mutex.Unlock()
@@ -209,7 +208,7 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) {
if ok {
pgMgr.cancelFluxJob(groupName, oldPod)
} else {
klog.Errorf("[Fluence] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName)
pgMgr.log.Error("[PodGroup UpdatePod] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName)
}
case corev1.PodUnknown:
// don't know how to deal with it as it's unknown phase
@@ -220,7 +219,6 @@ func (pgMgr *PodGroupManager) UpdatePod(oldObj, newObj interface{}) {

// DeletePod handles the delete event handler
func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) {
klog.Info("[Fluence] Delete Pod event handler")
pod := podObj.(*corev1.Pod)
groupName, pg := pgMgr.GetPodGroup(context.TODO(), pod)

@@ -230,11 +228,11 @@ func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) {
groupName = pg.Name
}

klog.Infof("[Fluence] Delete pod %s in group %s has status %s", pod.Status.Phase, pod.Name, groupName)
pgMgr.log.Verbose("[PodGroup DeletePod] Delete pod %s in group %s has status %s", pod.Status.Phase, pod.Name, groupName)
switch pod.Status.Phase {
case corev1.PodSucceeded:
case corev1.PodPending:
klog.Infof("[Fluence] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name)
pgMgr.log.Verbose("[PodGroup DeletePod] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name)

pgMgr.mutex.Lock()
defer pgMgr.mutex.Unlock()
@@ -243,7 +241,7 @@ func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) {
if ok {
pgMgr.cancelFluxJob(groupName, pod)
} else {
klog.Infof("[Fluence] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName)
pgMgr.log.Info("[PodGroup DeletePod] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName)
}
case corev1.PodRunning:
pgMgr.mutex.Lock()
@@ -253,7 +251,7 @@ func (pgMgr *PodGroupManager) DeletePod(podObj interface{}) {
if ok {
pgMgr.cancelFluxJob(groupName, pod)
} else {
klog.Infof("[Fluence] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName)
pgMgr.log.Info("[PodGroup DeletePod] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName)
}
}
}