Skip to content

Commit

Permalink
WIP: test(backend) Use non-deprecated KFP for v2 samples test
Browse files Browse the repository at this point in the history
Signed-off-by: Giulio Frasca <[email protected]>
  • Loading branch information
gmfrasca committed Jul 10, 2024
1 parent 0eae430 commit 2333568
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 46 deletions.
6 changes: 3 additions & 3 deletions backend/src/v2/test/components/build_go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ inputs:
type: Folder
description: |
Input context, it should be a folder.
- {name: destination, type: URI, description: destination should not contain tag}
- {name: destination, type: String, description: destination should not contain tag}
outputs:
- {name: digest_launcher_v2, type: URI, description: KFP launcher v2 image digest}
- {name: digest_driver, type: URI, description: KFP driver image digest}
- {name: digest_launcher_v2, type: String, description: KFP launcher v2 image digest}
- {name: digest_driver, type: String, description: KFP driver image digest}
- {name: backend_compiler, type: Binary, description: KFP v2 backend compiler}
metadata:
annotations:
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/test/components/download_gcs_tgz.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

name: Download Folder from Tarball on GCS
inputs:
- {name: GCS Path, type: URI}
- {name: GCS Path, type: String}
outputs:
- {name: Folder}
implementation:
Expand Down
6 changes: 3 additions & 3 deletions backend/src/v2/test/components/kaniko.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ name: Kaniko
inputs:
- {name: dockerfile, type: String}
- name: context_uri
type: URI
type: String
default: ""
description: |
URI to download context, e.g. gs://mybucket/context.tar.gz.
Expand All @@ -27,12 +27,12 @@ inputs:
description: |
Input context artifact, it should be a folder.
At least one of context_uri and context_artifact should be specified.
- {name: context_sub_path, type: Path, default: ""}
- {name: context_sub_path, type: String, default: ""}
- {name: destination, type: String, description: destination should not contain tag}
- {name: cache, type: String, description: enable caching one of true/false, default: "true"}
- {name: cache_ttl, type: String, description: 'cache TTL, format like XXh', default: 24h}
outputs:
- {name: digest, type: URI, description: Image URI with full digest}
- {name: digest, type: String, description: Image URI with full digest}
metadata:
annotations:
author: Yuan Gong <[email protected]>
Expand Down
12 changes: 6 additions & 6 deletions backend/src/v2/test/components/run_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
name: Run KFP Test Sample
inputs:
- {name: Name, type: String}
- {name: Sample Path, type: Path}
- {name: GCS Root, type: URI}
- {name: Host, type: URI, default: "http://ml-pipeline:8888"}
- {name: External Host, type: URI}
- {name: Launcher v2 Image, type: URI, default: "gcr.io/ml-pipeline/kfp-launcher-v2:latest"}
- {name: Driver Image, type: URI, default: "gcr.io/ml-pipeline/kfp-driver:latest"}
- {name: Sample Path, type: String}
- {name: GCS Root, type: String}
- {name: Host, type: String, default: "http://ml-pipeline:8888"}
- {name: External Host, type: String}
- {name: Launcher v2 Image, type: String, default: "gcr.io/ml-pipeline/kfp-launcher-v2:latest"}
- {name: Driver Image, type: String, default: "gcr.io/ml-pipeline/kfp-driver:latest"}
- {name: backend_compiler, type: Binary}
implementation:
container:
Expand Down
64 changes: 31 additions & 33 deletions backend/src/v2/test/sample_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import json
import yaml
from kubernetes import client as k8s_client
import kfp.deprecated as kfp
import kfp as kfp

download_gcs_tgz = kfp.components.load_component_from_file(
'components/download_gcs_tgz.yaml')
Expand All @@ -38,43 +38,44 @@ def v2_sample_test(
'path': 'samples.v2.hello_world_test'
}
],
context: 'URI' = 'gs://your-bucket/path/to/context.tar.gz',
gcs_root: 'URI' = 'gs://ml-pipeline-test/v2',
image_registry: 'URI' = 'gcr.io/ml-pipeline-test',
kfp_host: 'URI' = 'http://ml-pipeline:8888',
context: str = 'gs://your-bucket/path/to/context.tar.gz',
gcs_root: str = 'gs://ml-pipeline-test/v2',
image_registry: str = 'gcr.io/ml-pipeline-test',
kfp_host: str = 'http://ml-pipeline:8888',
kfp_package_path:
'URI' = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python'
str = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python'
):
download_src_op = download_gcs_tgz(gcs_path=context).set_cpu_limit(
'0.5').set_memory_limit('500Mi').set_display_name('download_src')
download_src_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
# download_src_op.caching_strategy.max_cache_staleness = "P0D" # TODO(gfrasca): re-enable

def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp:
task: kfp.dsl.ContainerOp = kaniko(
def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerSpec:
task: kfp.dsl.ContainerSpec = kaniko(
context_artifact=download_src_op.outputs['folder'],
destination=f'{image_registry}/{name}',
dockerfile=dockerfile,
)
# CPU request/limit can be more flexible (request < limit), because being assigned to a node
# with insufficient CPU resource will only slow the task down, but not fail.
task.container.set_cpu_request('1').set_cpu_limit('2')
task.set_cpu_request('1').set_cpu_limit('2')
# Memory request/limit needs to be more rigid (request == limit), because in a node without
# enough memory, the task can hang indefinitely or OOM.
task.container.set_memory_request('4Gi').set_memory_limit('4Gi')
task.set_memory_request('4Gi').set_memory_limit('4Gi')
task.set_display_name(f'build-image-{name}')
task.set_retry(
1, policy='Always'
) # Always -> retry on both system error and user code failure.
1,
# policy='Always' # TODO(gfrasca): new RetryPolicy doesn't support policy, implement?
) # Always -> retry on both system error and user code failure.
return task

# build v2 go images
build_go_op = build_go(
destination=f'{image_registry}/kfp-',
context=download_src_op.outputs['folder'],
)
build_go_op.set_retry(1, policy='Always')
build_go_op.container.set_cpu_request('1').set_cpu_limit('2')
build_go_op.container.set_memory_request('4Gi').set_memory_limit('4Gi')
build_go_op.set_retry(1) # TODO(gfrasca): new RetryPolicy doesn't support policy, implement?
build_go_op.set_cpu_request('1').set_cpu_limit('2')
build_go_op.set_memory_request('4Gi').set_memory_limit('4Gi')

# build sample test image
build_samples_image_op = build_image(
Expand All @@ -84,7 +85,7 @@ def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp:

# run test samples in parallel
with kfp.dsl.ParallelFor(samples_config) as sample:
run_sample_op: kfp.dsl.ContainerOp = run_sample(
run_sample_op: kfp.dsl.ContainerSpec = run_sample(
name=sample.name,
sample_path=sample.path,
gcs_root=gcs_root,
Expand All @@ -93,14 +94,12 @@ def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp:
driver_image=build_go_op.outputs['digest_driver'],
backend_compiler=build_go_op.outputs['backend_compiler'],
)
run_sample_op.container.image = build_samples_image_op.outputs['digest']
run_sample_op.image = build_samples_image_op.outputs['digest']
run_sample_op.set_display_name(f'sample_{sample.name}')
run_sample_op.set_retry(1, policy='Always')

run_sample_op.container.add_env_variable(
k8s_client.V1EnvVar(
name='KFP_PACKAGE_PATH', value=kfp_package_path))
build_go_op.set_retry(1) # TODO(gfrasca): new RetryPolicy doesn't support policy, implement?

run_sample_op.set_env_variable(name='KFP_PACKAGE_PATH', value=kfp_package_path.value)


def main(
context: str,
Expand All @@ -125,10 +124,12 @@ def main(
name=experiment,
description='An experiment with Kubeflow Pipelines v2 sample test runs.'
)
conf = kfp.dsl.PipelineConf()
conf.set_timeout(
timeout_mins * _MINUTE
) # add timeout to avoid pipelines stuck in running leak indefinitely

# TODO(gfrasca): No timeout functionality in v2? https://github.com/kubeflow/pipelines/issues/9996
# conf = kfp.dsl.PipelineConf()
# conf.set_timeout(
# timeout_mins * _MINUTE
# ) # add timeout to avoid pipelines stuck in running leak indefinitely

print('Using KFP package path: {}'.format(kfp_package_path))
run_result = client.create_run_from_pipeline_func(
Expand All @@ -142,19 +143,16 @@ def main(
'kfp_package_path': kfp_package_path,
},
experiment_name=experiment,
pipeline_conf=conf,
#pipeline_conf=conf, #TODO(gfrasca): No longer an option
)
print("Run details page URL:")
print(f"{host}/#/runs/details/{run_result.run_id}")
run_response = run_result.wait_for_run_completion(timeout_mins * _MINUTE)
run = run_response.run
from pprint import pprint
# Hide verbose content
run_response.run.pipeline_spec.workflow_manifest = None
pprint(run_response.run)
pprint(run_response)
print("Run details page URL:")
print(f"{host}/#/runs/details/{run_result.run_id}")
assert run.status == 'Succeeded'
assert run_response.state == 'Succeeded'
# TODO(Bobgy): print debug info


Expand Down
3 changes: 3 additions & 0 deletions samples/WIP-REMOVEME
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
REMOVE BEFORE MERGING

Just to trigger the samples-v2 CI Check

0 comments on commit 2333568

Please sign in to comment.