diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go
index 60d04d7f53a9..f58a9677ea14 100644
--- a/backend/src/v2/driver/driver.go
+++ b/backend/src/v2/driver/driver.go
@@ -1353,7 +1353,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error {
 		}
 		glog.V(4).Info("producer: ", producer)
 		currentTask := producer
-		var outputArtifactKey string = taskOutput.GetOutputArtifactKey()
+		outputArtifactKey := taskOutput.GetOutputArtifactKey()
 		currentSubTaskMaybeDAG := true
 		// Continue looping until we reach a sub-task that is NOT a DAG.
 		for currentSubTaskMaybeDAG {
@@ -1371,9 +1371,9 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error {
 			glog.V(4).Infof("Deserialized outputArtifacts: %v", outputArtifacts)
 			// Adding support for multiple output artifacts
 			var subTaskName string
-			value := outputArtifacts[outputArtifactKey].GetArtifactSelectors()
+			artifactSelectors := outputArtifacts[outputArtifactKey].GetArtifactSelectors()
 
-			for _, v := range value {
+			for _, v := range artifactSelectors {
 				glog.V(4).Infof("v: %v", v)
 				glog.V(4).Infof("v.ProducerSubtask: %v", v.ProducerSubtask)
 				glog.V(4).Infof("v.OutputArtifactKey: %v", v.OutputArtifactKey)
diff --git a/samples/v2/sample_test.py b/samples/v2/sample_test.py
index d34599a3c18e..ed5fa0da825c 100644
--- a/samples/v2/sample_test.py
+++ b/samples/v2/sample_test.py
@@ -11,20 +11,22 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os
-import unittest
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import as_completed
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
+import inspect
+import os
 from pprint import pprint
 from typing import List
+import unittest
 
+import component_with_optional_inputs
+import hello_world
 import kfp
 from kfp.dsl.graph_component import GraphComponent
-import component_with_optional_inputs
+import pipeline_container_no_input
 import pipeline_with_env
-import hello_world
 import producer_consumer_param
-import pipeline_container_no_input
 import two_step_pipeline_containerized
 
 _MINUTE = 60  # seconds
@@ -38,16 +40,21 @@ class TestCase:
 
 
 class SampleTest(unittest.TestCase):
-    _kfp_host_and_port = os.getenv('KFP_API_HOST_AND_PORT', 'http://localhost:8888')
-    _kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT', 'http://localhost:8080')
+    _kfp_host_and_port = os.getenv('KFP_API_HOST_AND_PORT',
+                                   'http://localhost:8888')
+    _kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT',
+                                 'http://localhost:8080')
     _client = kfp.Client(host=_kfp_host_and_port, ui_host=_kfp_ui_and_port)
 
     def test(self):
         test_cases: List[TestCase] = [
             TestCase(pipeline_func=hello_world.pipeline_hello_world),
-            TestCase(pipeline_func=producer_consumer_param.producer_consumer_param_pipeline),
-            TestCase(pipeline_func=pipeline_container_no_input.pipeline_container_no_input),
-            TestCase(pipeline_func=two_step_pipeline_containerized.two_step_pipeline_containerized),
+            TestCase(pipeline_func=producer_consumer_param
+                     .producer_consumer_param_pipeline),
+            TestCase(pipeline_func=pipeline_container_no_input
+                     .pipeline_container_no_input),
+            TestCase(pipeline_func=two_step_pipeline_containerized
+                     .two_step_pipeline_containerized),
             TestCase(pipeline_func=component_with_optional_inputs.pipeline),
             TestCase(pipeline_func=pipeline_with_env.pipeline_with_env),
@@ -56,27 +63,51 @@ def test(self):
             # TestCase(pipeline_func=pipeline_with_volume.pipeline_with_volume),
             # TestCase(pipeline_func=pipeline_with_secret_as_volume.pipeline_secret_volume),
             # TestCase(pipeline_func=pipeline_with_secret_as_env.pipeline_secret_env),
+
+            # This next set of tests must stay commented out until
+            # https://github.com/kubeflow/pipelines/issues/11239#issuecomment-2374792592
+            # is addressed or the driver image used in CI is updated;
+            # otherwise the tests run against an incompatible version of the
+            # driver. In the meantime, for local validation, these tests can
+            # be executed once an updated driver image has been manually
+            # deployed.
+            # TestCase(pipeline_func=subdagio.parameter.crust),
+            # TestCase(pipeline_func=subdagio.parameter_cache.crust),
+            # TestCase(pipeline_func=subdagio.artifact_cache.crust),
+            # TestCase(pipeline_func=subdagio.artifact.crust),
+            # TestCase(pipeline_func=subdagio.mixed_parameters.crust),
+            # TestCase(pipeline_func=subdagio.multiple_parameters_namedtuple.crust),
+            # TestCase(pipeline_func=subdagio.multiple_artifacts_namedtuple.crust),
         ]
 
         with ThreadPoolExecutor() as executor:
             futures = [
-                executor.submit(self.run_test_case, test_case.pipeline_func, test_case.timeout)
-                for test_case in test_cases
+                executor.submit(self.run_test_case, test_case.pipeline_func,
+                                test_case.timeout) for test_case in test_cases
             ]
             for future in as_completed(futures):
                 future.result()
 
     def run_test_case(self, pipeline_func: GraphComponent, timeout: int):
         with self.subTest(pipeline=pipeline_func, msg=pipeline_func.name):
-            run_result = self._client.create_run_from_pipeline_func(pipeline_func=pipeline_func)
+            print(
+                f'Running pipeline: {inspect.getmodule(pipeline_func.pipeline_func).__name__}/{pipeline_func.name}.'
+            )
+            run_result = self._client.create_run_from_pipeline_func(
+                pipeline_func=pipeline_func)
             run_response = run_result.wait_for_run_completion(timeout)
 
             pprint(run_response.run_details)
-            print("Run details page URL:")
-            print(f"{self._kfp_ui_and_port}/#/runs/details/{run_response.run_id}")
+            print('Run details page URL:')
+            print(
+                f'{self._kfp_ui_and_port}/#/runs/details/{run_response.run_id}')
 
-            self.assertEqual(run_response.state, "SUCCEEDED")
+            self.assertEqual(run_response.state, 'SUCCEEDED')
+            print(
+                f'Pipeline, {inspect.getmodule(pipeline_func.pipeline_func).__name__}/{pipeline_func.name}, succeeded.'
+            )
 
 
 if __name__ == '__main__':
diff --git a/samples/v2/subdagio/__init__.py b/samples/v2/subdagio/__init__.py
new file mode 100644
index 000000000000..dc8f8b3ceaee
--- /dev/null
+++ b/samples/v2/subdagio/__init__.py
@@ -0,0 +1,7 @@
+from subdagio import artifact
+from subdagio import artifact_cache
+from subdagio import mixed_parameters
+from subdagio import multiple_artifacts_namedtuple
+from subdagio import multiple_parameters_namedtuple
+from subdagio import parameter
+from subdagio import parameter_cache
diff --git a/samples/v2/subdagio/artifact.py b/samples/v2/subdagio/artifact.py
new file mode 100644
index 000000000000..8f425662a1b7
--- /dev/null
+++ b/samples/v2/subdagio/artifact.py
@@ -0,0 +1,47 @@
+import os
+
+from kfp import Client
+from kfp import dsl
+
+
+@dsl.component
+def core_comp(dataset: dsl.Output[dsl.Dataset]):
+    with open(dataset.path, 'w') as f:
+        f.write('foo')
+
+
+@dsl.component
+def crust_comp(input: dsl.Dataset):
+    with open(input.path, 'r') as f:
+        print('input: ', f.read())
+
+
+@dsl.pipeline
+def core() -> dsl.Dataset:
+    task = core_comp()
+    task.set_caching_options(False)
+
+    return task.output
+
+
+@dsl.pipeline
+def mantle() -> dsl.Dataset:
+    dag_task = core()
+    dag_task.set_caching_options(False)
+
+    return dag_task.output
+
+
+@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
+def crust():
+    dag_task = mantle()
+    dag_task.set_caching_options(False)
+
+    task = crust_comp(input=dag_task.output)
+    task.set_caching_options(False)
+
+
+if __name__ == '__main__':
+    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
+    client = Client()
+    client.create_run_from_pipeline_func(crust)
diff --git a/samples/v2/subdagio/artifact_cache.py b/samples/v2/subdagio/artifact_cache.py
new file mode 100644
index 000000000000..5b52b25fb23d
--- /dev/null
+++ b/samples/v2/subdagio/artifact_cache.py
@@ -0,0 +1,42 @@
+import os
+
+from kfp import Client
+from kfp import dsl
+
+
+@dsl.component
+def core_comp(dataset: dsl.Output[dsl.Dataset]):
+    with open(dataset.path, 'w') as f:
+        f.write('foo')
+
+
+@dsl.component
+def crust_comp(input: dsl.Dataset):
+    with open(input.path, 'r') as f:
+        print('input: ', f.read())
+
+
+@dsl.pipeline
+def core() -> dsl.Dataset:
+    task = core_comp()
+
+    return task.output
+
+
+@dsl.pipeline
+def mantle() -> dsl.Dataset:
+    dag_task = core()
+
+    return dag_task.output
+
+
+@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
+def crust():
+    dag_task = mantle()
+
+    task = crust_comp(input=dag_task.output)
+
+
+if __name__ == '__main__':
+    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
+    client = Client()
+    client.create_run_from_pipeline_func(crust)
diff --git a/samples/v2/subdagio/mixed_parameters.py b/samples/v2/subdagio/mixed_parameters.py
new file mode 100644
index 000000000000..0a660d335d95
--- /dev/null
+++ b/samples/v2/subdagio/mixed_parameters.py
@@ -0,0 +1,48 @@
+import os
+
+from kfp import Client
+from kfp import dsl
+from kfp.compiler import Compiler
+
+
+@dsl.component
+def core_comp() -> int:
+    return 1
+
+
+@dsl.component
+def crust_comp(x: int, y: int):
+    print('sum :', x + y)
+
+
+@dsl.pipeline
+def core() -> int:
+    task = core_comp()
+    task.set_caching_options(False)
+
+    return task.output
+
+
+@dsl.pipeline
+def mantle() -> int:
+    dag_task = core()
+    dag_task.set_caching_options(False)
+
+    return dag_task.output
+
+
+@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') +
+              '-pipeline')
+def crust():
+    dag_task = mantle()
+    dag_task.set_caching_options(False)
+
+    task = crust_comp(x=2, y=dag_task.output)
+    task.set_caching_options(False)
+
+
+if __name__ == '__main__':
+    Compiler().compile(
+        pipeline_func=crust,
+        package_path=f"{__file__.removesuffix('.py')}.yaml")
+    client = Client()
+    client.create_run_from_pipeline_func(crust)
diff --git a/samples/v2/subdagio/multiple_artifacts_namedtuple.py b/samples/v2/subdagio/multiple_artifacts_namedtuple.py
new file mode 100644
index 000000000000..7d2777d38b06
--- /dev/null
+++ b/samples/v2/subdagio/multiple_artifacts_namedtuple.py
@@ -0,0 +1,66 @@
+import os
+from typing import NamedTuple
+
+from kfp import Client
+from kfp import dsl
+
+
+@dsl.component
+def core_comp(ds1: dsl.Output[dsl.Dataset], ds2: dsl.Output[dsl.Dataset]):
+    with open(ds1.path, 'w') as f:
+        f.write('foo')
+    with open(ds2.path, 'w') as f:
+        f.write('bar')
+
+
+@dsl.component
+def crust_comp(
+    ds1: dsl.Dataset,
+    ds2: dsl.Dataset,
+):
+    with open(ds1.path, 'r') as f:
+        print('ds1: ', f.read())
+    with open(ds2.path, 'r') as f:
+        print('ds2: ', f.read())
+
+
+@dsl.pipeline
+def core() -> NamedTuple(
+        'outputs',
+        ds1=dsl.Dataset,
+        ds2=dsl.Dataset,
+):  # type: ignore
+    task = core_comp()
+    task.set_caching_options(False)
+
+    return task.outputs
+
+
+@dsl.pipeline
+def mantle() -> NamedTuple(
+        'outputs',
+        ds1=dsl.Dataset,
+        ds2=dsl.Dataset,
+):  # type: ignore
+    dag_task = core()
+    dag_task.set_caching_options(False)
+
+    return dag_task.outputs
+
+
+@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
+def crust():
+    dag_task = mantle()
+    dag_task.set_caching_options(False)
+
+    task = crust_comp(
+        ds1=dag_task.outputs['ds1'],
+        ds2=dag_task.outputs['ds2'],
+    )
+    task.set_caching_options(False)
+
+
+if __name__ == '__main__':
+    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
+    client = Client()
+    client.create_run_from_pipeline_func(crust)
diff --git a/samples/v2/subdagio/multiple_parameters_namedtuple.py b/samples/v2/subdagio/multiple_parameters_namedtuple.py
new file mode 100644
index 000000000000..29699088554d
--- /dev/null
+++ b/samples/v2/subdagio/multiple_parameters_namedtuple.py
@@ -0,0 +1,51 @@
+import os
+from typing import NamedTuple
+
+from kfp import Client
+from kfp import dsl
+
+
+@dsl.component
+def core_comp() -> NamedTuple('outputs', val1=str, val2=str):  # type: ignore
+    outputs = NamedTuple('outputs', val1=str, val2=str)
+    return outputs('foo', 'bar')
+
+
+@dsl.component
+def crust_comp(val1: str, val2: str):
+    print('val1: ', val1)
+    print('val2: ', val2)
+
+
+@dsl.pipeline
+def core() -> NamedTuple('outputs', val1=str, val2=str):  # type: ignore
+    task = core_comp()
+    task.set_caching_options(False)
+
+    return task.outputs
+
+
+@dsl.pipeline
+def mantle() -> NamedTuple('outputs', val1=str, val2=str):  # type: ignore
+    dag_task = core()
+    dag_task.set_caching_options(False)
+
+    return dag_task.outputs
+
+
+@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
+def crust():
+    dag_task = mantle()
+    dag_task.set_caching_options(False)
+
+    task = crust_comp(
+        val1=dag_task.outputs['val1'],
+        val2=dag_task.outputs['val2'],
+    )
+    task.set_caching_options(False)
+
+
+if __name__ == '__main__':
+    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
+    client = Client()
+    client.create_run_from_pipeline_func(crust)
diff --git a/samples/v2/subdagio/parameter.py b/samples/v2/subdagio/parameter.py
new file mode 100644
index 000000000000..c00439dd1c80
--- /dev/null
+++ b/samples/v2/subdagio/parameter.py
@@ -0,0 +1,45 @@
+import os
+
+from kfp import Client
+from kfp import dsl
+
+
+@dsl.component
+def core_comp() -> str:
+    return 'foo'
+
+
+@dsl.component
+def crust_comp(input: str):
+    print('input :', input)
+
+
+@dsl.pipeline
+def core() -> str:
+    task = core_comp()
+    task.set_caching_options(False)
+
+    return task.output
+
+
+@dsl.pipeline
+def mantle() -> str:
+    dag_task = core()
+    dag_task.set_caching_options(False)
+
+    return dag_task.output
+
+
+@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
+def crust():
+    dag_task = mantle()
+    dag_task.set_caching_options(False)
+
+    task = crust_comp(input=dag_task.output)
+    task.set_caching_options(False)
+
+
+if __name__ == '__main__':
+    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
+    client = Client()
+    client.create_run_from_pipeline_func(crust)
diff --git a/samples/v2/subdagio/parameter_cache.py b/samples/v2/subdagio/parameter_cache.py
new file mode 100644
index 000000000000..9fe2402e2b8c
--- /dev/null
+++ b/samples/v2/subdagio/parameter_cache.py
@@ -0,0 +1,40 @@
+import os
+
+from kfp import Client
+from kfp import dsl
+
+
+@dsl.component
+def core_comp() -> str:
+    return 'foo'
+
+
+@dsl.component
+def crust_comp(input: str):
+    print('input :', input)
+
+
+@dsl.pipeline
+def core() -> str:
+    task = core_comp()
+
+    return task.output
+
+
+@dsl.pipeline
+def mantle() -> str:
+    dag_task = core()
+
+    return dag_task.output
+
+
+@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline')
+def crust():
+    dag_task = mantle()
+    task = crust_comp(input=dag_task.output)
+
+
+if __name__ == '__main__':
+    # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml")
+    client = Client()
+    client.create_run_from_pipeline_func(crust)
diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py
index 6e4bc4e8690c..fbc3bb463df3 100644
--- a/sdk/python/kfp/compiler/pipeline_spec_builder.py
+++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py
@@ -1872,7 +1872,9 @@ def validate_pipeline_outputs_dict(
                 f'Pipeline outputs may only be returned from the top level of the pipeline function scope. Got pipeline output from within the control flow group dsl.{channel.task.parent_task_group.__class__.__name__}.'
             )
         else:
-            raise ValueError(f'Got unknown pipeline output: {channel}.')
+            raise ValueError(
+                f'Got unknown pipeline output, {channel}, of type {type(channel)}.'
+            )
 
 
 def create_pipeline_spec(
@@ -2006,13 +2008,18 @@ def convert_pipeline_outputs_to_dict(
     output name to PipelineChannel."""
     if pipeline_outputs is None:
         return {}
+    elif isinstance(pipeline_outputs, dict):
+        # This condition is required to support pipelines that return NamedTuples.
+        return pipeline_outputs
     elif isinstance(pipeline_outputs, pipeline_channel.PipelineChannel):
         return {component_factory.SINGLE_OUTPUT_NAME: pipeline_outputs}
     elif isinstance(pipeline_outputs, tuple) and hasattr(
            pipeline_outputs, '_asdict'):
         return dict(pipeline_outputs._asdict())
     else:
-        raise ValueError(f'Got unknown pipeline output: {pipeline_outputs}')
+        raise ValueError(
+            f'Got unknown pipeline output, {pipeline_outputs}, of type {type(pipeline_outputs)}.'
+        )
 
 
 def write_pipeline_spec_to_file(
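
While the subdagio test cases in sample_test.py stay commented out, the new samples can be validated locally by submitting them directly, as each sample's own __main__ block does. Below is a minimal sketch; the host address (mirroring sample_test.py's KFP_API_HOST_AND_PORT default) and the output YAML path are assumptions about a local deployment, not part of this change.

    # Minimal local-run sketch for one of the new subdagio samples.
    # Assumes a KFP API server reachable at the address below and a cluster
    # already running a driver image that includes the
    # resolveUpstreamArtifacts fix from this change.
    import os

    from kfp import Client
    from kfp.compiler import Compiler
    from subdagio import parameter

    # Optionally compile first to inspect the generated pipeline spec.
    Compiler().compile(
        pipeline_func=parameter.crust,
        package_path='subdagio_parameter.yaml')

    # Submit the run against the API server, as sample_test.py does.
    client = Client(
        host=os.getenv('KFP_API_HOST_AND_PORT', 'http://localhost:8888'))
    client.create_run_from_pipeline_func(parameter.crust)

The same pattern applies to any of the other subdagio modules (artifact, artifact_cache, mixed_parameters, and the two namedtuple variants); swap in the corresponding module's crust pipeline.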