[YUNIKORN-2879] [shim] yunikorn unschedulable pods pending forever #929

Closed
80 changes: 59 additions & 21 deletions pkg/cache/task.go
@@ -40,17 +40,20 @@ import (
 )
 
 type Task struct {
-	taskID        string
-	alias         string
-	applicationID string
-	application   *Application
-	podStatus     v1.PodStatus // pod status, maintained separately for efficiency reasons
-	context       *Context
-	createTime    time.Time
-	placeholder   bool
-	originator    bool
-	sm            *fsm.FSM
-
+	taskID            string
+	alias             string
+	applicationID     string
+	application       *Application
+	podStatus         v1.PodStatus // pod status, maintained separately for efficiency reasons
+	context           *Context
+	createTime        time.Time
+	retryTimer        *time.Timer // timer for task retry
+	placeholder       bool
+	originator        bool
+	sm                *fsm.FSM
+	retryTimeInterval time.Duration // delay between retry attempts
+	retryNum          int           // maximum number of retries allowed
+	attempt           int           // retries attempted so far
 	// mutable resources, require locking
 	allocationKey string
 	nodeName      string
@@ -109,6 +112,9 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource,
 	if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" {
 		task.taskGroupName = tgName
 	}
+	task.retryNum = utils.GetTaskRetryNumFromPodSpec(pod)
+	task.retryTimeInterval = utils.GetTaskRetryTimeIntervalFromPodSpec(pod)
+
 	task.initialize()
 	return task
 }
@@ -141,6 +147,13 @@ func (task *Task) GetTaskID() string {
 	return task.taskID
 }
 
+func (task *Task) GetTaskRetryNum() int {
+	return task.retryNum
+}
+
+func (task *Task) GetTaskRetryTimeInterval() time.Duration {
+	return task.retryTimeInterval
+}
+
 func (task *Task) IsPlaceholder() bool {
 	return task.placeholder
 }
@@ -375,8 +388,7 @@ func (task *Task) postTaskAllocated() {
 		zap.String("podName", task.pod.Name),
 		zap.String("podUID", string(task.pod.UID)))
 	if err := task.context.bindPodVolumes(task.pod); err != nil {
-		log.Log(log.ShimCacheTask).Error("bind volumes to pod failed", zap.String("taskID", task.taskID), zap.Error(err))
-		task.failWithEvent(fmt.Sprintf("bind volumes to pod failed, name: %s, %s", task.alias, err.Error()), "PodVolumesBindFailure")
+		task.RetryThenFailTask(fmt.Sprintf("bind volumes to pod failed, name: %s, %s", task.alias, err.Error()), "PodVolumesBindFailure")
 		return
 	}
@@ -385,8 +397,7 @@ func (task *Task) postTaskAllocated() {
 		zap.String("podUID", string(task.pod.UID)))
 
 	if err := task.context.apiProvider.GetAPIs().KubeClient.Bind(task.pod, task.nodeName); err != nil {
-		log.Log(log.ShimCacheTask).Error("bind pod to node failed", zap.String("taskID", task.taskID), zap.Error(err))
-		task.failWithEvent(fmt.Sprintf("bind pod to node failed, name: %s, %s", task.alias, err.Error()), "PodBindFailure")
+		task.RetryThenFailTask(fmt.Sprintf("bind pod to node failed, name: %s, %s", task.alias, err.Error()), "PodBindFailure")
 		return
 	}
@@ -453,12 +464,8 @@ func (task *Task) postTaskBound() {
 }
 
 func (task *Task) postTaskRejected() {
-	// currently, once task is rejected by scheduler, we directly move task to failed state.
-	// so this function simply triggers the state transition when it is rejected.
-	// but further, we can introduce retry mechanism if necessary.
-	dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
-		fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
-
+	// once the task is rejected by the scheduler, retry before moving it to the Failed state
+	task.RetryThenFailTask(fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias), "TaskRejected")
 	events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
 		v1.EventTypeWarning, "TaskRejected", "TaskRejected",
 		"Task %s is rejected by the scheduler", task.alias)
@@ -655,3 +662,34 @@ func (task *Task) SetTaskPod(pod *v1.Pod) {
 		task.updateAllocation()
 	}
 }
+
+func (task *Task) setRetryTimer(timeout time.Duration, currentState string, event RetryTaskEvent) {
+	log.Log(log.ShimContext).Debug("task retry timer initiated",
+		zap.String("appID", task.applicationID),
+		zap.String("taskID", task.taskID),
+		zap.String("state", task.sm.Current()),
+		zap.Duration("timeout", timeout))
+
+	task.retryTimer = time.AfterFunc(timeout, task.timeoutRetryTimer(currentState, event))
+}
+
+// timeoutRetryTimer returns the timer callback: it dispatches the retry event
+// only if the task is still in the state it was in when the timer was armed.
+func (task *Task) timeoutRetryTimer(expectedState string, event RetryTaskEvent) func() {
+	return func() {
+		task.lock.Lock()
+		defer task.lock.Unlock()
+		if expectedState == task.sm.Current() {
+			dispatcher.Dispatch(event)
+		}
+	}
+}
+
+func (task *Task) RetryThenFailTask(errorMessage, actionReason string) {
+	if task.attempt < task.retryNum {
+		log.Log(log.ShimCacheTask).Info("task failed, task will retry",
+			zap.String("taskID", task.taskID),
+			zap.Int("attempt", task.attempt),
+			zap.Int("retryNum", task.retryNum),
+			zap.Duration("retryTimeInterval", task.retryTimeInterval),
+			zap.String("errorMessage", errorMessage))
+		task.attempt++
+		task.setRetryTimer(task.retryTimeInterval, task.sm.Current(), NewRetryTaskEvent(task.applicationID, task.taskID, "retrying task"))
+	} else {
+		log.Log(log.ShimCacheTask).Error("task failed", zap.String("taskID", task.taskID), zap.String("errorMessage", errorMessage))
+		task.failWithEvent(errorMessage, actionReason)
+	}
+}
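Taken together, setRetryTimer, timeoutRetryTimer, and RetryThenFailTask implement a bounded, timer-driven retry: a failed bind re-dispatches the task after retryTimeInterval, but only while the task is still in the state the timer was armed in, and the task falls through to failWithEvent once attempt reaches retryNum. A minimal self-contained sketch of the same pattern (hypothetical names, no YuniKorn dependencies) looks like this:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// retryable is a toy stand-in for Task: bounded, timer-driven retries of an
// operation that can fail transiently (such as a pod bind).
type retryable struct {
	mu       sync.Mutex
	attempt  int           // retries attempted so far (counterpart of Task.attempt)
	maxRetry int           // counterpart of Task.retryNum
	interval time.Duration // counterpart of Task.retryTimeInterval
	state    string        // counterpart of the task FSM state
	op       func() error
	done     chan struct{}
}

func (r *retryable) run() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if err := r.op(); err == nil {
		r.state = "Bound"
		close(r.done)
		return
	}
	if r.attempt < r.maxRetry {
		r.attempt++
		r.state = "Pending"
		snapshot := r.state
		// Re-run later, but only if nothing moved the state on in the
		// meantime -- the same guard timeoutRetryTimer applies.
		time.AfterFunc(r.interval, func() {
			r.mu.Lock()
			stillPending := r.state == snapshot
			r.mu.Unlock()
			if stillPending {
				r.run()
			}
		})
		return
	}
	r.state = "Failed"
	close(r.done)
}

func main() {
	calls := 0
	r := &retryable{
		maxRetry: 5,
		interval: 10 * time.Millisecond,
		state:    "Pending",
		done:     make(chan struct{}),
	}
	r.op = func() error { // fails twice, then succeeds
		calls++
		if calls <= 2 {
			return fmt.Errorf("transient bind failure %d", calls)
		}
		return nil
	}
	r.run()
	<-r.done
	fmt.Println("final state:", r.state, "after", calls, "attempts")
}
```

The state-snapshot check mirrors timeoutRetryTimer: a timer that fires after the task has already moved on (for example, it was killed) must not re-dispatch a stale retry event.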
53 changes: 52 additions & 1 deletion pkg/cache/task_state.go
@@ -47,10 +47,11 @@ const (
 	TaskFail
 	KillTask
 	TaskKilled
+	TaskRetry
 )
 
 func (ae TaskEventType) String() string {
-	return [...]string{"InitTask", "SubmitTask", "TaskAllocated", "TaskRejected", "TaskBound", "CompleteTask", "TaskFail", "KillTask", "TaskKilled"}[ae]
+	return [...]string{"InitTask", "SubmitTask", "TaskAllocated", "TaskRejected", "TaskBound", "CompleteTask", "TaskFail", "KillTask", "TaskKilled", "TaskRetry"}[ae]
 }
 
 // ------------------------
@@ -202,6 +203,20 @@ type FailTaskEvent struct {
 	message string
 }
 
+type RetryTaskEvent struct {
+	applicationID string
+	taskID        string
+	event         TaskEventType
+	message       string
+}
+
 func NewFailTaskEvent(appID string, taskID string, failedMessage string) FailTaskEvent {
 	return FailTaskEvent{
 		applicationID: appID,
@@ -211,6 +226,33 @@ func NewFailTaskEvent(appID string, taskID string, failedMessage string) FailTaskEvent {
 	}
 }
 
+func NewRetryTaskEvent(appID string, taskID string, retryMessage string) RetryTaskEvent {
+	return RetryTaskEvent{
+		applicationID: appID,
+		taskID:        taskID,
+		event:         TaskRetry,
+		message:       retryMessage,
+	}
+}
+
+func (re RetryTaskEvent) GetEvent() string {
+	return re.event.String()
+}
+
+func (re RetryTaskEvent) GetArgs() []interface{} {
+	args := make([]interface{}, 1)
+	args[0] = re.message
+	return args
+}
+
+func (re RetryTaskEvent) GetTaskID() string {
+	return re.taskID
+}
+
+func (re RetryTaskEvent) GetApplicationID() string {
+	return re.applicationID
+}
+
 func (fe FailTaskEvent) GetEvent() string {
 	return fe.event.String()
 }
@@ -371,6 +413,11 @@ func eventDesc(states *TStates) fsm.Events {
 			Src:  []string{states.New, states.Pending, states.Scheduling, states.Rejected, states.Allocated},
 			Dst:  states.Failed,
 		},
+		{
+			Name: TaskRetry.String(),
+			Src:  []string{states.Rejected, states.Allocated},
+			Dst:  states.Pending,
+		},
 	}
 }
@@ -433,6 +480,10 @@ func callbacks(states *TStates) fsm.Callbacks {
 			task := event.Args[0].(*Task) //nolint:errcheck
 			task.beforeTaskCompleted()
 		},
+		beforeHook(TaskRetry): func(_ context.Context, event *fsm.Event) {
+			task := event.Args[0].(*Task) //nolint:errcheck
+			task.releaseAllocation()
+		},
 		SubmitTask.String(): func(_ context.Context, event *fsm.Event) {
 			task := event.Args[0].(*Task) //nolint:errcheck
 			task.handleSubmitTaskEvent()
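For readers unfamiliar with the shim internals: the task state machine is built on github.com/looplab/fsm, and the new TaskRetry event simply lets a Rejected or Allocated task loop back to Pending, with the before-hook releasing the stale allocation first. A standalone sketch of that loop (same library, placeholder state names instead of the shim's TStates, and assuming beforeHook(TaskRetry) resolves to the library's "before_TaskRetry" callback key) behaves as follows:

```go
package main

import (
	"context"
	"fmt"

	"github.com/looplab/fsm"
)

func main() {
	sm := fsm.NewFSM(
		"Pending",
		fsm.Events{
			{Name: "TaskAllocated", Src: []string{"Pending"}, Dst: "Allocated"},
			{Name: "TaskRetry", Src: []string{"Rejected", "Allocated"}, Dst: "Pending"},
			{Name: "TaskFail", Src: []string{"Pending", "Rejected", "Allocated"}, Dst: "Failed"},
		},
		fsm.Callbacks{
			// counterpart of beforeHook(TaskRetry): clean up before re-entering Pending
			"before_TaskRetry": func(_ context.Context, e *fsm.Event) {
				fmt.Printf("releasing allocation before retry: %v\n", e.Args)
			},
		},
	)

	_ = sm.Event(context.Background(), "TaskAllocated")            // Pending -> Allocated
	_ = sm.Event(context.Background(), "TaskRetry", "bind failed") // Allocated -> Pending
	fmt.Println("state after retry:", sm.Current())
}
```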
7 changes: 7 additions & 0 deletions pkg/common/constants/constants.go
@@ -20,6 +20,7 @@ package constants
 
 import (
+	"time"
+
 	siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
 // Common
@@ -56,6 +57,9 @@ const DefaultAppNamespace = "default"
 const DefaultUserLabel = DomainYuniKorn + "username"
 const DefaultUser = "nobody"
 
+const DefaultTaskRetryTimeInterval = 5 * time.Second
+const DefaultTaskRetryNum = 5
+
 // Spark
 const SparkLabelAppID = "spark-app-selector"
@@ -87,6 +91,9 @@ const SchedulingPolicyParamDelimiter = " "
 const SchedulingPolicyStyleParam = "gangSchedulingStyle"
 const SchedulingPolicyStyleParamDefault = "Soft"
 
+const AnnotationTaskRetryName = DomainYuniKorn + "task-retry-num"
+const AnnotationTaskRetryIntervalName = DomainYuniKorn + "task-retry-interval"
+
 var SchedulingPolicyStyleParamValues = map[string]string{"Hard": "Hard", "Soft": "Soft"}
 
 const ApplicationInsufficientResourcesFailure = "ResourceReservationTimeout"
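With these constants in place, a pod opts into custom retry behaviour through the two annotations, overriding the 5-attempt / 5-second defaults. Assuming DomainYuniKorn is the usual "yunikorn.apache.org/" prefix (an assumption; verify the constant in this package), a sketch of a pod carrying both annotations:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "sleep-pod",
			Annotations: map[string]string{
				// assumed fully-qualified keys, i.e. DomainYuniKorn + suffix
				"yunikorn.apache.org/task-retry-num":      "3",   // instead of DefaultTaskRetryNum (5)
				"yunikorn.apache.org/task-retry-interval": "30s", // instead of DefaultTaskRetryTimeInterval (5s)
			},
		},
	}
	fmt.Println(pod.Annotations)
}
```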
18 changes: 18 additions & 0 deletions pkg/common/utils/utils.go
@@ -462,6 +462,24 @@ func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
 	return GetPodAnnotationValue(pod, constants.AnnotationTaskGroupName)
 }
 
+// GetTaskRetryNumFromPodSpec reads the retry-count annotation from the pod,
+// falling back to the default when the annotation is missing or malformed.
+func GetTaskRetryNumFromPodSpec(pod *v1.Pod) int {
+	if value := GetPodAnnotationValue(pod, constants.AnnotationTaskRetryName); value != "" {
+		if v, err := strconv.Atoi(value); err == nil {
+			return v
+		}
+	}
+	return constants.DefaultTaskRetryNum
+}
+
+// GetTaskRetryTimeIntervalFromPodSpec reads the retry-interval annotation
+// (a time.ParseDuration string such as "30s"), falling back to the default.
+func GetTaskRetryTimeIntervalFromPodSpec(pod *v1.Pod) time.Duration {
+	if value := GetPodAnnotationValue(pod, constants.AnnotationTaskRetryIntervalName); value != "" {
+		if v, err := time.ParseDuration(value); err == nil {
+			return v
+		}
+	}
+	return constants.DefaultTaskRetryTimeInterval
+}
+
 func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
 	if value := GetPodAnnotationValue(pod, constants.AnnotationPlaceholderFlag); value != "" {
 		if v, err := strconv.ParseBool(value); err == nil {
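Both helpers fail soft: a missing or unparsable annotation silently falls back to the defaults rather than rejecting the pod. A usage sketch of that fallback behaviour (the utils import path and the fully-qualified annotation keys are assumed from context, not quoted from this PR):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{
				"yunikorn.apache.org/task-retry-num":      "not-a-number", // malformed: ignored
				"yunikorn.apache.org/task-retry-interval": "30s",
			},
		},
	}
	fmt.Println(utils.GetTaskRetryNumFromPodSpec(pod))          // 5 (DefaultTaskRetryNum)
	fmt.Println(utils.GetTaskRetryTimeIntervalFromPodSpec(pod)) // 30s
}
```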
31 changes: 30 additions & 1 deletion pkg/shim/scheduler_mock_test.go
@@ -162,13 +162,42 @@
 	}
 }
 
+func (fc *MockScheduler) waitAndAssertTaskStateWithRetryParam(t *testing.T, appID, taskID, expectedState string, retryNum int, retryTimeInterval time.Duration, timeInterval time.Duration) {
+	app := fc.context.GetApplication(appID)
+	assert.Equal(t, app != nil, true)
+	assert.Equal(t, app.GetApplicationID(), appID)
+
+	task := app.GetTask(taskID)
+	assert.Equal(t, task.GetTaskRetryNum(), retryNum)
+	assert.Equal(t, task.GetTaskRetryTimeInterval(), retryTimeInterval)
+	deadline := time.Now().Add(timeInterval)
+	for {
+		if task.GetTaskState() == expectedState {
+			break
+		}
+		log.Log(log.Test).Info("waiting for task state",
+			zap.String("expected", expectedState),
+			zap.String("actual", task.GetTaskState()))
+		time.Sleep(time.Second)
+		if time.Now().After(deadline) {
+			t.Errorf("task %s doesn't reach expected state in given time, expecting: %s, actual: %s",
+				taskID, expectedState, task.GetTaskState())
+			return
+		}
+	}
+}
+
 func (fc *MockScheduler) waitAndAssertTaskState(t *testing.T, appID, taskID, expectedState string) {
+	fc.waitAndAssertTaskStateAfterTimeInterval(t, appID, taskID, expectedState, 10*time.Second)
+}
+
+func (fc *MockScheduler) waitAndAssertTaskStateAfterTimeInterval(t *testing.T, appID, taskID, expectedState string, timeInterval time.Duration) {
 	app := fc.context.GetApplication(appID)
 	assert.Equal(t, app != nil, true)
 	assert.Equal(t, app.GetApplicationID(), appID)
 
 	task := app.GetTask(taskID)
-	deadline := time.Now().Add(10 * time.Second)
+	deadline := time.Now().Add(timeInterval)
 	for {
 		if task.GetTaskState() == expectedState {
 			break