From 68cd4a57cd98fe08f4f4c67fc9c8d261825861e0 Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Fri, 20 Dec 2024 12:05:12 -0300 Subject: [PATCH 1/5] UPSTREAM: 11475: fix(backend): modelToCRDTrigger was not including periodic schedule correctly (#11475) --------- Signed-off-by: Helber Belmiro (cherry picked from commit 97acacbd2a0b72d442398ca04382ac1e6d9aa37f) --- backend/src/apiserver/model/cron_schedule.go | 34 +++++++++++++++++++ backend/src/apiserver/model/job.go | 28 --------------- .../src/apiserver/model/periodic_schedule.go | 34 +++++++++++++++++++ backend/src/apiserver/template/template.go | 4 +-- 4 files changed, 70 insertions(+), 30 deletions(-) create mode 100644 backend/src/apiserver/model/cron_schedule.go create mode 100644 backend/src/apiserver/model/periodic_schedule.go diff --git a/backend/src/apiserver/model/cron_schedule.go b/backend/src/apiserver/model/cron_schedule.go new file mode 100644 index 00000000000..1ee535acaa3 --- /dev/null +++ b/backend/src/apiserver/model/cron_schedule.go @@ -0,0 +1,34 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package model + +type CronSchedule struct { + // Time at which scheduling starts. + // If no start time is specified, the StartTime is the creation time of the schedule. + CronScheduleStartTimeInSec *int64 `gorm:"column:CronScheduleStartTimeInSec;"` + + // Time at which scheduling ends. + // If no end time is specified, the EndTime is the end of time. + CronScheduleEndTimeInSec *int64 `gorm:"column:CronScheduleEndTimeInSec;"` + + // Cron string describing when a workflow should be created within the + // time interval defined by StartTime and EndTime. + Cron *string `gorm:"column:Schedule;"` +} + +func (c CronSchedule) IsEmpty() bool { + return (c.CronScheduleStartTimeInSec == nil || *c.CronScheduleStartTimeInSec == 0) && + (c.CronScheduleEndTimeInSec == nil || *c.CronScheduleEndTimeInSec == 0) && + (c.Cron == nil || *c.Cron == "") +} diff --git a/backend/src/apiserver/model/job.go b/backend/src/apiserver/model/job.go index a9f0f7a6177..7cdead7e8bd 100644 --- a/backend/src/apiserver/model/job.go +++ b/backend/src/apiserver/model/job.go @@ -189,34 +189,6 @@ type Trigger struct { PeriodicSchedule } -type CronSchedule struct { - // Time at which scheduling starts. - // If no start time is specified, the StartTime is the creation time of the schedule. - CronScheduleStartTimeInSec *int64 `gorm:"column:CronScheduleStartTimeInSec;"` - - // Time at which scheduling ends. - // If no end time is specified, the EndTime is the end of time. - CronScheduleEndTimeInSec *int64 `gorm:"column:CronScheduleEndTimeInSec;"` - - // Cron string describing when a workflow should be created within the - // time interval defined by StartTime and EndTime. - Cron *string `gorm:"column:Schedule;"` -} - -type PeriodicSchedule struct { - // Time at which scheduling starts. - // If no start time is specified, the StartTime is the creation time of the schedule. - PeriodicScheduleStartTimeInSec *int64 `gorm:"column:PeriodicScheduleStartTimeInSec;"` - - // Time at which scheduling ends. - // If no end time is specified, the EndTime is the end of time. - PeriodicScheduleEndTimeInSec *int64 `gorm:"column:PeriodicScheduleEndTimeInSec;"` - - // Interval describing when a workflow should be created within the - // time interval defined by StartTime and EndTime. - IntervalSecond *int64 `gorm:"column:IntervalSecond;"` -} - func (j Job) GetValueOfPrimaryKey() string { return fmt.Sprint(j.UUID) } diff --git a/backend/src/apiserver/model/periodic_schedule.go b/backend/src/apiserver/model/periodic_schedule.go new file mode 100644 index 00000000000..af28ca61cff --- /dev/null +++ b/backend/src/apiserver/model/periodic_schedule.go @@ -0,0 +1,34 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package model + +type PeriodicSchedule struct { + // Time at which scheduling starts. + // If no start time is specified, the StartTime is the creation time of the schedule. + PeriodicScheduleStartTimeInSec *int64 `gorm:"column:PeriodicScheduleStartTimeInSec;"` + + // Time at which scheduling ends. + // If no end time is specified, the EndTime is the end of time. + PeriodicScheduleEndTimeInSec *int64 `gorm:"column:PeriodicScheduleEndTimeInSec;"` + + // Interval describing when a workflow should be created within the + // time interval defined by StartTime and EndTime. + IntervalSecond *int64 `gorm:"column:IntervalSecond;"` +} + +func (p PeriodicSchedule) IsEmpty() bool { + return (p.PeriodicScheduleStartTimeInSec == nil || *p.PeriodicScheduleStartTimeInSec == 0) && + (p.PeriodicScheduleEndTimeInSec == nil || *p.PeriodicScheduleEndTimeInSec == 0) && + (p.IntervalSecond == nil || *p.IntervalSecond == 0) +} diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index 4753dadc9fd..c026abeb2df 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -232,7 +232,7 @@ func modelToParametersMap(modelParameters string) (map[string]string, error) { func modelToCRDTrigger(modelTrigger model.Trigger) (scheduledworkflow.Trigger, error) { crdTrigger := scheduledworkflow.Trigger{} // CronSchedule and PeriodicSchedule can have at most one being non-empty - if modelTrigger.CronSchedule != (model.CronSchedule{}) { + if !modelTrigger.CronSchedule.IsEmpty() { // Check if CronSchedule is non-empty crdCronSchedule := scheduledworkflow.CronSchedule{} if modelTrigger.Cron != nil { @@ -247,7 +247,7 @@ func modelToCRDTrigger(modelTrigger model.Trigger) (scheduledworkflow.Trigger, e crdCronSchedule.EndTime = &endTime } crdTrigger.CronSchedule = &crdCronSchedule - } else if modelTrigger.PeriodicSchedule != (model.PeriodicSchedule{}) { + } else if !modelTrigger.PeriodicSchedule.IsEmpty() { // Check if PeriodicSchedule is non-empty crdPeriodicSchedule := scheduledworkflow.PeriodicSchedule{} if modelTrigger.IntervalSecond != nil { From 2d9cdfce6209b815562470384ddc8cefe5733567 Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Wed, 8 Jan 2025 11:05:21 -0300 Subject: [PATCH 2/5] UPSTREAM: 11480: chore(backend): Fixed namespace in job creation Signed-off-by: Helber Belmiro (cherry picked from commit 72f11d9801512fae6523c651cf659591099503f3) Signed-off-by: Helber Belmiro (cherry picked from commit 34bf2f8d6b5b2f35fb3e6ff641e6bda09cb4a24e) --- .../src/apiserver/resource/resource_manager.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index ef062517ace..43556371200 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -990,12 +990,6 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest") } - // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). - // Convert modelJob into scheduledWorkflow. - scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences()) - if err != nil { - return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") - } // Create a new ScheduledWorkflow at the ScheduledWorkflow client. k8sNamespace := job.Namespace if k8sNamespace == "" { @@ -1004,6 +998,15 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model if k8sNamespace == "" { return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo scheduled workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a recurring run due to empty namespace") } + + job.Namespace = k8sNamespace + + // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). + // Convert modelJob into scheduledWorkflow. + scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences()) + if err != nil { + return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") + } newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { @@ -1015,7 +1018,6 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model swf := util.NewScheduledWorkflow(newScheduledWorkflow) job.UUID = string(swf.UID) job.K8SName = swf.Name - job.Namespace = swf.Namespace job.Conditions = model.StatusState(swf.ConditionSummary()).ToString() for _, modelRef := range job.ResourceReferences { modelRef.ResourceUUID = string(swf.UID) From cd71c283bfaa0864c6eed002f2149c9f97004d53 Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Wed, 8 Jan 2025 11:06:21 -0300 Subject: [PATCH 3/5] UPSTREAM: 11481: chore(backend): Fixed ServiceAccount in job creation Signed-off-by: Helber Belmiro (cherry picked from commit 2686e017ceca21671d47a6f8d5703ad94b7f0615) --- .../apiserver/resource/resource_manager.go | 19 ++++++++++--------- .../src/apiserver/template/template_test.go | 7 ++++--- backend/src/apiserver/template/v2_template.go | 6 ++++-- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 43556371200..b5954690a1e 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -1022,18 +1022,19 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model for _, modelRef := range job.ResourceReferences { modelRef.ResourceUUID = string(swf.UID) } - // Get the service account - serviceAccount := "" - if swf.Spec.Workflow != nil { - execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow) - if err == nil { - serviceAccount = execSpec.ServiceAccount() - } - } - job.ServiceAccount = serviceAccount if tmpl.GetTemplateType() == template.V1 { + // Get the service account + serviceAccount := "" + if swf.Spec.Workflow != nil { + execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow) + if err == nil { + serviceAccount = execSpec.ServiceAccount() + } + } + job.ServiceAccount = serviceAccount job.PipelineSpec.WorkflowSpecManifest = manifest } else { + job.ServiceAccount = newScheduledWorkflow.Spec.ServiceAccount job.PipelineSpec.PipelineSpecManifest = manifest } return r.jobStore.CreateJob(job) diff --git a/backend/src/apiserver/template/template_test.go b/backend/src/apiserver/template/template_test.go index db417293632..0c0cdc729ec 100644 --- a/backend/src/apiserver/template/template_test.go +++ b/backend/src/apiserver/template/template_test.go @@ -220,9 +220,10 @@ func TestScheduledWorkflow(t *testing.T) { Parameters: []scheduledworkflow.Parameter{{Name: "y", Value: "\"world\""}}, Spec: "", }, - PipelineId: "1", - PipelineName: "pipeline name", - NoCatchup: util.BoolPointer(true), + PipelineId: "1", + PipelineName: "pipeline name", + NoCatchup: util.BoolPointer(true), + ServiceAccount: "pipeline-runner", }, } diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index bc9477bb4a5..22ad326fbe9 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -97,7 +97,9 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1 if modelJob.Namespace != "" { executionSpec.SetExecutionNamespace(modelJob.Namespace) } - setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount) + if executionSpec.ServiceAccount() == "" { + setDefaultServiceAccount(executionSpec, modelJob.ServiceAccount) + } // Disable istio sidecar injection if not specified executionSpec.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled) swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) @@ -135,7 +137,7 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1 PipelineId: modelJob.PipelineId, PipelineName: modelJob.PipelineName, PipelineVersionId: modelJob.PipelineVersionId, - ServiceAccount: modelJob.ServiceAccount, + ServiceAccount: executionSpec.ServiceAccount(), }, } return scheduledWorkflow, nil From aeb0151c171f812d6ca0d2fc3cb5a38ff76e47fc Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Wed, 8 Jan 2025 12:24:22 -0300 Subject: [PATCH 4/5] UPSTREAM: 11469: fix(backend): Synced ScheduledWorkflow CRs on apiserver startup Signed-off-by: Helber Belmiro (cherry picked from commit 8ab8dae806e55e00b9e8c3d2c1f0244cb1636903) --- .../client/scheduled_workflow_fake.go | 6 +- backend/src/apiserver/list/list.go | 19 ++++- backend/src/apiserver/list/list_test.go | 7 ++ backend/src/apiserver/main.go | 21 +++++- .../apiserver/resource/resource_manager.go | 73 +++++++++++++++++++ .../resource/resource_manager_test.go | 29 ++++++++ 6 files changed, 146 insertions(+), 9 deletions(-) diff --git a/backend/src/apiserver/client/scheduled_workflow_fake.go b/backend/src/apiserver/client/scheduled_workflow_fake.go index 5b81722ee35..970fd26e658 100644 --- a/backend/src/apiserver/client/scheduled_workflow_fake.go +++ b/backend/src/apiserver/client/scheduled_workflow_fake.go @@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name) } -func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil +func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) { + c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow + return scheduledWorkflow, nil } func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error { diff --git a/backend/src/apiserver/list/list.go b/backend/src/apiserver/list/list.go index e38be8f7339..174eff961d8 100644 --- a/backend/src/apiserver/list/list.go +++ b/backend/src/apiserver/list/list.go @@ -22,6 +22,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "math" "reflect" "strings" @@ -97,6 +98,13 @@ type Options struct { *token } +func EmptyOptions() *Options { + return &Options{ + math.MaxInt32, + &token{}, + } +} + // Matches returns trues if the sorting and filtering criteria in o matches that // of the one supplied in opts. func (o *Options) Matches(opts *Options) bool { @@ -213,9 +221,14 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild if o.IsDesc { order = "DESC" } - sqlBuilder = sqlBuilder. - OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)). - OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order)) + + if o.SortByFieldName != "" { + sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)) + } + + if o.KeyFieldName != "" { + sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order)) + } return sqlBuilder } diff --git a/backend/src/apiserver/list/list_test.go b/backend/src/apiserver/list/list_test.go index 1806e158eec..e207cd900ab 100644 --- a/backend/src/apiserver/list/list_test.go +++ b/backend/src/apiserver/list/list_test.go @@ -15,6 +15,8 @@ package list import ( + "fmt" + "math" "reflect" "strings" "testing" @@ -645,6 +647,11 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) { wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124", wantArgs: nil, }, + { + in: EmptyOptions(), + wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1), + wantArgs: nil, + }, { in: &Options{ PageSize: 123, diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go index e226123510b..02bcd889b73 100644 --- a/backend/src/apiserver/main.go +++ b/backend/src/apiserver/main.go @@ -19,14 +19,14 @@ import ( "crypto/tls" "flag" "fmt" + "google.golang.org/grpc/credentials" "io" "math" "net" "net/http" "strconv" "strings" - - "google.golang.org/grpc/credentials" + "sync" "github.com/fsnotify/fsnotify" "github.com/golang/glog" @@ -132,10 +132,25 @@ func main() { } log.SetLevel(level) + backgroundCtx, backgroundCancel := context.WithCancel(context.Background()) + defer backgroundCancel() + wg := sync.WaitGroup{} + wg.Add(1) + go reconcileSwfCrs(resourceManager, backgroundCtx, &wg) go startRpcServer(resourceManager, tlsConfig) + // This is blocking startHttpProxy(resourceManager, tlsConfig) - + backgroundCancel() clientManager.Close() + wg.Wait() +} + +func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + err := resourceManager.ReconcileSwfCrs(ctx) + if err != nil { + log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err) + } } // A custom http request header matcher to pass on the user identity diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index b5954690a1e..ac509c52c22 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -18,8 +18,10 @@ import ( "context" "encoding/json" "fmt" + scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" "io" "net" + "reflect" "strconv" "time" @@ -582,6 +584,77 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model return newRun, nil } +// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs. +func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error { + filterContext := &model.FilterContext{ + ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()}, + } + + opts := list.EmptyOptions() + + jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts) + + if err != nil { + return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") + } + + for i := range jobs { + select { + case <-ctx.Done(): + return nil + default: + } + + tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec) + if err != nil { + return failedToReconcileSwfCrsError(err) + } + + newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i], r.getOwnerReferences()) + if err != nil { + return failedToReconcileSwfCrsError(err) + } + + for { + currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{}) + if err != nil { + if util.IsNotFound(err) { + break + } + return failedToReconcileSwfCrsError(err) + } + + if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) { + currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec + err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow) + if err != nil { + if apierrors.IsConflict(errors.Unwrap(err)) { + continue + } else if util.IsNotFound(errors.Cause(err)) { + break + } + return failedToReconcileSwfCrsError(err) + } + } + break + } + } + + return nil +} + +func failedToReconcileSwfCrsError(err error) error { + return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources") +} + +func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error { + _, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow) + if err != nil { + return util.Wrap(err, "Failed to update ScheduledWorkflow") + } + return nil +} + // Fetches a run with a given id. func (r *ResourceManager) GetRun(runId string) (*model.Run, error) { run, err := r.runStore.GetRun(runId) diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 5049a6b2bae..7a69344ddd2 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -3200,6 +3200,35 @@ func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing. assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1()) } +func TestReconcileSwfCrs(t *testing.T) { + store, manager, job := initWithJobV2(t) + defer store.Close() + + fetchedJob, err := manager.GetJob(job.UUID) + require.Nil(t, err) + require.NotNil(t, fetchedJob) + + swfClient := store.SwfClient().ScheduledWorkflow("ns1") + + options := v1.GetOptions{} + ctx := context.Background() + + swf, err := swfClient.Get(ctx, "job-", options) + require.Nil(t, err) + + // emulates an invalid/outdated spec + swf.Spec.Workflow.Spec = nil + swf, err = swfClient.Update(ctx, swf) + require.Nil(t, swf.Spec.Workflow.Spec) + + err = manager.ReconcileSwfCrs(ctx) + require.Nil(t, err) + + swf, err = swfClient.Get(ctx, "job-", options) + require.Nil(t, err) + require.NotNil(t, swf.Spec.Workflow.Spec) +} + func TestReportScheduledWorkflowResource_Error(t *testing.T) { store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) defer store.Close() From 91de12d3f7574649ae47274bacf8cb7a1de12024 Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Wed, 5 Feb 2025 12:28:37 -0300 Subject: [PATCH 5/5] UPSTREAM: 11578: fix(backend): Replaced hardcoded ServiceAccount with default config Signed-off-by: Helber Belmiro --- .../resource/resource_manager_test.go | 34 +++++++++++++++++++ backend/src/v2/compiler/argocompiler/argo.go | 5 +-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 7a69344ddd2..08cc852b1ef 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -2401,6 +2401,40 @@ func TestCreateJob_ThroughWorkflowSpecV2(t *testing.T) { assert.Equal(t, expectedJob.ToV1(), fetchedJob.ToV1(), "CreateJob stored invalid data in database") } +func TestCreateJobDifferentDefaultServiceAccountName_ThroughWorkflowSpecV2(t *testing.T) { + originalDefaultServiceAccount := viper.Get(common.DefaultPipelineRunnerServiceAccountFlag) + + viper.Set(common.DefaultPipelineRunnerServiceAccountFlag, "my-service-account") + defer viper.Set(common.DefaultPipelineRunnerServiceAccountFlag, originalDefaultServiceAccount) + + store, manager, job := initWithJobV2(t) + defer store.Close() + expectedJob := &model.Job{ + UUID: "123e4567-e89b-12d3-a456-426655440000", + DisplayName: "j1", + K8SName: "job-", + Namespace: "ns1", + ServiceAccount: "my-service-account", + Enabled: true, + ExperimentId: DefaultFakeUUID, + CreatedAtInSec: 2, + UpdatedAtInSec: 2, + Conditions: "STATUS_UNSPECIFIED", + PipelineSpec: model.PipelineSpec{ + PipelineSpecManifest: v2SpecHelloWorld, + RuntimeConfig: model.RuntimeConfig{ + Parameters: "{\"text\":\"world\"}", + PipelineRoot: "job-1-root", + }, + }, + } + expectedJob.PipelineSpec.PipelineName = job.PipelineSpec.PipelineName + require.Equal(t, expectedJob.ToV1(), job.ToV1()) + fetchedJob, err := manager.GetJob(job.UUID) + require.Nil(t, err) + require.Equal(t, expectedJob.ToV1(), fetchedJob.ToV1(), "CreateJob stored invalid data in database") +} + func TestCreateJob_ThroughPipelineID(t *testing.T) { store, manager, pipeline, _ := initWithPipeline(t) defer store.Close() diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index c8dca58bef1..0c5260aa659 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -16,6 +16,7 @@ package argocompiler import ( "fmt" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" "strings" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" @@ -63,7 +64,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S if err != nil { return nil, err } - // fill root component default paramters to PipelineJob + // fill root component default parameters to PipelineJob specParams := spec.GetRoot().GetInputDefinitions().GetParameters() for name, param := range specParams { _, ok := job.RuntimeConfig.ParameterValues[name] @@ -108,7 +109,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S "pipelines.kubeflow.org/v2_component": "true", }, }, - ServiceAccountName: "pipeline-runner", + ServiceAccountName: common.GetStringConfigWithDefault(common.DefaultPipelineRunnerServiceAccountFlag, common.DefaultPipelineRunnerServiceAccount), Entrypoint: tmplEntrypoint, }, }