diff --git a/backend/src/v2/test/components/build_go.yaml b/backend/src/v2/test/components/build_go.yaml index ee748458a50..6941bbd678e 100644 --- a/backend/src/v2/test/components/build_go.yaml +++ b/backend/src/v2/test/components/build_go.yaml @@ -18,10 +18,10 @@ inputs: type: Folder description: | Input context, it should be a folder. -- {name: destination, type: URI, description: destination should not contain tag} +- {name: destination, type: String, description: destination should not contain tag} outputs: -- {name: digest_launcher_v2, type: URI, description: KFP launcher v2 image digest} -- {name: digest_driver, type: URI, description: KFP driver image digest} +- {name: digest_launcher_v2, type: String, description: KFP launcher v2 image digest} +- {name: digest_driver, type: String, description: KFP driver image digest} - {name: backend_compiler, type: Binary, description: KFP v2 backend compiler} metadata: annotations: diff --git a/backend/src/v2/test/components/download_gcs_tgz.yaml b/backend/src/v2/test/components/download_gcs_tgz.yaml index cdae99f1011..06eca9bc48e 100644 --- a/backend/src/v2/test/components/download_gcs_tgz.yaml +++ b/backend/src/v2/test/components/download_gcs_tgz.yaml @@ -14,7 +14,7 @@ name: Download Folder from Tarball on GCS inputs: -- {name: GCS Path, type: URI} +- {name: GCS Path, type: String} outputs: - {name: Folder} implementation: diff --git a/backend/src/v2/test/components/kaniko.yaml b/backend/src/v2/test/components/kaniko.yaml index fa9cd5e779c..1bc3178ec90 100644 --- a/backend/src/v2/test/components/kaniko.yaml +++ b/backend/src/v2/test/components/kaniko.yaml @@ -16,7 +16,7 @@ name: Kaniko inputs: - {name: dockerfile, type: String} - name: context_uri - type: URI + type: String default: "" description: | URI to download context, e.g. gs://mybucket/context.tar.gz. @@ -27,12 +27,12 @@ inputs: description: | Input context artifact, it should be a folder. At least one of context_uri and context_artifact should be specified. 
-- {name: context_sub_path, type: Path, default: ""} +- {name: context_sub_path, type: String, default: ""} - {name: destination, type: String, description: destination should not contain tag} - {name: cache, type: String, description: enable caching one of true/false, default: "true"} - {name: cache_ttl, type: String, description: 'cache TTL, format like XXh', default: 24h} outputs: -- {name: digest, type: URI, description: Image URI with full digest} +- {name: digest, type: String, description: Image URI with full digest} metadata: annotations: author: Yuan Gong diff --git a/backend/src/v2/test/components/run_sample.yaml b/backend/src/v2/test/components/run_sample.yaml index d4b4c9b7e97..bf41cc11fb2 100644 --- a/backend/src/v2/test/components/run_sample.yaml +++ b/backend/src/v2/test/components/run_sample.yaml @@ -15,12 +15,12 @@ name: Run KFP Test Sample inputs: - {name: Name, type: String} -- {name: Sample Path, type: Path} -- {name: GCS Root, type: URI} -- {name: Host, type: URI, default: "http://ml-pipeline:8888"} -- {name: External Host, type: URI} -- {name: Launcher v2 Image, type: URI, default: "gcr.io/ml-pipeline/kfp-launcher-v2:latest"} -- {name: Driver Image, type: URI, default: "gcr.io/ml-pipeline/kfp-driver:latest"} +- {name: Sample Path, type: String} +- {name: GCS Root, type: String} +- {name: Host, type: String, default: "http://ml-pipeline:8888"} +- {name: External Host, type: String} +- {name: Launcher v2 Image, type: String, default: "gcr.io/ml-pipeline/kfp-launcher-v2:latest"} +- {name: Driver Image, type: String, default: "gcr.io/ml-pipeline/kfp-driver:latest"} - {name: backend_compiler, type: Binary} implementation: container: diff --git a/backend/src/v2/test/sample_test.py b/backend/src/v2/test/sample_test.py index 6cb4c12a309..60d73079058 100644 --- a/backend/src/v2/test/sample_test.py +++ b/backend/src/v2/test/sample_test.py @@ -18,7 +18,7 @@ import json import yaml from kubernetes import client as k8s_client -import kfp.deprecated as kfp 
+import kfp download_gcs_tgz = kfp.components.load_component_from_file( 'components/download_gcs_tgz.yaml') @@ -38,33 +38,34 @@ def v2_sample_test( 'path': 'samples.v2.hello_world_test' } ], - context: 'URI' = 'gs://your-bucket/path/to/context.tar.gz', - gcs_root: 'URI' = 'gs://ml-pipeline-test/v2', - image_registry: 'URI' = 'gcr.io/ml-pipeline-test', - kfp_host: 'URI' = 'http://ml-pipeline:8888', + context: str = 'gs://your-bucket/path/to/context.tar.gz', + gcs_root: str = 'gs://ml-pipeline-test/v2', + image_registry: str = 'gcr.io/ml-pipeline-test', + kfp_host: str = 'http://ml-pipeline:8888', kfp_package_path: - 'URI' = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python' + str = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python' ): download_src_op = download_gcs_tgz(gcs_path=context).set_cpu_limit( - '0.5').set_memory_limit('500Mi').set_display_name('download_src') - download_src_op.execution_options.caching_strategy.max_cache_staleness = "P0D" + '0.5').set_memory_limit('500Mi') + # download_src_op.caching_strategy.max_cache_staleness = "P0D" # TODO(gfrasca): re-enable - def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp: - task: kfp.dsl.ContainerOp = kaniko( + def build_image(name: str, dockerfile: str) -> kfp.dsl.PipelineTask: + task: kfp.dsl.PipelineTask = kaniko( context_artifact=download_src_op.outputs['folder'], destination=f'{image_registry}/{name}', dockerfile=dockerfile, ) # CPU request/limit can be more flexible (request < limit), because being assigned to a node # with insufficient CPU resource will only slow the task down, but not fail. - task.container.set_cpu_request('1').set_cpu_limit('2') + task.set_cpu_request('1').set_cpu_limit('2') # Memory request/limit needs to be more rigid (request == limit), because in a node without # enough memory, the task can hang indefinetely or OOM. 
- task.container.set_memory_request('4Gi').set_memory_limit('4Gi') + task.set_memory_request('4Gi').set_memory_limit('4Gi') task.set_display_name(f'build-image-{name}') task.set_retry( - 1, policy='Always' - ) # Always -> retry on both system error and user code failure. + 1, + # policy='Always' # TODO(gfrasca): new RetryPolicy doesn't support policy, implement? + ) # Always -> retry on both system error and user code failure. return task # build v2 go images @@ -72,9 +73,9 @@ def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp: destination=f'{image_registry}/kfp-', context=download_src_op.outputs['folder'], ) - build_go_op.set_retry(1, policy='Always') - build_go_op.container.set_cpu_request('1').set_cpu_limit('2') - build_go_op.container.set_memory_request('4Gi').set_memory_limit('4Gi') + build_go_op.set_retry(1) # TODO(gfrasca): new RetryPolicy doesn't support policy, implement? + build_go_op.set_cpu_request('1').set_cpu_limit('2') + build_go_op.set_memory_request('4Gi').set_memory_limit('4Gi') # build sample test image build_samples_image_op = build_image( @@ -84,7 +85,7 @@ def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp: # run test samples in parallel with kfp.dsl.ParallelFor(samples_config) as sample: - run_sample_op: kfp.dsl.ContainerOp = run_sample( + run_sample_op: kfp.dsl.PipelineTask = run_sample( name=sample.name, sample_path=sample.path, gcs_root=gcs_root, @@ -93,14 +94,12 @@ def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp: driver_image=build_go_op.outputs['digest_driver'], backend_compiler=build_go_op.outputs['backend_compiler'], ) - run_sample_op.container.image = build_samples_image_op.outputs['digest'] + run_sample_op.image = build_samples_image_op.outputs['digest'] run_sample_op.set_display_name(f'sample_{sample.name}') - run_sample_op.set_retry(1, policy='Always') - - run_sample_op.container.add_env_variable( - k8s_client.V1EnvVar( - name='KFP_PACKAGE_PATH', value=kfp_package_path)) + 
run_sample_op.set_retry(1) # TODO(gfrasca): new RetryPolicy doesn't support policy, implement? + run_sample_op.set_env_variable(name='KFP_PACKAGE_PATH', value=kfp_package_path.value) + def main( context: str, @@ -125,10 +124,12 @@ def main( name=experiment, description='An experiment with Kubeflow Pipelines v2 sample test runs.' ) - conf = kfp.dsl.PipelineConf() - conf.set_timeout( - timeout_mins * _MINUTE - ) # add timeout to avoid pipelines stuck in running leak indefinetely + + # TODO(gfrasca): No timeout functionality in v2? https://github.com/kubeflow/pipelines/issues/9996 + # conf = kfp.dsl.PipelineConf() + # conf.set_timeout( + # timeout_mins * _MINUTE + # ) # add timeout to avoid pipelines stuck in running leak indefinetely print('Using KFP package path: {}'.format(kfp_package_path)) run_result = client.create_run_from_pipeline_func( @@ -142,19 +143,16 @@ def main( 'kfp_package_path': kfp_package_path, }, experiment_name=experiment, - pipeline_conf=conf, + #pipeline_conf=conf, #TODO(gfrasca): No longer an option ) print("Run details page URL:") print(f"{host}/#/runs/details/{run_result.run_id}") run_response = run_result.wait_for_run_completion(timeout_mins * _MINUTE) - run = run_response.run from pprint import pprint - # Hide verbose content - run_response.run.pipeline_spec.workflow_manifest = None - pprint(run_response.run) + pprint(run_response) print("Run details page URL:") print(f"{host}/#/runs/details/{run_result.run_id}") - assert run.status == 'Succeeded' + assert run_response.state.upper() == 'SUCCEEDED' # TODO(Bobgy): print debug info diff --git a/samples/WIP-REMOVEME b/samples/WIP-REMOVEME new file mode 100644 index 00000000000..ae7a823c545 --- /dev/null +++ b/samples/WIP-REMOVEME @@ -0,0 +1,3 @@ +REMOVE BEFORE MERGING + +Just to trigger the samples-v2 CI Check