From 7bb490f8c5dc4370fceb0c80a564f09b79342c2a Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Tue, 23 May 2023 19:41:08 -0500 Subject: [PATCH] updating dask plugin to use container resources with overrides (#351) --- .../flytek8s/container_helper.go | 48 ++++++++++++------- go/tasks/plugins/k8s/dask/dask.go | 15 +++--- go/tasks/plugins/k8s/dask/dask_test.go | 14 ++++-- 3 files changed, 47 insertions(+), 30 deletions(-) diff --git a/go/tasks/pluginmachinery/flytek8s/container_helper.go b/go/tasks/pluginmachinery/flytek8s/container_helper.go index 460d33b71..69ee075e0 100755 --- a/go/tasks/pluginmachinery/flytek8s/container_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/container_helper.go @@ -298,27 +298,39 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template. container.Env = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID()) + // retrieve platformResources and overrideResources to use when aggregating container resources + platformResources := parameters.TaskExecMetadata.GetPlatformResources() + if platformResources == nil { + platformResources = &v1.ResourceRequirements{} + } + + var overrideResources *v1.ResourceRequirements if parameters.TaskExecMetadata.GetOverrides() != nil && parameters.TaskExecMetadata.GetOverrides().GetResources() != nil { - res := parameters.TaskExecMetadata.GetOverrides().GetResources() - platformResources := parameters.TaskExecMetadata.GetPlatformResources() - if platformResources == nil { - platformResources = &v1.ResourceRequirements{} - } + overrideResources = parameters.TaskExecMetadata.GetOverrides().GetResources() + } + if overrideResources == nil { + overrideResources = &v1.ResourceRequirements{} + } - logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+ - " Resources [%v] with mode [%v]", res, platformResources, container.Resources, mode) - - switch mode { - 
case ResourceCustomizationModeAssignResources: - container.Resources = ApplyResourceOverrides(*res, *platformResources, assignIfUnset) - case ResourceCustomizationModeMergeExistingResources: - MergeResources(*res, &container.Resources) - container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, assignIfUnset) - case ResourceCustomizationModeEnsureExistingResourcesInRange: - container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset) - } + logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+ + " Resources [%v] with mode [%v]", overrideResources, platformResources, container.Resources, mode) - logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources) + switch mode { + case ResourceCustomizationModeAssignResources: + // this will use overrideResources to set container resources and fall back to the platformResource values. + // it is important to note that this ignores the existing container.Resources values. + container.Resources = ApplyResourceOverrides(*overrideResources, *platformResources, assignIfUnset) + case ResourceCustomizationModeMergeExistingResources: + // this merges the overrideResources on top of the existing container.Resources to apply the overrides, then it + // uses the platformResource values to set defaults for any missing resource. + MergeResources(*overrideResources, &container.Resources) + container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, assignIfUnset) + case ResourceCustomizationModeEnsureExistingResourcesInRange: + // this uses the platformResources defaults to ensure that the container.Resources values are within the + // platformResources limits. it will not override any existing container.Resources values. 
+ container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset) } + + logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources) return nil } diff --git a/go/tasks/plugins/k8s/dask/dask.go b/go/tasks/plugins/k8s/dask/dask.go index aa820bce3..cbf61c09e 100755 --- a/go/tasks/plugins/k8s/dask/dask.go +++ b/go/tasks/plugins/k8s/dask/dask.go @@ -60,9 +60,9 @@ func getDefaults(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, } } - defaultResources := executionMetadata.GetPlatformResources() - if executionMetadata.GetOverrides() != nil && executionMetadata.GetOverrides().GetResources() != nil { - defaultResources = executionMetadata.GetOverrides().GetResources() + containerResources, err := flytek8s.ToK8sResourceRequirements(defaultContainerSpec.GetResources()) + if err != nil { + return nil, err } jobRunnerContainer := v1.Container{ @@ -70,7 +70,7 @@ func getDefaults(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, Image: defaultImage, Args: defaultContainerSpec.GetArgs(), Env: defaultEnvVars, - Resources: *defaultResources, + Resources: *containerResources, } templateParameters := template.Parameters{ @@ -79,15 +79,16 @@ func getDefaults(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, OutputPath: taskCtx.OutputWriter(), Task: taskCtx.TaskReader(), } - err := flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters, flytek8s.ResourceCustomizationModeAssignResources, &jobRunnerContainer) - if err != nil { + if err = flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters, + flytek8s.ResourceCustomizationModeMergeExistingResources, &jobRunnerContainer); err != nil { + return nil, err } return &defaults{ Image: defaultImage, JobRunnerContainer: jobRunnerContainer, - Resources: defaultResources, + Resources: &jobRunnerContainer.Resources, Env: defaultEnvVars, Annotations: executionMetadata.GetAnnotations(), IsInterruptible: 
executionMetadata.IsInterruptible(), diff --git a/go/tasks/plugins/k8s/dask/dask_test.go b/go/tasks/plugins/k8s/dask/dask_test.go index e60400219..a04292fef 100644 --- a/go/tasks/plugins/k8s/dask/dask_test.go +++ b/go/tasks/plugins/k8s/dask/dask_test.go @@ -49,6 +49,10 @@ var ( v1.ResourceMemory: resource.MustParse("17G"), }, } + defaultResources = v1.ResourceRequirements{ + Requests: testPlatformResources.Requests, + Limits: testPlatformResources.Requests, + } ) func dummyDaskJob(status daskAPI.JobStatus) *daskAPI.DaskJob { @@ -199,7 +203,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) { assert.Equal(t, "job-runner", jobSpec.Containers[0].Name) assert.Equal(t, defaultTestImage, jobSpec.Containers[0].Image) assert.Equal(t, testArgs, jobSpec.Containers[0].Args) - assert.Equal(t, testPlatformResources, jobSpec.Containers[0].Resources) + assert.Equal(t, defaultResources, jobSpec.Containers[0].Resources) assert.Equal(t, defaultTolerations, jobSpec.Tolerations) assert.Equal(t, defaultNodeSelector, jobSpec.NodeSelector) assert.Equal(t, defaultAffinity, jobSpec.Affinity) @@ -226,7 +230,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) { } assert.Equal(t, v1.RestartPolicyNever, schedulerSpec.RestartPolicy) assert.Equal(t, defaultTestImage, schedulerSpec.Containers[0].Image) - assert.Equal(t, testPlatformResources, schedulerSpec.Containers[0].Resources) + assert.Equal(t, defaultResources, schedulerSpec.Containers[0].Resources) assert.Equal(t, []string{"dask-scheduler"}, schedulerSpec.Containers[0].Args) assert.Equal(t, expectedPorts, schedulerSpec.Containers[0].Ports) assert.Equal(t, testEnvVars, schedulerSpec.Containers[0].Env) @@ -263,7 +267,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) { assert.Equal(t, "dask-worker", workerSpec.Containers[0].Name) assert.Equal(t, v1.PullIfNotPresent, workerSpec.Containers[0].ImagePullPolicy) assert.Equal(t, defaultTestImage, workerSpec.Containers[0].Image) - assert.Equal(t, testPlatformResources, 
workerSpec.Containers[0].Resources) + assert.Equal(t, defaultResources, workerSpec.Containers[0].Resources) assert.Equal(t, testEnvVars, workerSpec.Containers[0].Env) assert.Equal(t, defaultTolerations, workerSpec.Tolerations) assert.Equal(t, defaultNodeSelector, workerSpec.NodeSelector) @@ -273,9 +277,9 @@ func TestBuildResourceDaskHappyPath(t *testing.T) { "--name", "$(DASK_WORKER_NAME)", "--nthreads", - "5", + "4", "--memory-limit", - "17G", + "1Gi", }, workerSpec.Containers[0].Args) }