WIP: Update Deprecated DSL components to use V2 classes #10990

Status: Closed · wants to merge 1 commit
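The diff replaces the deprecated `kfp.deprecated` DSL (URI/Path component types, `ContainerOp`, `task.container.*` setters) with their v2 SDK equivalents (String-typed parameters, `PipelineTask` methods). Below is a minimal sketch of that pattern, assuming a current `kfp` v2 install; the component file name and its `message` input are illustrative and not part of this PR.

```python
import kfp

# Hypothetical component; any of the YAML files touched in this PR is loaded the same way.
my_component = kfp.components.load_component_from_file('components/my_component.yaml')


@kfp.dsl.pipeline(name='v2-migration-sketch')
def migration_sketch(message: str = 'hello'):
    task = my_component(message=message)  # v2: returns a PipelineTask, not a ContainerOp

    # Resource requests/limits now hang off the task itself, not task.container.
    task.set_cpu_request('1').set_cpu_limit('2')
    task.set_memory_request('4Gi').set_memory_limit('4Gi')

    # v2 set_retry takes a retry count; there is no `policy` argument.
    task.set_retry(1)

    # Environment variables no longer go through k8s_client.V1EnvVar.
    task.set_env_variable(name='KFP_PACKAGE_PATH', value='kfp')
```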
6 changes: 3 additions & 3 deletions backend/src/v2/test/components/build_go.yaml
@@ -18,10 +18,10 @@ inputs:
type: Folder
description: |
Input context, it should be a folder.
- {name: destination, type: URI, description: destination should not contain tag}
- {name: destination, type: String, description: destination should not contain tag}
outputs:
- {name: digest_launcher_v2, type: URI, description: KFP launcher v2 image digest}
- {name: digest_driver, type: URI, description: KFP driver image digest}
- {name: digest_launcher_v2, type: String, description: KFP launcher v2 image digest}
- {name: digest_driver, type: String, description: KFP driver image digest}
- {name: backend_compiler, type: Binary, description: KFP v2 backend compiler}
metadata:
annotations:
2 changes: 1 addition & 1 deletion backend/src/v2/test/components/download_gcs_tgz.yaml
@@ -14,7 +14,7 @@

name: Download Folder from Tarball on GCS
inputs:
- {name: GCS Path, type: URI}
- {name: GCS Path, type: String}
outputs:
- {name: Folder}
implementation:
6 changes: 3 additions & 3 deletions backend/src/v2/test/components/kaniko.yaml
@@ -16,7 +16,7 @@ name: Kaniko
inputs:
- {name: dockerfile, type: String}
- name: context_uri
type: URI
type: String
default: ""
description: |
URI to download context, e.g. gs://mybucket/context.tar.gz.
@@ -27,12 +27,12 @@ inputs:
description: |
Input context artifact, it should be a folder.
At least one of context_uri and context_artifact should be specified.
- {name: context_sub_path, type: Path, default: ""}
- {name: context_sub_path, type: String, default: ""}
- {name: destination, type: String, description: destination should not contain tag}
- {name: cache, type: String, description: enable caching one of true/false, default: "true"}
- {name: cache_ttl, type: String, description: 'cache TTL, format like XXh', default: 24h}
outputs:
- {name: digest, type: URI, description: Image URI with full digest}
- {name: digest, type: String, description: Image URI with full digest}
metadata:
annotations:
author: Yuan Gong <[email protected]>
12 changes: 6 additions & 6 deletions backend/src/v2/test/components/run_sample.yaml
@@ -15,12 +15,12 @@
name: Run KFP Test Sample
inputs:
- {name: Name, type: String}
- {name: Sample Path, type: Path}
- {name: GCS Root, type: URI}
- {name: Host, type: URI, default: "http://ml-pipeline:8888"}
- {name: External Host, type: URI}
- {name: Launcher v2 Image, type: URI, default: "gcr.io/ml-pipeline/kfp-launcher-v2:latest"}
- {name: Driver Image, type: URI, default: "gcr.io/ml-pipeline/kfp-driver:latest"}
- {name: Sample Path, type: String}
- {name: GCS Root, type: String}
- {name: Host, type: String, default: "http://ml-pipeline:8888"}
- {name: External Host, type: String}
- {name: Launcher v2 Image, type: String, default: "gcr.io/ml-pipeline/kfp-launcher-v2:latest"}
- {name: Driver Image, type: String, default: "gcr.io/ml-pipeline/kfp-driver:latest"}
- {name: backend_compiler, type: Binary}
implementation:
container:
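For reference, a sketch of how one of these String-typed components is consumed under the v2 SDK, mirroring the test pipeline in `sample_test.py` below (the component path, default value, and resource limits come from that file; multi-word input names such as `GCS Path` are passed via their sanitized keyword form):

```python
import kfp

download_gcs_tgz = kfp.components.load_component_from_file(
    'components/download_gcs_tgz.yaml')


@kfp.dsl.pipeline(name='download-sketch')
def download_sketch(context: str = 'gs://your-bucket/path/to/context.tar.gz'):
    # The `GCS Path` input (now type: String) is supplied as `gcs_path`.
    download_src_op = download_gcs_tgz(gcs_path=context)
    download_src_op.set_cpu_limit('0.5').set_memory_limit('500Mi')
```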
66 changes: 32 additions & 34 deletions backend/src/v2/test/sample_test.py
@@ -18,7 +18,7 @@
import json
import yaml
from kubernetes import client as k8s_client
import kfp.deprecated as kfp
import kfp

download_gcs_tgz = kfp.components.load_component_from_file(
'components/download_gcs_tgz.yaml')
@@ -38,43 +38,44 @@ def v2_sample_test(
'path': 'samples.v2.hello_world_test'
}
],
context: 'URI' = 'gs://your-bucket/path/to/context.tar.gz',
gcs_root: 'URI' = 'gs://ml-pipeline-test/v2',
image_registry: 'URI' = 'gcr.io/ml-pipeline-test',
kfp_host: 'URI' = 'http://ml-pipeline:8888',
context: str = 'gs://your-bucket/path/to/context.tar.gz',
gcs_root: str = 'gs://ml-pipeline-test/v2',
image_registry: str = 'gcr.io/ml-pipeline-test',
kfp_host: str = 'http://ml-pipeline:8888',
kfp_package_path:
'URI' = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python'
str = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python'
):
download_src_op = download_gcs_tgz(gcs_path=context).set_cpu_limit(
'0.5').set_memory_limit('500Mi').set_display_name('download_src')
download_src_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
'0.5').set_memory_limit('500Mi')
# download_src_op.caching_strategy.max_cache_staleness = "P0D" # TODO(gfrasca): re-enable

def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp:
task: kfp.dsl.ContainerOp = kaniko(
def build_image(name: str, dockerfile: str) -> kfp.dsl.PipelineTask:
task: kfp.dsl.PipelineTask = kaniko(
context_artifact=download_src_op.outputs['folder'],
destination=f'{image_registry}/{name}',
dockerfile=dockerfile,
)
# CPU request/limit can be more flexible (request < limit), because being assigned to a node
# with insufficient CPU resource will only slow the task down, but not fail.
task.container.set_cpu_request('1').set_cpu_limit('2')
task.set_cpu_request('1').set_cpu_limit('2')
# Memory request/limit needs to be more rigid (request == limit), because in a node without
# enough memory, the task can hang indefinitely or OOM.
task.container.set_memory_request('4Gi').set_memory_limit('4Gi')
task.set_memory_request('4Gi').set_memory_limit('4Gi')
task.set_display_name(f'build-image-{name}')
task.set_retry(
1, policy='Always'
) # Always -> retry on both system error and user code failure.
1,
# policy='Always' # TODO(gfrasca): new RetryPolicy doesn't support policy, implement?
) # Always -> retry on both system error and user code failure.
return task

# build v2 go images
build_go_op = build_go(
destination=f'{image_registry}/kfp-',
context=download_src_op.outputs['folder'],
)
build_go_op.set_retry(1, policy='Always')
build_go_op.container.set_cpu_request('1').set_cpu_limit('2')
build_go_op.container.set_memory_request('4Gi').set_memory_limit('4Gi')
build_go_op.set_retry(1) # TODO(gfrasca): new RetryPolicy doesn't support policy, implement?
build_go_op.set_cpu_request('1').set_cpu_limit('2')
build_go_op.set_memory_request('4Gi').set_memory_limit('4Gi')

# build sample test image
build_samples_image_op = build_image(
@@ -84,7 +84,7 @@ def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp:

# run test samples in parallel
with kfp.dsl.ParallelFor(samples_config) as sample:
run_sample_op: kfp.dsl.ContainerOp = run_sample(
run_sample_op: kfp.dsl.PipelineTask = run_sample(
name=sample.name,
sample_path=sample.path,
gcs_root=gcs_root,
Expand All @@ -93,14 +94,12 @@ def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp:
driver_image=build_go_op.outputs['digest_driver'],
backend_compiler=build_go_op.outputs['backend_compiler'],
)
run_sample_op.container.image = build_samples_image_op.outputs['digest']
run_sample_op.image = build_samples_image_op.outputs['digest']
run_sample_op.set_display_name(f'sample_{sample.name}')
run_sample_op.set_retry(1, policy='Always')

run_sample_op.container.add_env_variable(
k8s_client.V1EnvVar(
name='KFP_PACKAGE_PATH', value=kfp_package_path))
run_sample_op.set_retry(1) # TODO(gfrasca): new RetryPolicy doesn't support policy, implement?

run_sample_op.set_env_variable(name='KFP_PACKAGE_PATH', value=kfp_package_path.value)


def main(
context: str,
@@ -125,10 +125,12 @@ def main(
name=experiment,
description='An experiment with Kubeflow Pipelines v2 sample test runs.'
)
conf = kfp.dsl.PipelineConf()
conf.set_timeout(
timeout_mins * _MINUTE
) # add timeout to avoid leaked pipelines stuck in Running indefinitely

# TODO(gfrasca): No timeout functionality in v2? https://github.com/kubeflow/pipelines/issues/9996
# conf = kfp.dsl.PipelineConf()
# conf.set_timeout(
# timeout_mins * _MINUTE
# ) # add timeout to avoid leaked pipelines stuck in Running indefinitely

print('Using KFP package path: {}'.format(kfp_package_path))
run_result = client.create_run_from_pipeline_func(
@@ -142,19 +142,16 @@
'kfp_package_path': kfp_package_path,
},
experiment_name=experiment,
pipeline_conf=conf,
#pipeline_conf=conf, #TODO(gfrasca): No longer an option
)
print("Run details page URL:")
print(f"{host}/#/runs/details/{run_result.run_id}")
run_response = run_result.wait_for_run_completion(timeout_mins * _MINUTE)
run = run_response.run
from pprint import pprint
# Hide verbose content
run_response.run.pipeline_spec.workflow_manifest = None
pprint(run_response.run)
pprint(run_response)
print("Run details page URL:")
print(f"{host}/#/runs/details/{run_result.run_id}")
assert run.status == 'Succeeded'
assert run_response.state == 'Succeeded'
# TODO(Bobgy): print debug info


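Downstream of the pipeline definition, `main()` above submits the run with the v2 client and now checks `run_response.state` rather than the old `run.status`. A condensed sketch of that flow is shown here; the host, arguments, experiment name, and timeout are placeholders, and depending on the client/API version the terminal state may be reported as `'SUCCEEDED'` rather than `'Succeeded'`.

```python
import kfp

client = kfp.Client(host='http://ml-pipeline:8888')  # placeholder host
run_result = client.create_run_from_pipeline_func(
    v2_sample_test,
    arguments={'context': 'gs://your-bucket/path/to/context.tar.gz'},
    experiment_name='v2 sample test',  # placeholder
)
run_response = run_result.wait_for_run_completion(40 * 60)  # timeout in seconds
# v2 surfaces the terminal state directly on the response object.
assert run_response.state in ('Succeeded', 'SUCCEEDED')
```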
3 changes: 3 additions & 0 deletions samples/WIP-REMOVEME
@@ -0,0 +1,3 @@
REMOVE BEFORE MERGING

Just to trigger the samples-v2 CI Check