Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOJIRA Break PodReconciler.Reconcile() into helpers #66

Merged
merged 7 commits into from
Jan 8, 2024
211 changes: 124 additions & 87 deletions pkg/controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,34 +127,41 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}

if apierrors.IsNotFound(err) {
images, ok := r.podImages[req.NamespacedName.String()]
if ok {
for _, image := range images {
if usage, ok := r.imagePlatforms[image]; ok {
usage.refcount--
for _, platform := range usage.platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Dec()
if usage.refcount == 0 {
r.metrics.ImageCount.DeleteLabelValues(image, platform.OS, platform.Architecture, platform.Variant)
}
}
if usage.refcount == 0 {
delete(r.imagePlatforms, image)
}
}
}
delete(r.podImages, req.NamespacedName.String())
}
r.deleteFromCache(req.NamespacedName.String())
return ctrl.Result{}, nil
}
if _, ok := r.podImages[req.NamespacedName.String()]; ok || podIsReady(ctx, pod) {
if r.isImageCached(req.NamespacedName.String()) || podIsReady(ctx, pod) {
log.DefaultLogger.WithContext(ctx).Info("pod was already processed")
return ctrl.Result{}, nil
}

podImages := arch.GetContainerImages(pod.Spec.InitContainers, pod.Spec.Containers)

imagePullSecret, err := arch.GetImagePullSecretFromPodSpec(ctx, r.Client, req.Namespace, &pod.Spec)
ctx, podScheduledOnMatchingNode, err := r.podScheduledOnMatchingNode(ctx, req.Namespace, pod, podImages)
if err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}

r.addToCache(req.NamespacedName.String(), podImages)

if podScheduledOnMatchingNode {
return ctrl.Result{}, nil
}

if _, ok := arch.PodSpecHasNodeArchitectureSelection(ctx, &pod.Spec); ok {
log.DefaultLogger.WithContext(ctx).Info("pod has node architecture selection")
return ctrl.Result{}, nil
}
log.DefaultLogger.WithContext(ctx).Warnf("pod scheduled on node with no matching platform")

// TODO: add some checks whether this is really a problem (errors, ...)
r.deletePodAndNotifyUser(ctx, pod)

return ctrl.Result{}, nil
}

func (r *PodReconciler) podScheduledOnMatchingNode(ctx context.Context, namespace string, pod *v1.Pod, podImages []string) (context.Context, bool, error) {
imagePullSecret, err := arch.GetImagePullSecretFromPodSpec(ctx, r.Client, namespace, &pod.Spec)
if err != nil {
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to get image pull secret from pod spec")
}
Expand All @@ -168,7 +175,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if err != nil {
ctx = log.AddLogFieldsToContext(ctx, logrus.Fields{"node": pod.Spec.NodeName, "pod": pod.Name})
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to get node spec")
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
return ctx, false, err
}
if value, ok := node.Labels["kubernetes.io/arch"]; ok {
nodeArch = value
Expand All @@ -187,27 +194,9 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
for _, image := range podImages {
platforms, err := r.Registry.ListArchs(ctx, imagePullSecret, image)
if err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
if _, ok := r.imagePlatforms[image]; !ok {
r.imagePlatforms[image] = &imageUsage{platforms: platforms}
} else {
for _, oldPlatform := range r.imagePlatforms[image].platforms {
found := false
for _, newPlatform := range platforms {
if oldPlatform.OS == newPlatform.OS && oldPlatform.Architecture == newPlatform.Architecture && oldPlatform.Variant == newPlatform.Variant {
found = true
break
}
}
if !found {
// An image currently in use has been updated and no longer supports the platform.
r.metrics.ImageCount.DeleteLabelValues(image, oldPlatform.OS, oldPlatform.Architecture, oldPlatform.Variant)
}
}
r.imagePlatforms[image].platforms = platforms
return ctx, false, err
}

r.incrementPlatformStatistics(image, platforms)
if nodeOs != "" && nodeArch != "" {
hasMatchingPlatform := false
for _, platform := range platforms {
Expand All @@ -219,61 +208,109 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
podScheduledOnMatchingNode = false
}
}
}
return ctx, podScheduledOnMatchingNode, nil
}

func (r *PodReconciler) addToCache(namespacedName string, podImages []string) {
r.podImages[namespacedName] = podImages
}

func (r *PodReconciler) isImageCached(namespacedName string) bool {
_, ok := r.podImages[namespacedName]
return ok
}

func (r *PodReconciler) deleteFromCache(namespacedName string) {
images, ok := r.podImages[namespacedName]
if ok {
for _, image := range images {
r.decrementPlatformStatistics(image)
}

r.imagePlatforms[image].refcount++
for _, platform := range platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Inc()
delete(r.podImages, namespacedName)
}
}

func (r *PodReconciler) incrementPlatformStatistics(image string, platforms []registry.Platform) {
if _, ok := r.imagePlatforms[image]; !ok {
r.imagePlatforms[image] = &imageUsage{platforms: platforms}
} else {
for _, oldPlatform := range r.imagePlatforms[image].platforms {
found := false
for _, newPlatform := range platforms {
if oldPlatform.OS == newPlatform.OS && oldPlatform.Architecture == newPlatform.Architecture && oldPlatform.Variant == newPlatform.Variant {
found = true
break
}
}
if !found {
// An image currently in use has been updated and no longer supports the platform.
r.metrics.ImageCount.DeleteLabelValues(image, oldPlatform.OS, oldPlatform.Architecture, oldPlatform.Variant)
}
}
r.imagePlatforms[image].platforms = platforms
}
r.imagePlatforms[image].refcount++
for _, platform := range platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Inc()
}
r.podImages[req.NamespacedName.String()] = podImages
if !podScheduledOnMatchingNode {
if _, ok := arch.PodSpecHasNodeArchitectureSelection(ctx, &pod.Spec); ok {
log.DefaultLogger.WithContext(ctx).Info("pod has node architecture selection")
return ctrl.Result{}, nil
}

func (r *PodReconciler) decrementPlatformStatistics(image string) {
if usage, ok := r.imagePlatforms[image]; ok {
usage.refcount--
for _, platform := range usage.platforms {
r.metrics.ImageCount.WithLabelValues(image, platform.OS, platform.Architecture, platform.Variant).Dec()
if usage.refcount == 0 {
r.metrics.ImageCount.DeleteLabelValues(image, platform.OS, platform.Architecture, platform.Variant)
}
}
log.DefaultLogger.WithContext(ctx).Warnf("pod scheduled on node with no matching platform")
if usage.refcount == 0 {
delete(r.imagePlatforms, image)
}
}
}

// TODO: add some checks whether this is really a problem (errors, ...)
err = r.Client.Delete(ctx, pod)
func (r *PodReconciler) deletePodAndNotifyUser(ctx context.Context, pod *v1.Pod) {
err := r.Client.Delete(ctx, pod)

eventType := "Normal"
nameSuffix := "-deleted-pod"
messagePrefix := "Pod(s) was deleted because it was scheduled on a node with a platform that is not supported by the image:"
if err != nil {
eventType = "Warning"
nameSuffix = "-failed-to-delete-pod"
messagePrefix = "Failed to delete pod(s) scheduled on a node with a platform that is not supported by the image. Pod(s):"
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "failed").Inc()
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to delete pod scheduled on node with no matching platform")
} else {
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "success").Inc()
log.DefaultLogger.WithContext(ctx).Info("Deleted pod scheduled on node with no matching platform")
}
// give visibility to the user that the pod has been deleted for both the pod and its owner
upsertPodDeletionEvent(ctx, r.Client, pod, pod.GetName(), eventType, nameSuffix, messagePrefix)
if len(pod.OwnerReferences) > 0 {
for _, ref := range pod.OwnerReferences {
if ref.Controller != nil && *ref.Controller {
u := &unstructured.Unstructured{}
u.SetAPIVersion(ref.APIVersion)
u.SetKind(ref.Kind)
u.SetName(ref.Name)
u.SetNamespace(pod.Namespace)
u.SetUID(ref.UID)
upsertPodDeletionEvent(
ctx,
r.Client,
u,
pod.GetName(),
eventType,
nameSuffix,
messagePrefix,
)
}
eventType := "Normal"
nameSuffix := "-deleted-pod"
messagePrefix := "Pod(s) was deleted because it was scheduled on a node with a platform that is not supported by the image:"
if err != nil {
eventType = "Warning"
nameSuffix = "-failed-to-delete-pod"
messagePrefix = "Failed to delete pod(s) scheduled on a node with a platform that is not supported by the image. Pod(s):"
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "failed").Inc()
log.DefaultLogger.WithContext(ctx).WithError(err).Error("Failed to delete pod scheduled on node with no matching platform")
} else {
r.metrics.PodDeletedTotal.WithLabelValues(pod.Namespace, "success").Inc()
log.DefaultLogger.WithContext(ctx).Info("Deleted pod scheduled on node with no matching platform")
}
// give visibility to the user that the pod has been deleted for both the pod and its owner
upsertPodDeletionEvent(ctx, r.Client, pod, pod.GetName(), eventType, nameSuffix, messagePrefix)
if len(pod.OwnerReferences) > 0 {
for _, ref := range pod.OwnerReferences {
if ref.Controller != nil && *ref.Controller {
u := &unstructured.Unstructured{}
u.SetAPIVersion(ref.APIVersion)
u.SetKind(ref.Kind)
u.SetName(ref.Name)
u.SetNamespace(pod.Namespace)
u.SetUID(ref.UID)
upsertPodDeletionEvent(
ctx,
r.Client,
u,
pod.GetName(),
eventType,
nameSuffix,
messagePrefix,
)
}
}
}
return ctrl.Result{}, nil
}

// podIsReady checks if a pod condition is ready which means the readiness probe of all containers are OK.
Expand Down
Loading