Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Simplify interfaces and remove useless interfaces (#85)
Browse files Browse the repository at this point in the history
* Remove interface methods for which all operators share a common implementation

These are very common implementations and don't actually need a separate interface method for each operator. We already have PodControl and ServiceControl, which are very reliable for creating pods/services. JobController will provide a common implementation for them.

- GetPodsForJob
- GetServicesForJob
- CreateService
- DeleteService
- CreatePod
- DeletePod

Signed-off-by: Jiaxin Shan <[email protected]>

* Add PodControl and ServiceControl back into JobController

* Replace pod/service CRUD with methods from control

* Remove unused GetDefaultContainerPortNumber() interface

* Format code base before check in

* Add prometheus metrics back

* Add gang-scheduling annotation only if the operator enables it
  • Loading branch information
Jeffwan authored May 14, 2020
1 parent 23b2612 commit 78a65a3
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 173 deletions.
23 changes: 0 additions & 23 deletions pkg/apis/common/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ type ControllerInterface interface {
// Returns the Job from API server
GetJobFromAPIClient(namespace, name string) (metav1.Object, error)

// GetPodsForJob returns the pods managed by the job. This can be achieved by selecting pods using label key "job-name"
// i.e. all pods created by the job will come with label "job-name" = <this_job_name>
GetPodsForJob(job interface{}) ([]*v1.Pod, error)

// GetServicesForJob returns the services managed by the job. This can be achieved by selecting services using label key "job-name"
// i.e. all services created by the job will come with label "job-name" = <this_job_name>
GetServicesForJob(job interface{}) ([]*v1.Service, error)

// DeleteJob deletes the job
DeleteJob(job interface{}) error

Expand All @@ -43,18 +35,6 @@ type ControllerInterface interface {
// UpdateJobStatusInApiServer updates the job status in API server
UpdateJobStatusInApiServer(job interface{}, jobStatus *JobStatus) error

// CreateService creates the service
CreateService(job interface{}, service *v1.Service) error

// DeleteService deletes the service
DeleteService(job interface{}, name string, namespace string) error

// CreatePod creates the pod
CreatePod(job interface{}, pod *v1.Pod) error

// DeletePod deletes the pod
DeletePod(job interface{}, pod *v1.Pod) error

// SetClusterSpec sets the cluster spec for the pod
SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype, index string) error

Expand All @@ -64,9 +44,6 @@ type ControllerInterface interface {
// Get the default container port name
GetDefaultContainerPortName() string

// Get the default container port number
GetDefaultContainerPortNumber() int32

// Returns if this replica type with index specified is a master role.
// MasterRole pod will have "job-role=master" set in its label
IsMasterRole(replicas map[ReplicaType]*ReplicaSpec, rtype ReplicaType, index int) bool
Expand Down
14 changes: 5 additions & 9 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func (jc *JobController) deletePodsAndServices(runPolicy *apiv1.RunPolicy, job i
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != v1.PodRunning {
continue
}
if err := jc.Controller.DeletePod(job, pod); err != nil {
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
return err
}
// Pod and service have the same name, thus the service could be deleted using pod's name.
if err := jc.Controller.DeleteService(job, pod.Name, pod.Namespace); err != nil {
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
return err
}
}
Expand Down Expand Up @@ -93,14 +93,13 @@ func (jc *JobController) ReconcileJobs(

oldStatus := jobStatus.DeepCopy()

pods, err := jc.Controller.GetPodsForJob(job)

pods, err := jc.GetPodsForJob(job)
if err != nil {
log.Warnf("GetPodsForJob error %v", err)
return err
}

services, err := jc.Controller.GetServicesForJob(job)
services, err := jc.GetServicesForJob(job)

if err != nil {
log.Warnf("GetServicesForJob error %v", err)
Expand Down Expand Up @@ -191,12 +190,9 @@ func (jc *JobController) ReconcileJobs(
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
}

// Save the current state of the replicas
replicasStatus := make(map[string]v1.PodPhase)

// Diff current active pods/services with replicas.
for rtype, spec := range replicas {
err := jc.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicasStatus, replicas)
err := jc.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas)
if err != nil {
log.Warnf("ReconcilePods error %v", err)
return err
Expand Down
19 changes: 19 additions & 0 deletions pkg/controller.v1/common/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
log "github.com/sirupsen/logrus"
policyapi "k8s.io/api/policy/v1beta1"
Expand Down Expand Up @@ -84,6 +85,12 @@ type JobController struct {

Config JobControllerConfiguration

// PodControl is used to add or delete pods.
PodControl control.PodControlInterface

// ServiceControl is used to add or delete services.
ServiceControl control.ServiceControlInterface

// KubeClientSet is a standard kubernetes clientset.
KubeClientSet kubeclientset.Interface

Expand Down Expand Up @@ -147,6 +154,16 @@ func NewJobController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()})

podControl := control.RealPodControl{
KubeClient: kubeClientSet,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()}),
}

serviceControl := control.RealServiceControl{
KubeClient: kubeClientSet,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()}),
}

jobControllerConfig := JobControllerConfiguration{
ReconcilerSyncLoopPeriod: reconcilerSyncPeriod,
EnableGangScheduling: enableGangScheduling,
Expand All @@ -155,6 +172,8 @@ func NewJobController(
jc := JobController{
Controller: controllerImpl,
Config: jobControllerConfig,
PodControl: podControl,
ServiceControl: serviceControl,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
Expectations: expectation.NewControllerExpectations(),
Expand Down
34 changes: 20 additions & 14 deletions pkg/controller.v1/common/job_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package common

import (
testjob "github.com/kubeflow/common/test_job/controller.v1/test_job"
"strconv"
"testing"
"time"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/test_job/apis/test_job/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1"
testjob "github.com/kubeflow/common/test_job/controller.v1/test_job"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -51,35 +52,40 @@ func TestDeletePodsAndServices(T *testing.T) {
Services: allServices,
}

fakePodControl := &control.FakePodControl{}
fakeServiceControl := &control.FakeServiceControl{}

mainJobController := JobController{
Controller: &testJobController,
Controller: &testJobController,
PodControl: fakePodControl,
ServiceControl: fakeServiceControl,
}
runPolicy := apiv1.RunPolicy{
CleanPodPolicy: &tc.cleanPodPolicy,
}

var job interface{}
job := &testjobv1.TestJob{}
err := mainJobController.deletePodsAndServices(&runPolicy, job, allPods)

if assert.NoError(T, err) {
if tc.deleteRunningPodAndService {
// should delete the running pod and its service
assert.NotContains(T, testJobController.Pods, runningPod)
assert.NotContains(T, testJobController.Services, runningPodService)
assert.Contains(T, fakePodControl.DeletePodName, runningPod.Name)
assert.Contains(T, fakeServiceControl.DeleteServiceName, runningPodService.Name)
} else {
// should NOT delete the running pod and its service
assert.Contains(T, testJobController.Pods, runningPod)
assert.Contains(T, testJobController.Services, runningPodService)
assert.NotContains(T, fakePodControl.DeletePodName, runningPod.Name)
assert.NotContains(T, fakeServiceControl.DeleteServiceName, runningPodService.Name)
}

if tc.deleteSucceededPodAndService {
// should delete the SUCCEEDED pod and its service
assert.NotContains(T, testJobController.Pods, succeededPod)
assert.NotContains(T, testJobController.Services, succeededPodService)
assert.Contains(T, fakePodControl.DeletePodName, succeededPod.Name)
assert.Contains(T, fakeServiceControl.DeleteServiceName, succeededPodService.Name)
} else {
// should NOT delete the SUCCEEDED pod and its service
assert.Contains(T, testJobController.Pods, succeededPod)
assert.Contains(T, testJobController.Services, succeededPodService)
assert.NotContains(T, fakePodControl.DeletePodName, succeededPod.Name)
assert.NotContains(T, fakeServiceControl.DeleteServiceName, succeededPodService.Name)
}
}
}
Expand Down Expand Up @@ -177,7 +183,7 @@ func TestCleanupJobIfTTL(T *testing.T) {
}

testJobController := &testjob.TestJobController{
Job: &v1.TestJob{},
Job: &testjobv1.TestJob{},
}
mainJobController := JobController{
Controller: testJobController,
Expand All @@ -203,7 +209,7 @@ func TestCleanupJob(T *testing.T) {
}

testJobController := &testjob.TestJobController{
Job: &v1.TestJob{},
Job: &testjobv1.TestJob{},
}
mainJobController := JobController{
Controller: testJobController,
Expand Down
100 changes: 56 additions & 44 deletions pkg/controller.v1/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ import (
"strconv"
"strings"

"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
commonutil "github.com/kubeflow/common/pkg/util"
Expand All @@ -50,6 +49,8 @@ const (
// podTemplateSchedulerNameReason is the warning reason when other scheduler name is set
// in pod templates with gang-scheduling enabled
podTemplateSchedulerNameReason = "SettedPodTemplateSchedulerName"
// gangSchedulingPodGroupAnnotation is the annotation key used by batch schedulers
gangSchedulingPodGroupAnnotation = "scheduling.k8s.io/group-name"
)

var (
Expand Down Expand Up @@ -212,6 +213,46 @@ func (jc *JobController) DeletePod(obj interface{}) {
jc.WorkQueue.Add(jobKey)
}

// GetPodsForJob returns the set of pods that the given job should manage.
//
// jobObject must implement metav1.Object; otherwise an error is returned.
// Every pod in the job's namespace is listed (not only those matching the
// job's labels) and handed to a PodControllerRefManager, whose ClaimPods
// result is returned. Per the original note on this function, claiming
// reconciles ControllerRef by adopting/orphaning, and the returned pods are
// pointers into the lister's cache.
func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error) {
	job, isMetaObject := jobObject.(metav1.Object)
	if !isMetaObject {
		return nil, fmt.Errorf("job is not of type metav1.Object")
	}

	// Build the selector from the labels generated for this job's name.
	jobLabelSelector := &metav1.LabelSelector{
		MatchLabels: jc.GenLabels(job.GetName()),
	}
	selector, err := metav1.LabelSelectorAsSelector(jobLabelSelector)
	if err != nil {
		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
	}

	// Deliberately list everything in the namespace: pods that no longer match
	// the selector may still carry a ControllerRef pointing at this job.
	namespacedPods, err := jc.PodLister.Pods(job.GetNamespace()).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	// Before any adoption is attempted, the job is re-fetched through the API
	// client and its UID compared with the one we hold, so that a deleted or
	// recreated job does not adopt pods (see #42639).
	fetchFreshJob := func() (metav1.Object, error) {
		latest, fetchErr := jc.Controller.GetJobFromAPIClient(job.GetNamespace(), job.GetName())
		if fetchErr != nil {
			return nil, fetchErr
		}
		if latest.GetUID() == job.GetUID() {
			return latest, nil
		}
		return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", job.GetNamespace(), job.GetName(), latest.GetUID(), job.GetUID())
	}
	canAdopt := RecheckDeletionTimestamp(fetchFreshJob)

	refManager := controller.NewPodControllerRefManager(jc.PodControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdopt)
	return refManager.ClaimPods(namespacedPods)
}

// FilterPodsForReplicaType returns pods belong to a replicaType.
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error) {
var result []*v1.Pod
Expand Down Expand Up @@ -284,7 +325,6 @@ func (jc *JobController) ReconcilePods(
pods []*v1.Pod,
rtype apiv1.ReplicaType,
spec *apiv1.ReplicaSpec,
rstatus map[string]v1.PodPhase,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {

metaObject, ok := job.(metav1.Object)
Expand Down Expand Up @@ -334,7 +374,7 @@ func (jc *JobController) ReconcilePods(

// check if the index is in the valid range, if not, we should kill the pod
if index < 0 || index >= numReplicas {
err = jc.Controller.DeletePod(job, pod)
err = jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject)
if err != nil {
return err
}
Expand All @@ -355,7 +395,7 @@ func (jc *JobController) ReconcilePods(
if pod.Status.Phase == v1.PodFailed && trainutil.IsRetryableExitCode(exitCode) {
failedPodsCount.Inc()
logger.Infof("Need to restart the pod: %v.%v", pod.Namespace, pod.Name)
if err := jc.Controller.DeletePod(job, pod); err != nil {
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
}
Expand Down Expand Up @@ -437,10 +477,18 @@ func (jc *JobController) createNewPod(job interface{}, rt, index string, spec *a
} else {
podTemplate.Spec.SchedulerName = gangSchedulerName
}

if podTemplate.Annotations == nil {
podTemplate.Annotations = map[string]string{}
}

if jc.Config.EnableGangScheduling {
podTemplate.Annotations[gangSchedulingPodGroupAnnotation] = metaObject.GetName()
}
}
controllerRef := jc.GenOwnerReference(metaObject)

err = jc.createPodWithControllerRef(metaObject.GetNamespace(), podTemplate, runtimeObject, controllerRef)
controllerRef := jc.GenOwnerReference(metaObject)
err = jc.PodControl.CreatePodsWithControllerRef(metaObject.GetNamespace(), podTemplate, runtimeObject, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
Expand All @@ -453,43 +501,7 @@ func (jc *JobController) createNewPod(job interface{}, rt, index string, spec *a
} else if err != nil {
return err
}
return nil
}

// createPodWithControllerRef validates controllerRef and, when it is valid,
// delegates to createPod with an empty node name so that no node is pinned.
// The validation error, if any, is returned unchanged.
func (jc *JobController) createPodWithControllerRef(namespace string, template *v1.PodTemplateSpec,
	controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	validationErr := control.ValidateControllerRef(controllerRef)
	if validationErr != nil {
		return validationErr
	}

	const unassignedNode = ""
	return jc.createPod(unassignedNode, namespace, template, controllerObject, controllerRef)
}

// createPod builds a pod from template for the owning object and asks the
// operator-specific controller to create it.
//
// Parameters:
//   - nodeName: when non-empty, written to the pod's Spec.NodeName.
//   - namespace: assigned to the generated pod.
//   - template, object, controllerRef: passed to control.GetPodFromTemplate to
//     build the pod; object is also the target of the recorded events.
//
// It returns the error from building the pod, an error when the generated pod
// carries no labels, or the error from jc.Controller.CreatePod (after recording
// a warning event). A failure to read the owner's metadata after a successful
// create is only logged and nil is returned, because the pod already exists.
func (jc *JobController) createPod(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	pod, err := control.GetPodFromTemplate(template, object, controllerRef)
	if err != nil {
		return err
	}
	// Touch the pod only after the error check: the returned pod is not
	// guaranteed to be usable (it may be nil) when building it failed.
	pod.Namespace = namespace
	if len(nodeName) != 0 {
		pod.Spec.NodeName = nodeName
	}
	if labels.Set(pod.Labels).AsSelectorPreValidated().Empty() {
		return fmt.Errorf("unable to create pods, no labels")
	}

	if err := jc.Controller.CreatePod(object, pod); err != nil {
		jc.Recorder.Eventf(object, v1.EventTypeWarning, control.FailedCreatePodReason, "Error creating: %v", err)
		return err
	}
	// Count the pod exactly once, as soon as creation has succeeded.
	createdPodsCount.Inc()

	logger := commonutil.LoggerForPod(pod, jc.Controller.GetAPIGroupVersionKind().Kind)
	accessor, err := meta.Accessor(object)
	if err != nil {
		// The pod was created; missing owner metadata only prevents the
		// success log line and event below.
		logger.Errorf("parentObject does not have ObjectMeta, %v", err)
		return nil
	}
	logger.Infof("Controller %v created pod %v", accessor.GetName(), pod.Name)
	jc.Recorder.Eventf(object, v1.EventTypeNormal, control.SuccessfulCreatePodReason, "Created pod: %v", pod.Name)
	return nil
}

Expand Down
Loading

0 comments on commit 78a65a3

Please sign in to comment.