Skip to content

Commit

Permalink
remove tekton backend
Browse files Browse the repository at this point in the history
Signed-off-by: Your Name <[email protected]>
  • Loading branch information
akagami-harsh committed Jan 30, 2025
1 parent 3e423d8 commit bd4c6d2
Show file tree
Hide file tree
Showing 28 changed files with 10 additions and 9,217 deletions.
17 changes: 2 additions & 15 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"github.com/kubeflow/pipelines/backend/src/v2/compiler/argocompiler"
"github.com/kubeflow/pipelines/backend/src/v2/compiler/tektoncompiler"
"google.golang.org/protobuf/encoding/protojson"
goyaml "gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -77,12 +76,7 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
obj, err := argocompiler.Compile(job, kubernetesSpec, nil)
if err != nil {
return nil, util.Wrap(err, "Failed to compile job")
}
Expand Down Expand Up @@ -129,12 +123,7 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
Parameters: parameters,
Spec: executionSpec.ToStringForSchedule(),
},
NoCatchup: util.BoolPointer(modelJob.NoCatchup),
ExperimentId: modelJob.ExperimentId,
PipelineId: modelJob.PipelineId,
PipelineName: modelJob.PipelineName,
PipelineVersionId: modelJob.PipelineVersionId,
ServiceAccount: executionSpec.ServiceAccount(),
NoCatchup: util.BoolPointer(modelJob.NoCatchup),
},
}
return scheduledWorkflow, nil
Expand Down Expand Up @@ -305,8 +294,6 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
if err != nil {
return nil, util.Wrap(err, "Failed to compile job")
Expand Down
53 changes: 0 additions & 53 deletions backend/src/common/util/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/pkg/errors"
prclientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
prinformer "github.com/tektoncd/pipeline/pkg/client/informers/externalversions"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,7 +34,6 @@ type ExecutionSpecList []ExecutionSpec
// ExecutionClient is used to get a ExecutionInterface in specific namespace scope
type ExecutionClient interface {
Execution(namespace string) ExecutionInterface
Compare(old, new interface{}) bool
}

// Mini version of ExecutionSpec informer
Expand Down Expand Up @@ -103,27 +100,6 @@ func NewExecutionClientOrFatal(execType ExecutionType, initConnectionTimeout tim
glog.Fatalf("Failed to create ExecutionClient for Argo. Error: %v", err)
}
return &WorkflowClient{client: argoProjClient}
case TektonPipelineRun:
var prClient *prclientset.Clientset
operation := func() error {
restConfig, err := GetKubernetesConfig()
if err != nil {
return errors.Wrap(err, "Failed to initialize the RestConfig")
}
restConfig.QPS = float32(clientParams.QPS)
restConfig.Burst = clientParams.Burst
prClient = prclientset.NewForConfigOrDie(restConfig)
return nil
}

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err := backoff.Retry(operation, b)

if err != nil {
glog.Fatalf("Failed to create ExecutionClient for Argo. Error: %v", err)
}
return &PipelineRunClient{client: prClient}
default:
glog.Fatalf("Not supported type of Execution")
}
Expand Down Expand Up @@ -163,35 +139,6 @@ func NewExecutionInformerOrFatal(execType ExecutionType, namespace string,
return &WorkflowInformer{
informer: argoInformer.Argoproj().V1alpha1().Workflows(), factory: argoInformer,
}
case TektonPipelineRun:
var prInformer prinformer.SharedInformerFactory
var prClient *prclientset.Clientset
operation := func() error {
restConfig, err := GetKubernetesConfig()
if err != nil {
return errors.Wrap(err, "Failed to initialize the RestConfig")
}
restConfig.QPS = float32(clientParams.QPS)
restConfig.Burst = clientParams.Burst
prClient = prclientset.NewForConfigOrDie(restConfig)
if namespace == "" {
prInformer = prinformer.NewSharedInformerFactory(prClient, time.Second*30)
} else {
prInformer = prinformer.NewFilteredSharedInformerFactory(
prClient, time.Second*30, namespace, nil)
}
return nil
}

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err := backoff.Retry(operation, b)

if err != nil {
glog.Fatalf("Failed to create ExecutionInformer for Argo. Error: %v", err)
}
return &PipelineRunInformer{
informer: prInformer.Tekton().V1().PipelineRuns(), factory: prInformer, clientset: prClient}
default:
glog.Fatalf("Not supported type of Execution")
}
Expand Down
46 changes: 3 additions & 43 deletions backend/src/common/util/execution_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ import (

workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
pipelineapi "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)

type ExecutionType string

const (
ArgoWorkflow ExecutionType = "Workflow"
TektonPipelineRun ExecutionType = "PipelineRun"
Unknown ExecutionType = "Unknown"
ArgoWorkflow ExecutionType = "Workflow"
Unknown ExecutionType = "Unknown"
)

var (
Expand Down Expand Up @@ -61,7 +59,7 @@ func SetExecutionType(newType ExecutionType) {
}

// Abastract interface to encapsulate the resource needed by the underlying execution runtime
// i.e Workflow is for Argo, PipelineRun is for Tekton and etc.
// i.e Workflow is for Argo.
// Status related information will go to ExecutionStatus interface.
// TODO: add more methods to make ExecutionSpec fullly represent Workflow. At the beginning
//
Expand Down Expand Up @@ -188,8 +186,6 @@ func NewExecutionSpec(bytes []byte) (ExecutionSpec, error) {
switch meta.Kind {
case string(ArgoWorkflow):
return NewWorkflowFromBytes(bytes)
case string(TektonPipelineRun):
return NewPipelineRunFromBytes(bytes)
default:
return nil, NewInvalidInputError("Unknown execution spec")
}
Expand All @@ -205,8 +201,6 @@ func NewExecutionSpecJSON(execType ExecutionType, bytes []byte) (ExecutionSpec,
switch execType {
case ArgoWorkflow:
return NewWorkflowFromBytesJSON(bytes)
case TektonPipelineRun:
return NewPipelineRunFromBytesJSON(bytes)
default:
return nil, NewInvalidInputError("Unknown execution spec")
}
Expand All @@ -220,8 +214,6 @@ func NewExecutionSpecFromInterface(execType ExecutionType, obj interface{}) (Exe
switch execType {
case ArgoWorkflow:
return NewWorkflowFromInterface(obj)
case TektonPipelineRun:
return NewPipelineRunFromInterface(obj)
default:
return nil, NewInternalServerError(
errors.New("ExecutionType is not supported"), "type:%s", execType)
Expand All @@ -234,8 +226,6 @@ func UnmarshalParameters(execType ExecutionType, paramsString string) (SpecParam
switch execType {
case ArgoWorkflow:
return UnmarshParametersWorkflow(paramsString)
case TektonPipelineRun:
return UnmarshParametersPipelineRun(paramsString)
default:
return nil, NewInternalServerError(
errors.New("ExecutionType is not supported"), "type:%s", execType)
Expand All @@ -248,8 +238,6 @@ func MarshalParameters(execType ExecutionType, params SpecParameters) (string, e
switch execType {
case ArgoWorkflow:
return MarshalParametersWorkflow(params)
case TektonPipelineRun:
return MarshalParametersPipelineRun(params)
default:
return "", NewInternalServerError(
errors.New("ExecutionType is not supported"), "type:%s", execType)
Expand Down Expand Up @@ -284,28 +272,6 @@ func ScheduleSpecToExecutionSpec(
workflow.APIVersion = "argoproj.io/v1alpha1"
workflow.Kind = "Workflow"
return NewWorkflow(workflow), nil
case TektonPipelineRun:
if executionSpecStr, ok := wfr.Spec.(string); ok {
return NewPipelineRunFromScheduleWorkflowSpecBytesJSON([]byte(executionSpecStr))
}
// fall back to Tekton PipelineRunSpec, need to marshal back to json string then unmarshal to
// Tekton PipelineRunSpec because wfr.Spec is a map at this moment
raw, err := json.Marshal(wfr.Spec)
if err != nil {
return nil, NewInternalServerError(
errors.New("can't marshal WorkflowResource.Spec"), "err:%v", err)
}
var spec pipelineapi.PipelineRunSpec
if err := json.Unmarshal(raw, &spec); err != nil {
return nil, NewInternalServerError(
errors.New("can't unmarshal WorkflowResource.Spec"), "err:%v", err)
}
pr := &pipelineapi.PipelineRun{
Spec: spec,
}
pr.APIVersion = "tekton.dev/v1"
pr.Kind = "PipelineRun"
return NewPipelineRun(pr), nil
default:
return nil, NewInternalServerError(
errors.New("ExecutionType is not supported"), "type:%s", execType)
Expand All @@ -320,12 +286,6 @@ func GetTerminatePatch(execType ExecutionType) interface{} {
"activeDeadlineSeconds": 0,
},
}
case TektonPipelineRun:
return map[string]interface{}{
"spec": map[string]interface{}{
"status": "Cancelled",
},
}
default:
return nil
}
Expand Down
6 changes: 0 additions & 6 deletions backend/src/common/util/execution_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ func TestExecutionSpec_NewExecutionSpecFromInterface(t *testing.T) {
assert.Empty(t, err)
assert.NotEmpty(t, execSpec)

// unknown type
// TODO: fix this when PipelineRun get implemented
execSpec, err = NewExecutionSpecFromInterface(TektonPipelineRun, test)
assert.Empty(t, execSpec)
assert.Error(t, err)
assert.EqualError(t, err, "Invalid input error: not PipelineRun struct")
}

func TestExecutionSpec_UnmarshalParameters(t *testing.T) {
Expand Down
Loading

0 comments on commit bd4c6d2

Please sign in to comment.