From 9ffa565bc60e08936f7f80cb3f491cf53f256e7f Mon Sep 17 00:00:00 2001 From: Haodong Lyu Date: Thu, 8 Jul 2021 12:47:49 +0800 Subject: [PATCH] fix(expectation): more appropriate use of expectation (#139) --- pkg/controller.v1/common/job.go | 20 ++++++++++++++- pkg/controller.v1/common/pod.go | 39 ++++++++++++++++++++--------- pkg/controller.v1/common/service.go | 19 ++++++++------ 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index ffbe92f0..606f0dc6 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -7,8 +7,10 @@ import ( "time" apiv1 "github.com/kubeflow/common/pkg/apis/common/v1" + "github.com/kubeflow/common/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/common/pkg/util" "github.com/kubeflow/common/pkg/util/k8sutil" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -114,8 +116,14 @@ func (jc *JobController) ReconcileJobs( utilruntime.HandleError(fmt.Errorf("Couldn't get key for job object %#v: %v", job, err)) return err } - log.Infof("Reconciling for job %s", metaObject.GetName()) + // Reset expectations + // 1. Since `ReconcileJobs` is called, we expect that previous expectations are all satisfied, + // and it's safe to reset the expectations + // 2. Resetting expectations avoids dirty data such as `expectedDeletion = -1` + // (pod or service was deleted unexpectedly) + jc.ResetExpectations(jobKey, replicas) + log.Infof("Reconciling for job %s", metaObject.GetName()) pods, err := jc.Controller.GetPodsForJob(job) if err != nil { log.Warnf("GetPodsForJob error %v", err) @@ -315,6 +323,16 @@ func (jc *JobController) ReconcileJobs( return nil } +// ResetExpectations resets the expected creates and deletes of pods and services to zero for every replica type in replicas.
+func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) { + for rtype := range replicas { + expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype) + jc.Expectations.SetExpectations(expectationPodsKey, 0, 0) + expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype) + jc.Expectations.SetExpectations(expectationServicesKey, 0, 0) + } +} + // PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded. func (jc *JobController) PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool { if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil { diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go index 1d00700f..507b33f7 100644 --- a/pkg/controller.v1/common/pod.go +++ b/pkg/controller.v1/common/pod.go @@ -16,8 +16,15 @@ package common import ( "fmt" + "reflect" + "strconv" + + apiv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/control" "github.com/kubeflow/common/pkg/controller.v1/expectation" + commonutil "github.com/kubeflow/common/pkg/util" + trainutil "github.com/kubeflow/common/pkg/util/train" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" @@ -28,12 +35,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" - "reflect" - "strconv" - - apiv1 "github.com/kubeflow/common/pkg/apis/common/v1" - commonutil "github.com/kubeflow/common/pkg/util" - trainutil "github.com/kubeflow/common/pkg/util/train" ) const ( @@ -333,11 +334,17 @@ func (jc *JobController) ReconcilePods( if !ok { return fmt.Errorf("job is not a runtime.Object type") } + jobKey, err := KeyFunc(metaObject) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, 
err)) + return err + } + expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype) // Convert ReplicaType to lower string. logger := commonutil.LoggerForReplica(metaObject, rtype) // Get all pods for the type rt. - pods, err := jc.FilterPodsForReplicaType(pods, rtype) + pods, err = jc.FilterPodsForReplicaType(pods, rtype) if err != nil { return err } @@ -375,6 +382,8 @@ func (jc *JobController) ReconcilePods( if err != nil { return err } + // Deletion is expected + jc.Expectations.RaiseExpectations(expectationPodsKey, 0, 1) } // Get the exit code of the container. @@ -395,6 +404,8 @@ func (jc *JobController) ReconcilePods( if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil { return err } + // Deletion is expected + jc.Expectations.RaiseExpectations(expectationPodsKey, 0, 1) } } @@ -421,11 +432,6 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err)) return err } - expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt) - err = jc.Expectations.ExpectCreations(expectationPodsKey, 1) - if err != nil { - return err - } logger := commonutil.LoggerForReplica(metaObject, rt) // Set type and index for the worker. 
@@ -484,6 +490,11 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind } } + // Creation is expected when there is no error returned + // We use `RaiseExpectations` here to accumulate expectations, since `SetExpectations` cannot accumulate them + expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt) + jc.Expectations.RaiseExpectations(expectationPodsKey, 1, 0) + controllerRef := jc.GenOwnerReference(metaObject) err = jc.PodControl.CreatePodsWithControllerRef(metaObject.GetNamespace(), podTemplate, runtimeObject, controllerRef) if err != nil && errors.IsTimeout(err) { @@ -496,6 +507,10 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind // pod when the expectation expires. return nil } else if err != nil { + // Since an error occurred (the informer won't observe this pod), + // we decrement the expected number of creates + // and wait until the next reconciliation + jc.Expectations.CreationObserved(expectationPodsKey) return err } createdPodsCount.Inc() diff --git a/pkg/controller.v1/common/service.go b/pkg/controller.v1/common/service.go index 07db5b0d..9d993f34 100644 --- a/pkg/controller.v1/common/service.go +++ b/pkg/controller.v1/common/service.go @@ -15,10 +15,13 @@ package common import ( "fmt" + "strconv" + apiv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/control" "github.com/kubeflow/common/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/common/pkg/util" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" @@ -28,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "strconv" ) var ( @@ -277,13 +279,6 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica return err } - // Convert ReplicaType to lower string.
- expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype) - err = jc.Expectations.ExpectCreations(expectationServicesKey, 1) - if err != nil { - return err - } - // Append ReplicaTypeLabel and ReplicaIndexLabel labels. labels := jc.GenLabels(job.GetName()) labels[apiv1.ReplicaTypeLabel] = string(rtype) @@ -313,6 +308,10 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica // Create OwnerReference. controllerRef := jc.GenOwnerReference(job) + // Creation is expected when there is no error returned + expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype) + jc.Expectations.RaiseExpectations(expectationServicesKey, 1, 0) + err = jc.ServiceControl.CreateServicesWithControllerRef(job.GetNamespace(), service, job.(runtime.Object), controllerRef) if err != nil && errors.IsTimeout(err) { // Service is created but its initialization has timed out. @@ -325,6 +324,10 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica succeededServiceCreationCount.Inc() return nil } else if err != nil { + // Since an error occurred (the informer won't observe this service), + // we decrement the expected number of creates + // and wait until the next reconciliation + jc.Expectations.CreationObserved(expectationServicesKey) failedServiceCreationCount.Inc() return err }