Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Pipeline Schedules failing after upgrading #132

Merged
merged 5 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/src/apiserver/client/scheduled_workflow_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name)
}

func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
glog.Error("This fake method is not yet implemented.")
return nil, nil
func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow
return scheduledWorkflow, nil
}

func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error {
Expand Down
19 changes: 16 additions & 3 deletions backend/src/apiserver/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"math"
"reflect"
"strings"

Expand Down Expand Up @@ -97,6 +98,13 @@ type Options struct {
*token
}

func EmptyOptions() *Options {
return &Options{
math.MaxInt32,
&token{},
}
}

// Matches returns trues if the sorting and filtering criteria in o matches that
// of the one supplied in opts.
func (o *Options) Matches(opts *Options) bool {
Expand Down Expand Up @@ -213,9 +221,14 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
if o.IsDesc {
order = "DESC"
}
sqlBuilder = sqlBuilder.
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)).
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))

if o.SortByFieldName != "" {
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order))
}

if o.KeyFieldName != "" {
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
}

return sqlBuilder
}
Expand Down
7 changes: 7 additions & 0 deletions backend/src/apiserver/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package list

import (
"fmt"
"math"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -645,6 +647,11 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) {
wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124",
wantArgs: nil,
},
{
in: EmptyOptions(),
wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1),
wantArgs: nil,
},
{
in: &Options{
PageSize: 123,
Expand Down
21 changes: 18 additions & 3 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"crypto/tls"
"flag"
"fmt"
"google.golang.org/grpc/credentials"
"io"
"math"
"net"
"net/http"
"strconv"
"strings"

"google.golang.org/grpc/credentials"
"sync"

"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
Expand Down Expand Up @@ -132,10 +132,25 @@ func main() {
}
log.SetLevel(level)

backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
defer backgroundCancel()
wg := sync.WaitGroup{}
wg.Add(1)
go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
go startRpcServer(resourceManager, tlsConfig)
// This is blocking
startHttpProxy(resourceManager, tlsConfig)

backgroundCancel()
clientManager.Close()
wg.Wait()
}

func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
err := resourceManager.ReconcileSwfCrs(ctx)
if err != nil {
log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
}
}

// A custom http request header matcher to pass on the user identity
Expand Down
34 changes: 34 additions & 0 deletions backend/src/apiserver/model/cron_schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model

type CronSchedule struct {
// Time at which scheduling starts.
// If no start time is specified, the StartTime is the creation time of the schedule.
CronScheduleStartTimeInSec *int64 `gorm:"column:CronScheduleStartTimeInSec;"`

// Time at which scheduling ends.
// If no end time is specified, the EndTime is the end of time.
CronScheduleEndTimeInSec *int64 `gorm:"column:CronScheduleEndTimeInSec;"`

// Cron string describing when a workflow should be created within the
// time interval defined by StartTime and EndTime.
Cron *string `gorm:"column:Schedule;"`
}

func (c CronSchedule) IsEmpty() bool {
return (c.CronScheduleStartTimeInSec == nil || *c.CronScheduleStartTimeInSec == 0) &&
(c.CronScheduleEndTimeInSec == nil || *c.CronScheduleEndTimeInSec == 0) &&
(c.Cron == nil || *c.Cron == "")
}
28 changes: 0 additions & 28 deletions backend/src/apiserver/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,34 +189,6 @@ type Trigger struct {
PeriodicSchedule
}

type CronSchedule struct {
// Time at which scheduling starts.
// If no start time is specified, the StartTime is the creation time of the schedule.
CronScheduleStartTimeInSec *int64 `gorm:"column:CronScheduleStartTimeInSec;"`

// Time at which scheduling ends.
// If no end time is specified, the EndTime is the end of time.
CronScheduleEndTimeInSec *int64 `gorm:"column:CronScheduleEndTimeInSec;"`

// Cron string describing when a workflow should be created within the
// time interval defined by StartTime and EndTime.
Cron *string `gorm:"column:Schedule;"`
}

type PeriodicSchedule struct {
// Time at which scheduling starts.
// If no start time is specified, the StartTime is the creation time of the schedule.
PeriodicScheduleStartTimeInSec *int64 `gorm:"column:PeriodicScheduleStartTimeInSec;"`

// Time at which scheduling ends.
// If no end time is specified, the EndTime is the end of time.
PeriodicScheduleEndTimeInSec *int64 `gorm:"column:PeriodicScheduleEndTimeInSec;"`

// Interval describing when a workflow should be created within the
// time interval defined by StartTime and EndTime.
IntervalSecond *int64 `gorm:"column:IntervalSecond;"`
}

func (j Job) GetValueOfPrimaryKey() string {
return fmt.Sprint(j.UUID)
}
Expand Down
34 changes: 34 additions & 0 deletions backend/src/apiserver/model/periodic_schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model

type PeriodicSchedule struct {
// Time at which scheduling starts.
// If no start time is specified, the StartTime is the creation time of the schedule.
PeriodicScheduleStartTimeInSec *int64 `gorm:"column:PeriodicScheduleStartTimeInSec;"`

// Time at which scheduling ends.
// If no end time is specified, the EndTime is the end of time.
PeriodicScheduleEndTimeInSec *int64 `gorm:"column:PeriodicScheduleEndTimeInSec;"`

// Interval describing when a workflow should be created within the
// time interval defined by StartTime and EndTime.
IntervalSecond *int64 `gorm:"column:IntervalSecond;"`
}

func (p PeriodicSchedule) IsEmpty() bool {
return (p.PeriodicScheduleStartTimeInSec == nil || *p.PeriodicScheduleStartTimeInSec == 0) &&
(p.PeriodicScheduleEndTimeInSec == nil || *p.PeriodicScheduleEndTimeInSec == 0) &&
(p.IntervalSecond == nil || *p.IntervalSecond == 0)
}
108 changes: 92 additions & 16 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"context"
"encoding/json"
"fmt"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"io"
"net"
"reflect"
"strconv"
"time"

Expand Down Expand Up @@ -582,6 +584,77 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
return newRun, nil
}

// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs.
func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
filterContext := &model.FilterContext{
ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()},
}

opts := list.EmptyOptions()

jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts)

if err != nil {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

for i := range jobs {
select {
case <-ctx.Done():
return nil
default:
}

tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
if err != nil {
return failedToReconcileSwfCrsError(err)
}

newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i], r.getOwnerReferences())
if err != nil {
return failedToReconcileSwfCrsError(err)
}

for {
currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
if err != nil {
if util.IsNotFound(err) {
break
}
return failedToReconcileSwfCrsError(err)
}

if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow)
if err != nil {
if apierrors.IsConflict(errors.Unwrap(err)) {
continue
} else if util.IsNotFound(errors.Cause(err)) {
break
}
return failedToReconcileSwfCrsError(err)
}
}
break
}
}

return nil
}

func failedToReconcileSwfCrsError(err error) error {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}

func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
_, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow)
if err != nil {
return util.Wrap(err, "Failed to update ScheduledWorkflow")
}
return nil
}

// Fetches a run with a given id.
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
run, err := r.runStore.GetRun(runId)
Expand Down Expand Up @@ -990,12 +1063,6 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences())
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
// Create a new ScheduledWorkflow at the ScheduledWorkflow client.
k8sNamespace := job.Namespace
if k8sNamespace == "" {
Expand All @@ -1004,6 +1071,15 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
if k8sNamespace == "" {
return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo scheduled workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a recurring run due to empty namespace")
}

job.Namespace = k8sNamespace

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences())
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
Expand All @@ -1015,23 +1091,23 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
swf := util.NewScheduledWorkflow(newScheduledWorkflow)
job.UUID = string(swf.UID)
job.K8SName = swf.Name
job.Namespace = swf.Namespace
job.Conditions = model.StatusState(swf.ConditionSummary()).ToString()
for _, modelRef := range job.ResourceReferences {
modelRef.ResourceUUID = string(swf.UID)
}
// Get the service account
serviceAccount := ""
if swf.Spec.Workflow != nil {
execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow)
if err == nil {
serviceAccount = execSpec.ServiceAccount()
}
}
job.ServiceAccount = serviceAccount
if tmpl.GetTemplateType() == template.V1 {
// Get the service account
serviceAccount := ""
if swf.Spec.Workflow != nil {
execSpec, err := util.ScheduleSpecToExecutionSpec(util.ArgoWorkflow, swf.Spec.Workflow)
if err == nil {
serviceAccount = execSpec.ServiceAccount()
}
}
job.ServiceAccount = serviceAccount
job.PipelineSpec.WorkflowSpecManifest = manifest
} else {
job.ServiceAccount = newScheduledWorkflow.Spec.ServiceAccount
job.PipelineSpec.PipelineSpecManifest = manifest
}
return r.jobStore.CreateJob(job)
Expand Down
Loading
Loading