Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
fix(expectation): more appropriate use of expectation (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
eggiter authored Jul 8, 2021
1 parent 5a4f6c5 commit 9ffa565
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
20 changes: 19 additions & 1 deletion pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"time"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/common/pkg/util"
"github.com/kubeflow/common/pkg/util/k8sutil"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -114,8 +116,14 @@ func (jc *JobController) ReconcileJobs(
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job object %#v: %v", job, err))
return err
}
log.Infof("Reconciling for job %s", metaObject.GetName())
// Reset expectations
// 1. Since `ReconcileJobs` is called, we expect that previous expectations are all satisfied,
// and it's safe to reset the expectations
// 2. Reset expectations can avoid dirty data such as `expectedDeletion = -1`
// (pod or service was deleted unexpectedly)
jc.ResetExpectations(jobKey, replicas)

log.Infof("Reconciling for job %s", metaObject.GetName())
pods, err := jc.Controller.GetPodsForJob(job)
if err != nil {
log.Warnf("GetPodsForJob error %v", err)
Expand Down Expand Up @@ -315,6 +323,16 @@ func (jc *JobController) ReconcileJobs(
return nil
}

// ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) {
for rtype := range replicas {
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
jc.Expectations.SetExpectations(expectationPodsKey, 0, 0)
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
jc.Expectations.SetExpectations(expectationServicesKey, 0, 0)
}
}

// PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func (jc *JobController) PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool {
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil {
Expand Down
39 changes: 27 additions & 12 deletions pkg/controller.v1/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ package common

import (
"fmt"
"reflect"
"strconv"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/common/pkg/util"
trainutil "github.com/kubeflow/common/pkg/util/train"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
Expand All @@ -28,12 +35,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"reflect"
"strconv"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
commonutil "github.com/kubeflow/common/pkg/util"
trainutil "github.com/kubeflow/common/pkg/util/train"
)

const (
Expand Down Expand Up @@ -333,11 +334,17 @@ func (jc *JobController) ReconcilePods(
if !ok {
return fmt.Errorf("job is not a runtime.Object type")
}
jobKey, err := KeyFunc(metaObject)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)

// Convert ReplicaType to lower string.
logger := commonutil.LoggerForReplica(metaObject, rtype)
// Get all pods for the type rt.
pods, err := jc.FilterPodsForReplicaType(pods, rtype)
pods, err = jc.FilterPodsForReplicaType(pods, rtype)
if err != nil {
return err
}
Expand Down Expand Up @@ -375,6 +382,8 @@ func (jc *JobController) ReconcilePods(
if err != nil {
return err
}
// Deletion is expected
jc.Expectations.RaiseExpectations(expectationPodsKey, 0, 1)
}

// Get the exit code of the container.
Expand All @@ -395,6 +404,8 @@ func (jc *JobController) ReconcilePods(
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
// Deletion is expected
jc.Expectations.RaiseExpectations(expectationPodsKey, 0, 1)
}
}

Expand All @@ -421,11 +432,6 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)
err = jc.Expectations.ExpectCreations(expectationPodsKey, 1)
if err != nil {
return err
}
logger := commonutil.LoggerForReplica(metaObject, rt)

// Set type and index for the worker.
Expand Down Expand Up @@ -484,6 +490,11 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind
}
}

// Creation is expected when there is no error returned
// We use `RaiseExpectations` here to accumulate expectations since `SetExpectations` has no such kind of ability
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)
jc.Expectations.RaiseExpectations(expectationPodsKey, 1, 0)

controllerRef := jc.GenOwnerReference(metaObject)
err = jc.PodControl.CreatePodsWithControllerRef(metaObject.GetNamespace(), podTemplate, runtimeObject, controllerRef)
if err != nil && errors.IsTimeout(err) {
Expand All @@ -496,6 +507,10 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind
// pod when the expectation expires.
return nil
} else if err != nil {
// Since error occurred(the informer won't observe this pod),
// we decrement the expected number of creates
// and wait until next reconciliation
jc.Expectations.CreationObserved(expectationPodsKey)
return err
}
createdPodsCount.Inc()
Expand Down
19 changes: 11 additions & 8 deletions pkg/controller.v1/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ package common

import (
"fmt"
"strconv"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/common/pkg/util"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
Expand All @@ -28,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"strconv"
)

var (
Expand Down Expand Up @@ -277,13 +279,6 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
return err
}

// Convert ReplicaType to lower string.
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
err = jc.Expectations.ExpectCreations(expectationServicesKey, 1)
if err != nil {
return err
}

// Append ReplicaTypeLabel and ReplicaIndexLabel labels.
labels := jc.GenLabels(job.GetName())
labels[apiv1.ReplicaTypeLabel] = string(rtype)
Expand Down Expand Up @@ -313,6 +308,10 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
// Create OwnerReference.
controllerRef := jc.GenOwnerReference(job)

// Creation is expected when there is no error returned
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
jc.Expectations.RaiseExpectations(expectationServicesKey, 1, 0)

err = jc.ServiceControl.CreateServicesWithControllerRef(job.GetNamespace(), service, job.(runtime.Object), controllerRef)
if err != nil && errors.IsTimeout(err) {
// Service is created but its initialization has timed out.
Expand All @@ -325,6 +324,10 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
succeededServiceCreationCount.Inc()
return nil
} else if err != nil {
// Since error occurred(the informer won't observe this service),
// we decrement the expected number of creates
// and wait until next reconciliation
jc.Expectations.CreationObserved(expectationServicesKey)
failedServiceCreationCount.Inc()
return err
}
Expand Down

0 comments on commit 9ffa565

Please sign in to comment.