Skip to content

Commit

Permalink
NOJIRA Break PodReconciler.Reconcile() into helpers (#66)
Browse files Browse the repository at this point in the history
* NOJIRA Break PodReconciler.Reconcile() into helpers

 # Context

The `PodReconciler.Reconcile()` is long and convoluted, making it hard
to understand and parse. As there are distinct steps in its operation,
let's break them into helpers named after the logical operation to
simplify understanding it.

 # What this changes

- Extract interface for caching and platform statistics into its own
private methods, starting a new internal API.

- Group all the code required to determine if an existing Pod is
already scheduled properly into its own private method.

- Streamline the main method's flow to reduce indentation and clarify
the logical process.

 # Non-goals

- Modify the behaviour of the reconcile method

Change-Id: Id39d5caf4120b1f20a138b6dfecd5ae8be344eab
Co-authored-by: João Alves <[email protected]>
  • Loading branch information
miguelbernadi and joaoqalves authored Jan 8, 2024
1 parent 7fbdc30 commit 62ea3e9
Showing 1 changed file with 124 additions and 87 deletions.
211 changes: 124 additions & 87 deletions pkg/controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,34 +127,41 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}

if apierrors.IsNotFound(err) {
images, ok := r.podImages[req.NamespacedName.String()]
if ok {
for _, image := range images {
if usage, ok := r.imagePlatforms[image]; ok {
usage.refcount--
for _, platform := range usage.platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Dec()
if usage.refcount == 0 {
r.metrics.ImageCount.DeleteLabelValues(image, platform.OS, platform.Architecture, platform.Variant)
}
}
if usage.refcount == 0 {
delete(r.imagePlatforms, image)
}
}
}
delete(r.podImages, req.NamespacedName.String())
}
r.deleteFromCache(req.NamespacedName.String())
return ctrl.Result{}, nil
}
if _, ok := r.podImages[req.NamespacedName.String()]; ok || podIsReady(ctx, pod) {
if r.isImageCached(req.NamespacedName.String()) || podIsReady(ctx, pod) {
log.DefaultLogger.WithContext(ctx).Info("pod was already processed")
return ctrl.Result{}, nil
}

podImages := arch.GetContainerImages(pod.Spec.InitContainers, pod.Spec.Containers)

imagePullSecret, err := arch.GetImagePullSecretFromPodSpec(ctx, r.Client, req.Namespace, &pod.Spec)
ctx, podScheduledOnMatchingNode, err := r.podScheduledOnMatchingNode(ctx, req.Namespace, pod, podImages)
if err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}

r.addToCache(req.NamespacedName.String(), podImages)

if podScheduledOnMatchingNode {
return ctrl.Result{}, nil
}

if _, ok := arch.PodSpecHasNodeArchitectureSelection(ctx, &pod.Spec); ok {
log.DefaultLogger.WithContext(ctx).Info("pod has node architecture selection")
return ctrl.Result{}, nil
}
log.DefaultLogger.WithContext(ctx).Warnf("pod scheduled on node with no matching platform")

// TODO: add some checks whether this is really a problem (errors, ...)
r.deletePodAndNotifyUser(ctx, pod)

return ctrl.Result{}, nil
}

func (r *PodReconciler) podScheduledOnMatchingNode(ctx context.Context, namespace string, pod *v1.Pod, podImages []string) (context.Context, bool, error) {
imagePullSecret, err := arch.GetImagePullSecretFromPodSpec(ctx, r.Client, namespace, &pod.Spec)
if err != nil {
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to get image pull secret from pod spec")
}
Expand All @@ -168,7 +175,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if err != nil {
ctx = log.AddLogFieldsToContext(ctx, logrus.Fields{"node": pod.Spec.NodeName, "pod": pod.Name})
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to get node spec")
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
return ctx, false, err
}
if value, ok := node.Labels["kubernetes.io/arch"]; ok {
nodeArch = value
Expand All @@ -187,27 +194,9 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
for _, image := range podImages {
platforms, err := r.Registry.ListArchs(ctx, imagePullSecret, image)
if err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
if _, ok := r.imagePlatforms[image]; !ok {
r.imagePlatforms[image] = &imageUsage{platforms: platforms}
} else {
for _, oldPlatform := range r.imagePlatforms[image].platforms {
found := false
for _, newPlatform := range platforms {
if oldPlatform.OS == newPlatform.OS && oldPlatform.Architecture == newPlatform.Architecture && oldPlatform.Variant == newPlatform.Variant {
found = true
break
}
}
if !found {
// An image currently in use has been updated and no longer supports the platform.
r.metrics.ImageCount.DeleteLabelValues(image, oldPlatform.OS, oldPlatform.Architecture, oldPlatform.Variant)
}
}
r.imagePlatforms[image].platforms = platforms
return ctx, false, err
}

r.incrementPlatformStatistics(image, platforms)
if nodeOs != "" && nodeArch != "" {
hasMatchingPlatform := false
for _, platform := range platforms {
Expand All @@ -219,61 +208,109 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
podScheduledOnMatchingNode = false
}
}
}
return ctx, podScheduledOnMatchingNode, nil
}

func (r *PodReconciler) addToCache(namespacedName string, podImages []string) {
r.podImages[namespacedName] = podImages
}

func (r *PodReconciler) isImageCached(namespacedName string) bool {
_, ok := r.podImages[namespacedName]
return ok
}

func (r *PodReconciler) deleteFromCache(namespacedName string) {
images, ok := r.podImages[namespacedName]
if ok {
for _, image := range images {
r.decrementPlatformStatistics(image)
}

r.imagePlatforms[image].refcount++
for _, platform := range platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Inc()
delete(r.podImages, namespacedName)
}
}

func (r *PodReconciler) incrementPlatformStatistics(image string, platforms []registry.Platform) {
if _, ok := r.imagePlatforms[image]; !ok {
r.imagePlatforms[image] = &imageUsage{platforms: platforms}
} else {
for _, oldPlatform := range r.imagePlatforms[image].platforms {
found := false
for _, newPlatform := range platforms {
if oldPlatform.OS == newPlatform.OS && oldPlatform.Architecture == newPlatform.Architecture && oldPlatform.Variant == newPlatform.Variant {
found = true
break
}
}
if !found {
// An image currently in use has been updated and no longer supports the platform.
r.metrics.ImageCount.DeleteLabelValues(image, oldPlatform.OS, oldPlatform.Architecture, oldPlatform.Variant)
}
}
r.imagePlatforms[image].platforms = platforms
}
r.imagePlatforms[image].refcount++
for _, platform := range platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Inc()
}
r.podImages[req.NamespacedName.String()] = podImages
if !podScheduledOnMatchingNode {
if _, ok := arch.PodSpecHasNodeArchitectureSelection(ctx, &pod.Spec); ok {
log.DefaultLogger.WithContext(ctx).Info("pod has node architecture selection")
return ctrl.Result{}, nil
}

func (r *PodReconciler) decrementPlatformStatistics(image string) {
if usage, ok := r.imagePlatforms[image]; ok {
usage.refcount--
for _, platform := range usage.platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Dec()
if usage.refcount == 0 {
r.metrics.ImageCount.DeleteLabelValues(image, platform.OS, platform.Architecture, platform.Variant)
}
}
log.DefaultLogger.WithContext(ctx).Warnf("pod scheduled on node with no matching platform")
if usage.refcount == 0 {
delete(r.imagePlatforms, image)
}
}
}

// TODO: add some checks whether this is really a problem (errors, ...)
err = r.Client.Delete(ctx, pod)
func (r *PodReconciler) deletePodAndNotifyUser(ctx context.Context, pod *v1.Pod) {
err := r.Client.Delete(ctx, pod)

eventType := "Normal"
nameSuffix := "-deleted-pod"
messagePrefix := "Pod(s) was deleted because it was scheduled on a node with a platform that is not supported by the image:"
if err != nil {
eventType = "Warning"
nameSuffix = "-failed-to-delete-pod"
messagePrefix = "Failed to delete pod(s) scheduled on a node with a platform that is not supported by the image. Pod(s):"
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "failed").Inc()
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to delete pod scheduled on node with no matching platform")
} else {
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "success").Inc()
log.DefaultLogger.WithContext(ctx).Info("Deleted pod scheduled on node with no matching platform")
}
// give visibility to the user that the pod has been deleted for both the pod and its owner
upsertPodDeletionEvent(ctx, r.Client, pod, pod.GetName(), eventType, nameSuffix, messagePrefix)
if len(pod.OwnerReferences) > 0 {
for _, ref := range pod.OwnerReferences {
if ref.Controller != nil && *ref.Controller {
u := &unstructured.Unstructured{}
u.SetAPIVersion(ref.APIVersion)
u.SetKind(ref.Kind)
u.SetName(ref.Name)
u.SetNamespace(pod.Namespace)
u.SetUID(ref.UID)
upsertPodDeletionEvent(
ctx,
r.Client,
u,
pod.GetName(),
eventType,
nameSuffix,
messagePrefix,
)
}
eventType := "Normal"
nameSuffix := "-deleted-pod"
messagePrefix := "Pod(s) was deleted because it was scheduled on a node with a platform that is not supported by the image:"
if err != nil {
eventType = "Warning"
nameSuffix = "-failed-to-delete-pod"
messagePrefix = "Failed to delete pod(s) scheduled on a node with a platform that is not supported by the image. Pod(s):"
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "failed").Inc()
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to delete pod scheduled on node with no matching platform")
} else {
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "success").Inc()
log.DefaultLogger.WithContext(ctx).Info("Deleted pod scheduled on node with no matching platform")
}
// give visibility to the user that the pod has been deleted for both the pod and its owner
upsertPodDeletionEvent(ctx, r.Client, pod, pod.GetName(), eventType, nameSuffix, messagePrefix)
if len(pod.OwnerReferences) > 0 {
for _, ref := range pod.OwnerReferences {
if ref.Controller != nil && *ref.Controller {
u := &unstructured.Unstructured{}
u.SetAPIVersion(ref.APIVersion)
u.SetKind(ref.Kind)
u.SetName(ref.Name)
u.SetNamespace(pod.Namespace)
u.SetUID(ref.UID)
upsertPodDeletionEvent(
ctx,
r.Client,
u,
pod.GetName(),
eventType,
nameSuffix,
messagePrefix,
)
}
}
}
return ctrl.Result{}, nil
}

// podIsReady checks if a pod condition is ready which means the readiness probe of all containers are OK.
Expand Down

0 comments on commit 62ea3e9

Please sign in to comment.