Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Writing errors for subtask failures (#260)
Browse files Browse the repository at this point in the history
* writing an error.pb file on error

Signed-off-by: Daniel Rammer <[email protected]>

* fixing

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 13, 2022
1 parent 60ea4b6 commit 6db081f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 8 deletions.
30 changes: 26 additions & 4 deletions go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/flyteorg/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyteplugins/go/tasks/plugins/array"
"github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"
arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core"
Expand Down Expand Up @@ -211,6 +213,30 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon

if phaseInfo.Err() != nil {
messageCollector.Collect(childIdx, phaseInfo.Err().String())

// If the service reported an error but there is no error.pb written, write one with the
// service-provided error message.
or, err := array.ConstructOutputReader(ctx, dataStore, outputPrefix, baseOutputDataSandbox, originalIdx)
if err != nil {
return currentState, externalResources, err
}

if hasErr, err := or.IsError(ctx); err != nil {
return currentState, externalResources, err
} else if !hasErr {
// The subtask has not produced an error.pb, write one.
ow, err := array.ConstructOutputWriter(ctx, dataStore, outputPrefix, baseOutputDataSandbox, originalIdx)
if err != nil {
return currentState, externalResources, err
}

if err = ow.Put(ctx, ioutils.NewInMemoryOutputReader(nil, &io.ExecutionError{
ExecutionError: phaseInfo.Err(),
IsRecoverable: phaseInfo.Phase() != core.PhasePermanentFailure,
})); err != nil {
return currentState, externalResources, err
}
}
}

if phaseInfo.Err() != nil && phaseInfo.Err().GetKind() == idlCore.ExecutionError_SYSTEM {
Expand Down Expand Up @@ -250,10 +276,6 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
}

// process phaseInfo
if phaseInfo.Err() != nil {
messageCollector.Collect(childIdx, phaseInfo.Err().String())
}

var logLinks []*idlCore.TaskLog
if phaseInfo.Info() != nil {
logLinks = phaseInfo.Info().Logs
Expand Down
24 changes: 21 additions & 3 deletions go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type metadata struct {
exists bool
size int64
}

func (m metadata) Exists() bool {
return m.exists
}

func (m metadata) Size() int64 {
return m.size
}

func createSampleContainerTask() *core2.Container {
return &core2.Container{
Command: []string{"cmd"},
Expand Down Expand Up @@ -106,8 +119,13 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta
ir.OnGetInputPath().Return("/prefix/inputs.pb")
ir.OnGetMatch(mock.Anything).Return(&core2.LiteralMap{}, nil)

composedProtobufStore := &stdmocks.ComposedProtobufStore{}
matchedBy := mock.MatchedBy(func(s storage.DataReference) bool {
return true
})
composedProtobufStore.On("Head", mock.Anything, matchedBy).Return(metadata{true, 0}, nil)
dataStore := &storage.DataStore{
ComposedProtobufStore: &stdmocks.ComposedProtobufStore{},
ComposedProtobufStore: composedProtobufStore,
ReferenceConstructor: &storage.URLPathConstructor{},
}

Expand Down Expand Up @@ -446,7 +464,7 @@ func TestCheckSubTasksState(t *testing.T) {
}

// execute
newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)
newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, tCtx.DataStore(), "/prefix/", "/prefix-sand/", currentState)

// validate results
assert.Nil(t, err)
Expand Down Expand Up @@ -495,7 +513,7 @@ func TestCheckSubTasksState(t *testing.T) {
}

// execute
newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", currentState)
newState, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, tCtx.DataStore(), "/prefix/", "/prefix-sand/", currentState)

// validate results
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/k8s/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf
return pluginsCore.PhaseInfoRetryableFailure("RuntimeFailure", err.Error(), nil), nil
} else if k8serrors.IsBadRequest(err) || k8serrors.IsInvalid(err) {
logger.Errorf(ctx, "Badly formatted resource for plugin [%s], err %s", executorName, err)
// return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskFormat", err.Error(), nil)), nil
return pluginsCore.PhaseInfoFailure("BadTaskFormat", err.Error(), nil), nil
} else if k8serrors.IsRequestEntityTooLargeError(err) {
logger.Errorf(ctx, "Badly formatted resource for plugin [%s], err %s", executorName, err)
return pluginsCore.PhaseInfoFailure("EntityTooLarge", err.Error(), nil), nil
Expand Down

0 comments on commit 6db081f

Please sign in to comment.