Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat(api/sdk/backend): implement PipelineConfig and setting ttl on pipelines. Fixes: #10899 #10942

Closed

Conversation

gregsheremeta
Copy link
Contributor

@gregsheremeta gregsheremeta commented Jun 21, 2024

Description of your changes:
Fixes: #10899

Note: This is functioning end to end. This is currently WIP only because I'd like to get an ack from a maintainer on the approach, at which point I'll split the proto changes into a separate PR, and then rebase this on top of that once it's merged. I'll then also add tests.

In KFP v1, there was a function kfp.dsl.PipelineConf().set_ttl_seconds_after_finished(seconds) that garbage collected completed pods after a specified time. However, in KFP v2, this function has been deprecated. The entire PipelineConf is deprecated, actually, and I don't see a replacement. This PR adds a replacement.

I add a section to the protobuf to store pipeline-level config options. The right place appears to be PipelineDeploymentConfig. That actually gets cast into the pipeline_spec struct later, which is a bit confusing (and also causes ints to get kind-of-cast to floats because of https://stackoverflow.com/questions/51818125/how-to-use-ints-in-a-protobuf-struct), but ultimately it works.

I add a new PipelineConfig class in the DSL. This class will store pipeline-level config options just like the old PipelineConf did. set_completed_pipeline_ttl_seconds is the first config (and the only one in there for now).

I modify @dsl.pipeline to take a PipelineConfig instance as a parameter. The compiler is enhanced to read those config options and apply them to the pipeline proto that the compiler generates.

I enhance the backend Argo Workflows compiler to look for known config options in the PipelineDeploymentConfig section of the proto. completedPipelineTtl is the only one for now. If found, it's added to the Workflow CR in the correct place (Workflow.Spec.TTLStrategy).

Upon using this enhancement, I can see that a Workflow is created with the proper TTL set. And as expected, after set_completed_pipeline_ttl_seconds seconds, the Workflow and all Pods are deleted by Argo Workflow's garbage collection mechanism.

test pipeline:

from kfp import dsl
from kfp.dsl import PipelineConfig


@dsl.component(base_image="docker.io/python:3.9.17")
def empty_component():
    pass

config = PipelineConfig()
config.set_completed_pipeline_ttl_seconds(120)

@dsl.pipeline(name='test-ttl-pipeline', pipeline_config=config)
def test_ttl_pipeline():
    task = empty_component()

resulting pipeline spec:

# PIPELINE DEFINITION
# Name: test-ttl-pipeline
components:
  comp-empty-component:
    executorLabel: exec-empty-component
deploymentSpec:
  completedPipelineTtl: 120.0
  executors:
    exec-empty-component:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - empty_component
<SNIP>

resulting Workflow:
Screenshot from 2024-07-01 14-41-23

workflow controller logs:

time="2024-07-01T18:41:44.216Z" level=info msg="Queueing Succeeded workflow dspa3/test-ttl-pipeline-cjfbk for delete in 1m59s due to TTL"
...
time="2024-07-01T18:43:44.000Z" level=info msg="Deleting garbage collected workflow 'dspa3/test-ttl-pipeline-cjfbk'"
time="2024-07-01T18:43:44.012Z" level=info msg="Delete workflows 200"
time="2024-07-01T18:43:44.012Z" level=info msg="Successfully deleted 'dspa3/test-ttl-pipeline-cjfbk'"

@chensun can you please review or recommend an alternate reviewer?

below is old -- keeping it for posterity
I am guessing that the platform config is the right place to put it (need guidance, please).

In this WIP, when I add the following to the end of a pipeline, the Workflow (and its pods) are deleted after 2 minutes:

---
platforms:
  kubernetes:
    deploymentSpec:
      completedPipelineTtl: 120

(completedPipelineTtl is the new field I added in the proto. Previously there was only a map of configs for the tasks, so it used to look like this:

---
platforms:
  kubernetes:
    deploymentSpec:
      executors:
        exec-comp-1:
            <snip>
        exec-comp-2:
            <snip>
        ...

)

Now the hard part: I'm not sure how to proceed with changing the SDK/DSL to get it to output that pipeline spec.

It seems like using kfp-kubernetes is the correct way to go because this is a platform config option we want to set. However, there are no examples of doing pipeline-scope config -- all of the current kfp-kubernetes functions work on tasks, not pipelines. I poked at using something like this

def get_existing_kubernetes_config_as_message(
task: 'PipelineTask') -> pb.KubernetesExecutorConfig:
cur_k8_config_dict = task.platform_config.get('kubernetes', {})
k8_config_msg = pb.KubernetesExecutorConfig()
return json_format.ParseDict(cur_k8_config_dict, k8_config_msg)

but for Pipelines, with the hopes of it looking like this in the DSL:

@dsl.pipeline
def my_ttl_pipelines():
    task = comp()
    kubernetes.set_completed_pipeline_ttl(
        pipeline, completed_pipeline_ttl=120)   # delete pods after 2 minutes

But, the Pipeline class is pretty barren and I don't see a way to get at platform_config from it.

I think I'm blocked until I get some pointers from folks much more familiar with the DSL and kfp-kubernetes.

@connor-mccarthy @chensun please help or suggest a DSL expert who can.

Copy link

Hi @gregsheremeta. Thanks for your PR.

I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@gregsheremeta
Copy link
Contributor Author

Hi, I need some pointers from folks much more familiar with the DSL and kfp-kubernetes and protobuf. See my initial PR message.

@connor-mccarthy @chensun @james-jwu @zijianjoy please help or suggest a DSL / protobuf / kfp-kubernetes expert who can assist. Thanks.

@HumairAK
Copy link
Collaborator

HumairAK commented Jun 26, 2024

This should not go in the k8s platform spec, by the name it may give that impression, but when we look at the proto file for it here we can see that the primary use case for this is to provide sdk api for manipulating k8s native objects (e.g. pod spec, pvc, secret, configmaps, etc).

The ttl should be a pipeline abstraction, the k8s specific implementation should be handled compiler side which you have already done. I think a better location would be to add this to the deployment_spec which seems like it's intended for pipeline level configurations

@gregsheremeta
Copy link
Contributor Author

Hm, ok, that line of thinking makes sense. Let me try that. Thanks!

@gregsheremeta gregsheremeta force-pushed the argo-workflows-ttl-backend branch from b4293da to 1090395 Compare July 1, 2024 18:21
Copy link

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot added size/XL and removed size/L labels Jul 1, 2024
@gregsheremeta gregsheremeta changed the title WIP: feat(backend): implement setting ttl on pipelines. Fixes: #10899 WIP: feat(api/sdk/backend): implement PipelineConfig and setting ttl on pipelines. Fixes: #10899 Jul 1, 2024
@@ -1874,6 +1877,10 @@ def create_pipeline_spec(
# Schema version 2.1.0 is required for kfp-pipeline-spec>0.1.13
pipeline_spec.schema_version = '2.1.0'

# pipeline-level config options
if pipeline_config.completed_pipeline_ttl_seconds is not None and pipeline_config.completed_pipeline_ttl_seconds > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we raise an error if <= 0?
How will the user know that the argument was ignored?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. I think the behavior should be:
-1 = 0 = infinite timeout = same as not specifying anything (per https://cloud.google.com/apis/design/design_patterns#integer_types)
< -1 = should return an invalid argument error

Copy link
Contributor

@DharmitD DharmitD Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some error handling message would be nice to have here. For instance,

if ttl_seconds < =0:
            raise ValueError("Invalid TTL value: {}. The value must be 1 or greater.".format(ttl_seconds))

@HumairAK
Copy link
Collaborator

HumairAK commented Jul 2, 2024

Note: This is functioning end to end. This is currently WIP only because I'd like to get an ack from a maintainer on the approach, at which point I'll split the proto changes into a separate PR, and then rebase this on top of that once it's merged. I'll then also add tests.

@chensun / @connor-mccarthy could we please get an ack on this approach, nameply the proto addition here so we can proceed with the above?

@hbelmiro
Copy link
Contributor

hbelmiro commented Jul 3, 2024

/ok-to-test

Copy link
Contributor

@hbelmiro hbelmiro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/lgtm
Just left two nitpick comments. Totally optional.

sdk/python/kfp/dsl/pipeline_config.py Outdated Show resolved Hide resolved
@@ -27,7 +28,8 @@ def pipeline(func: Optional[Callable] = None,
name: Optional[str] = None,
description: Optional[str] = None,
pipeline_root: Optional[str] = None,
display_name: Optional[str] = None) -> Callable:
display_name: Optional[str] = None,
pipeline_config: pipeline_config.PipelineConfig = None) -> Callable:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional nitpick: Add the new pipeline_config parameter to the docstring?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Signed-off-by: Greg Sheremeta <[email protected]>
Copy link

New changes are detected. LGTM label has been removed.

@google-oss-prow google-oss-prow bot removed the lgtm label Jul 4, 2024
Copy link

@gregsheremeta: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
kubeflow-pipeline-upgrade-test afa1286 link false /test kubeflow-pipeline-upgrade-test
kfp-kubernetes-test-python310 afa1286 link true /test kfp-kubernetes-test-python310
kfp-kubernetes-execution-tests afa1286 link false /test kfp-kubernetes-execution-tests
kubeflow-pipelines-sdk-docformatter afa1286 link true /test kubeflow-pipelines-sdk-docformatter
kubeflow-pipelines-sdk-isort afa1286 link true /test kubeflow-pipelines-sdk-isort
kubeflow-pipelines-sdk-yapf afa1286 link true /test kubeflow-pipelines-sdk-yapf
kubeflow-pipelines-samples-v2 afa1286 link false /test kubeflow-pipelines-samples-v2
kubeflow-pipelines-sdk-execution-tests afa1286 link true /test kubeflow-pipelines-sdk-execution-tests
kfp-kubernetes-test-python311 afa1286 link true /test kfp-kubernetes-test-python311
kfp-kubernetes-test-python38 afa1286 link true /test kfp-kubernetes-test-python38
kfp-kubernetes-test-python39 afa1286 link true /test kfp-kubernetes-test-python39
kfp-kubernetes-test-python312 afa1286 link true /test kfp-kubernetes-test-python312
kubeflow-pipelines-sdk-python310 afa1286 link true /test kubeflow-pipelines-sdk-python310
kubeflow-pipelines-sdk-python38 afa1286 link true /test kubeflow-pipelines-sdk-python38
kubeflow-pipelines-sdk-python39 afa1286 link true /test kubeflow-pipelines-sdk-python39
kubeflow-pipelines-sdk-python311 afa1286 link true /test kubeflow-pipelines-sdk-python311
kubeflow-pipelines-sdk-python312 afa1286 link true /test kubeflow-pipelines-sdk-python312

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

// begin platform level / workflow level config

// duration in seconds after which the pipeline platform will do garbage collection
optional int32 completed_pipeline_ttl = 2;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as per @droctothorpe comment from yesterday's meeting, what do we think about providing a comment here about logging? something a long the lines of:

"Note: when persisted logging is unavailable, any tasks that are garbage collected will result in the loss of the associated logs for those tasks in the UI."

adding this here will surface them in api docs, fyi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HumairAK
Copy link
Collaborator

HumairAK commented Jul 4, 2024

To summarize my understanding of @zijianjoy 's comments from July 3rd KFP call (last topic discussed):

There are various levels where we could implement garbage collection of underlying pods:

  1. At the Deployment level (i.e. when deploying kfp, set a ttl option for all pods)
  2. At the Pipeline level (i.e. when uploading a pipeline, and/or for any runs created for a pipeline)
  3. At the Pipeline Run level (set ttl for individual Pipeline Runs)
  4. At the task level

As well as being able to do 2,3,4 via the SDK or UI, or some combination thereof.

What exists to day is to do (4) via the sdk.

This PR allows users to do do (2) via the sdk.

As it is very annoying for users to do this for every task. The implementation is extensible that we could easily add the capability to do (3)/(2) as well though this would likely require additional db columns for pipelines (for detecting ttl, then simply checking for this during compilation). For (4) we could simply have a config option (also check for this during compilation).

I think this addition is simple enough that we need not worry about growing complexity of adding support for these varying levels in the future.

@zijianjoy / @chensun please let us know your thoughts on the above

// begin platform level / workflow level config

// duration in seconds after which the pipeline platform will do garbage collection
optional int32 completed_pipeline_ttl = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would suggest changing this type to duration. Duration explicitly represents time intervals, making the intent of the field more apparent

@chensun
Copy link
Member

chensun commented Jul 31, 2024

I think TTL after completion at pod level makes most sense. This can be configured at deployment time. And if we want to override the deployment setting for individual pipelines/tasks, setting it via kfp-kubernetes extension and explicitly calling out it's TTL for pod would be my preference.

My rational is I think TTL after completion at pipeline (and pipeline run) level is kind of misleading. We're not cleaning up pipeline run records from the database, post TTL runs still shows up when listing runs via UI or SDK. The only thing get garbage collected is their pods (or maybe some Argo workflow record in addition to the pods).

@HumairAK
Copy link
Collaborator

Thanks @chensun, great points! I agree that completed_pipeline_ttl here is definitely misleading.

setting it via kfp-kubernetes extension and explicitly calling out it's TTL for pod would be my preference.

However while the end result is pod clean up, the implementation uses the backing engine's TTL api. I.e. it is not directly interacting with k8s native resources, but rather via the engine's crd api. Based on the kuberenetes-extension proto, it seems to me to be highly geared towards kubernetes native components at the task pod level. Adding a field that configures the backing pipeline engines ttl strategy doesn't seem to fit in with the rest of its api IMO.

As you mentioned in Argo's case the Workflow resource is also cleaned up. So something like pod_ttl wouldn't be fully accurate either.

I think @gregsheremeta plans to explore the kubernetes-extension approach again, but maybe we can use a different name for the api field so as it is more clear of its intentions, regardless of whether it's in kubernetes_platform or pipeline_spec.

While we look into this it would be useful to know how strongly opinionated you are towards this change going into pipelines_spec (with a less misleading field_name) vs kubernetes_platform.

@gregsheremeta
Copy link
Contributor Author

I've extracted the PipelineConfig addition to its own PR, #11112.

Regarding the TTL portion of this PR, I don't see anything in Argo Workflows or kubernetes to allow setting TTLs or self-destructs on individual pods. For pod-level TTL, I think we would have to implement a custom controller, or find one that already exists that we could reuse.

Alternatively, we can stick with pipeline-level TTL, but we'd have to clean up database rows (runs, maybe other stuff?) in addition to setting the Workflow TTL field. This option sounds preferable to me.

Closing this PR while I research the above and wait for #11112 to land.

@droctothorpe
Copy link
Contributor

I think TTL after completion at pod level makes most sense. This can be configured at deployment time. And if we want to override the deployment setting for individual pipelines/tasks, setting it via kfp-kubernetes extension and explicitly calling out it's TTL for pod would be my preference.

My rational is I think TTL after completion at pipeline (and pipeline run) level is kind of misleading. We're not cleaning up pipeline run records from the database, post TTL runs still shows up when listing runs via UI or SDK. The only thing get garbage collected is their pods (or maybe some Argo workflow record in addition to the pods).

Maybe we can disambiguate between etcd TTL (for runs) and KFP database TTL (for runs). I think it's important to be able to configure both separately.

We use the KFP database for long term storage, reproducibility, and model governance. As a result, we need either long-term or indefinite TTL in the KFP database.

In constrast, when it comes to etcd, reduced TTL is advisable. A large volume of non-GCed Argo Workflow custom resources can lead to K8s API latency and can even crash a cluster. Don't ask me how I know. Okay, fine: we were testing a scheduled run that created a new run / workflow every 10 seconds and there was no TTL.

etcd / Workflow CR TTL was configurable in v1. IMHO, that gap should be closed in v2, and is probably a smaller, self-contained issue that can be decoupled from database TTL and pod-specific TTL (both of which are meaningful issues in their own right ofc).

Side note: something to be aware of with pod TTL is that if Workflow TTL is less than pod TTL, the pod will be GCed earlier than anticipated since the pods have an owner reference to the Workflow CR, i.e. deletions cascade.

Also, FWIW, the people concerned about database cleanup are usually platform admins, not end users. As a result, end users will likely not really care about database TTL. With that in mind, it might make more sense to handle database cleanup through an external process that platform admins can control.

Apologies if any of the above is stating the obvious!

@gregsheremeta
Copy link
Contributor Author

Maybe we can disambiguate between etcd TTL (for runs) and KFP database TTL (for runs). I think it's important to be able to configure both separately.

This was in the back of my mind, but I didn't think it was important :) Happy to hear a different perspective, though.

That said, I strongly dislike when we leak implementation details into the KFP SDK. I already dislike that we have to leak setting a TTL into the SDK. I'll dislike it even more if we make it even leakier by going from set_ttl() to set_database_ttl() and set_argo_workflow_ttl() 🤮

@gregsheremeta
Copy link
Contributor Author

Design Doc:

Kubeflow Pipelines - Design - Implement TTL for pipeline runs - Google Docs
https://docs.google.com/document/d/1Aakg7udmM_YLEU7gqGi9PbowTZDscy_zzVRx0CcAXeQ/edit

@droctothorpe
Copy link
Contributor

Will continue the convo in the design doc.

@rimolive
Copy link
Member

rimolive commented Dec 5, 2024

I'll dislike it even more if we make it even leakier by going from set_ttl() to set_database_ttl() and set_argo_workflow_ttl()

I don't think DB Cleanup should be something to set in SDK, but an env var in the Persistence Agent. My idea is a flag to keep run history on database with a default value set to true, and admins who don't care much about the run history data persisted in DB they just change it to false.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[sdk] Missing Alternative for set_ttl_seconds_after_finished in KFP v2
8 participants