diff --git a/pkg/controllers/operationjob/operationjob_controller.go b/pkg/controllers/operationjob/operationjob_controller.go index 4a3bbf60..fe8b6944 100644 --- a/pkg/controllers/operationjob/operationjob_controller.go +++ b/pkg/controllers/operationjob/operationjob_controller.go @@ -138,6 +138,10 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req return reconcile.Result{}, err } + if err := r.ensureFailedTargetsReleased(ctx, instance, candidates); err != nil { + return reconcile.Result{}, err + } + err = r.doReconcile(ctx, instance, candidates) return requeueResult(requeueAfter), err } diff --git a/pkg/controllers/operationjob/operationjob_manager.go b/pkg/controllers/operationjob/operationjob_manager.go index 721e5b62..54683a0d 100644 --- a/pkg/controllers/operationjob/operationjob_manager.go +++ b/pkg/controllers/operationjob/operationjob_manager.go @@ -174,7 +174,8 @@ func (r *ReconcileOperationJob) operateTargets( if len(candidates) == 0 { return nil } - return operator.OperateTargets(ctx, candidates, operationJob) + errMap := operator.OperateTargets(ctx, candidates, operationJob) + return ctrlutils.AggregateErrors(ojutils.ConvertErrMapToList(errMap)) } func (r *ReconcileOperationJob) getTargetsOpsStatus( @@ -244,11 +245,10 @@ func (r *ReconcileOperationJob) getTargetsOpsStatus( // ensureActiveDeadlineAndTTL calculate time to ActiveDeadlineSeconds and TTLSecondsAfterFinished and release targets func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, logger logr.Logger) (bool, *time.Duration, error) { if operationJob.Spec.ActiveDeadlineSeconds != nil { - var allowReleaseCandidates []*OpsCandidate for i := range candidates { candidate := candidates[i] - // just skip if target operation already finished, or not started - if IsCandidateOpsFinished(candidate) || candidate.OpsStatus.StartTime == nil { + // just skip if target not started + 
if candidate.OpsStatus.StartTime == nil { continue } leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(candidate.OpsStatus.StartTime.Time) @@ -257,17 +257,10 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, } else { logger.Info("should end but still processing") r.Recorder.Eventf(operationJob, corev1.EventTypeNormal, "Timeout", "Try to fail OperationJob for timeout...") - // mark operationjob and targets failed and release targets + // mark target failed if timeout MarkCandidateFailed(candidate) - allowReleaseCandidates = append(allowReleaseCandidates, candidate) } } - if len(allowReleaseCandidates) > 0 { - releaseErr := r.releaseTargets(ctx, operationJob, allowReleaseCandidates, false) - operationJob.Status = r.calculateStatus(operationJob, candidates) - updateErr := r.updateStatus(ctx, operationJob) - return false, nil, controllerutils.AggregateErrors([]error{releaseErr, updateErr}) - } } if operationJob.Spec.TTLSecondsAfterFinished != nil { @@ -286,19 +279,38 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, return false, nil, nil } +// ensureFailedTargetsReleased select failed but unreleased targets and call releaseTargets +func (r *ReconcileOperationJob) ensureFailedTargetsReleased(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error { + var allowReleaseCandidates []*OpsCandidate + for i := range candidates { + if IsCandidateOpsFailed(candidates[i]) && !IsCandidateOpsReleased(candidates[i]) { + allowReleaseCandidates = append(allowReleaseCandidates, candidates[i]) + } + } + if len(allowReleaseCandidates) > 0 { + releaseErr := r.releaseTargets(ctx, operationJob, allowReleaseCandidates, false) + operationJob.Status = r.calculateStatus(operationJob, candidates) + updateErr := r.updateStatus(ctx, operationJob) + return controllerutils.AggregateErrors([]error{releaseErr, updateErr}) + } + return nil +} + // 
releaseTargets try to release the targets from operation when the operationJob is deleted func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, needUpdateStatus bool) error { actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(operationJob) if err != nil { return err } - releaseErr := actionHandler.ReleaseTargets(ctx, candidates, operationJob) + + // start to release targets + releaseErrMap := actionHandler.ReleaseTargets(ctx, candidates, operationJob) _, _ = controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error { candidate := candidates[i] // cancel lifecycle if necessary if enablePodOpsLifecycle { err = r.cleanCandidateOpsLifecycle(ctx, true, candidate, operationJob) - releaseErr = controllerutils.AggregateErrors([]error{releaseErr, err}) + releaseErrMap[candidate.PodName] = controllerutils.AggregateErrors([]error{releaseErrMap[candidate.PodName], err}) } // mark candidate as failed if not finished if !IsCandidateOpsFinished(candidate) { @@ -306,6 +318,16 @@ func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob } return nil }) + + // mark target as released if error not occurred + for _, candidate := range candidates { + if releaseErrMap[candidate.PodName] == nil { + MarkCandidateReleased(candidate) + } + } + releaseErr := ctrlutils.AggregateErrors(ojutils.ConvertErrMapToList(releaseErrMap)) + + // update candidates status to job status if !needUpdateStatus { return releaseErr } diff --git a/pkg/controllers/operationjob/opscore/candidate.go b/pkg/controllers/operationjob/opscore/candidate.go index af7ecc85..95451ac3 100644 --- a/pkg/controllers/operationjob/opscore/candidate.go +++ b/pkg/controllers/operationjob/opscore/candidate.go @@ -32,6 +32,10 @@ type OpsCandidate struct { OpsStatus *appsv1alpha1.OpsStatus } +const ( + ExtraInfoReleased = "Released" +) + func 
DecideCandidateByPartition(instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) []*OpsCandidate { if instance.Spec.Partition == nil { return candidates @@ -81,6 +85,23 @@ func IsCandidateOpsFinished(candidate *OpsCandidate) bool { candidate.OpsStatus.Progress == appsv1alpha1.OperationProgressSucceeded } +func IsCandidateOpsReleased(candidate *OpsCandidate) bool { + if candidate.OpsStatus == nil || candidate.OpsStatus.ExtraInfo == nil { + return false + } + if val, exist := candidate.OpsStatus.ExtraInfo[ExtraInfoReleased]; exist && val == "true" { + return true + } + return false +} + +func IsCandidateOpsFailed(candidate *OpsCandidate) bool { + if candidate.OpsStatus == nil || candidate.OpsStatus.Progress == "" { + return false + } + return candidate.OpsStatus.Progress == appsv1alpha1.OperationProgressFailed +} + func IsCandidateServiceAvailable(candidate *OpsCandidate) bool { if candidate.Pod == nil || candidate.Pod.Labels == nil { return false @@ -94,3 +115,13 @@ func MarkCandidateFailed(candidate *OpsCandidate) { candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed } } + +func MarkCandidateReleased(candidate *OpsCandidate) { + if candidate.OpsStatus == nil { + candidate.OpsStatus = &appsv1alpha1.OpsStatus{} + } + if candidate.OpsStatus.ExtraInfo == nil { + candidate.OpsStatus.ExtraInfo = map[string]string{} + } + candidate.OpsStatus.ExtraInfo[ExtraInfoReleased] = "true" +} diff --git a/pkg/controllers/operationjob/opscore/handler.go b/pkg/controllers/operationjob/opscore/handler.go index 61219617..2dd49b9e 100644 --- a/pkg/controllers/operationjob/opscore/handler.go +++ b/pkg/controllers/operationjob/opscore/handler.go @@ -38,12 +38,12 @@ type ActionHandler interface { // Setup sets up action with manager in AddToMgr, i.e., watch, cache... 
Setup(controller.Controller, *mixin.ReconcilerMixin) error - // OperateTargets do real operation to targets - OperateTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) error + // OperateTargets do real operation to targets, and returns an error map to each target name + OperateTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) map[string]error // GetOpsProgress returns target's current opsStatus, e.g., progress, reason, message GetOpsProgress(context.Context, *OpsCandidate, *appsv1alpha1.OperationJob) (progress ActionProgress, err error) - // ReleaseTargets releases the target from operation when the operationJob is deleted - ReleaseTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) error + // ReleaseTargets releases the target from operation when failed, and returns an error map to each target name + ReleaseTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) map[string]error } diff --git a/pkg/controllers/operationjob/replace/replace.go b/pkg/controllers/operationjob/replace/replace.go index 73a5db59..7495ee36 100644 --- a/pkg/controllers/operationjob/replace/replace.go +++ b/pkg/controllers/operationjob/replace/replace.go @@ -19,6 +19,7 @@ package replace import ( "context" "fmt" + "sync" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -62,9 +63,13 @@ func (p *PodReplaceHandler) Setup(controller controller.Controller, reconcileMix return controller.Watch(&source.Kind{Type: &corev1.Pod{}}, &OriginPodHandler{Client: reconcileMixin.Client}) } -func (p *PodReplaceHandler) OperateTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) error { - _, err := controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error { +func (p *PodReplaceHandler) OperateTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) map[string]error { + errMap := 
&sync.Map{} + _, _ = controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) (err error) { candidate := candidates[i] + defer func() { + errMap.Store(candidate.PodName, err) + }() if candidate.Pod == nil || candidate.Pod.Labels == nil { return nil } @@ -99,7 +104,7 @@ func (p *PodReplaceHandler) OperateTargets(ctx context.Context, candidates []*Op } return nil }) - return err + return ojutils.ConvertSyncErrMap(errMap) } func (p *PodReplaceHandler) GetOpsProgress(ctx context.Context, candidate *OpsCandidate, operationJob *appsv1alpha1.OperationJob) (progress ActionProgress, err error) { @@ -173,9 +178,13 @@ func (p *PodReplaceHandler) GetOpsProgress(ctx context.Context, candidate *OpsCa return } -func (p *PodReplaceHandler) ReleaseTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) error { - _, err := controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error { +func (p *PodReplaceHandler) ReleaseTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) map[string]error { + errMap := &sync.Map{} + _, _ = controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) (err error) { candidate := candidates[i] + defer func() { + errMap.Store(candidate.PodName, err) + }() if candidate.Pod == nil || candidate.Pod.DeletionTimestamp != nil { return nil } @@ -196,7 +205,8 @@ func (p *PodReplaceHandler) ReleaseTargets(ctx context.Context, candidates []*Op ojutils.SetOpsStatusError(candidate, ojutils.ReasonUpdateObjectFailed, retErr.Error()) return retErr } + candidate.OpsStatus.ExtraInfo[ExtraInfoReleased] = "true" return nil }) - return err + return ojutils.ConvertSyncErrMap(errMap) } diff --git a/pkg/controllers/operationjob/utils/common.go b/pkg/controllers/operationjob/utils/common.go index 
// ConvertErrMapToList flattens a per-target error map into a slice holding
// only the real failures.
//
// Entries whose error is nil (targets that succeeded) are dropped, so the
// result can be handed to an aggregator or inspected with len() without
// filtering again. A nil or empty map, or a map holding only nil errors,
// yields a nil slice. The order of the returned errors follows Go's map
// iteration order and is therefore unspecified.
func ConvertErrMapToList(errMap map[string]error) []error {
	var errList []error
	for _, err := range errMap {
		if err == nil {
			continue
		}
		errList = append(errList, err)
	}
	return errList
}

// ConvertSyncErrMap copies a *sync.Map that was filled with
// "target name -> error" pairs into a plain map[string]error.
//
// A stored nil value is kept as a nil error under the same key, so callers
// can still tell "target processed without error" from "target not present".
// The returned map is always non-nil, including when errMap itself is nil.
// Entries whose key is not a string, or whose non-nil value does not
// implement error, are skipped instead of panicking on a failed assertion.
func ConvertSyncErrMap(errMap *sync.Map) map[string]error {
	ret := make(map[string]error)
	if errMap == nil {
		return ret
	}
	errMap.Range(func(key, value any) bool {
		name, ok := key.(string)
		if !ok {
			// Not keyed by a target name; nothing sensible to record.
			return true
		}
		var err error
		if value != nil {
			// An untyped nil cannot be asserted to error, so only assert
			// when a value is actually present.
			e, isErr := value.(error)
			if !isErr {
				return true
			}
			err = e
		}
		ret[name] = err
		return true
	})
	return ret
}