From b9bd0eed60da7feab30f8ce19807f42c382d4875 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 6 Oct 2022 18:38:00 -0400 Subject: [PATCH] first impl to handle CWL Directory type (relates to #466) --- CHANGES.rst | 2 +- .../DirectoryProcess/deploy.yml | 23 +++ .../DirectoryProcess/package.cwl | 27 ++++ tests/functional/test_wps_package.py | 148 +++++++++++++++++- tests/test_formats.py | 9 ++ tests/utils.py | 17 +- weaver/formats.py | 14 +- weaver/processes/constants.py | 2 +- weaver/processes/convert.py | 10 +- 9 files changed, 241 insertions(+), 11 deletions(-) create mode 100644 tests/functional/application-packages/DirectoryProcess/deploy.yml create mode 100644 tests/functional/application-packages/DirectoryProcess/package.cwl diff --git a/CHANGES.rst b/CHANGES.rst index 29d7eb9de..f81b98d21 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,7 @@ Changes Changes: -------- -- No change. +- Add support of `CWL` ``Directory`` type (resolves `#466 `_). Fixes: ------ diff --git a/tests/functional/application-packages/DirectoryProcess/deploy.yml b/tests/functional/application-packages/DirectoryProcess/deploy.yml new file mode 100644 index 000000000..5579e7c69 --- /dev/null +++ b/tests/functional/application-packages/DirectoryProcess/deploy.yml @@ -0,0 +1,23 @@ +processDescription: + id: DirectoryProcess + title: List contents of an input directory to the output text file. + version: "0.0.1" + inputs: + - id: input_dir + formats: + - mimeType: application/directory + default: true + minOccurs: 1 + maxOccurs: 1 + outputs: + - id: output_file + formats: + - mimeType: text/plain + default: true + minOccurs: 1 + maxOccurs: 1 + visibility: public +executionUnit: + - test: DirectoryProcess.cwl +immediateDeployment: true +deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication" diff --git a/tests/functional/application-packages/DirectoryProcess/package.cwl b/tests/functional/application-packages/DirectoryProcess/package.cwl new file mode 100644 index 000000000..d51a7feab --- /dev/null +++ b/tests/functional/application-packages/DirectoryProcess/package.cwl @@ -0,0 +1,27 @@ +cwlVersion: v1.0 +class: CommandLineTool +baseCommand: + - bash + - script.sh +requirements: + DockerRequirement: + dockerPull: debian:stretch-slim + InitialWorkDirRequirement: + listing: + - entryname: script.sh + entry: | + set -x + echo "Input: $2" + echo "Output: $1" + find "$1" ! -type d -exec ls -lht {} + > output.txt + cat output.txt +inputs: + input_dir: + type: Directory + inputBinding: + position: 1 +outputs: + output_file: + type: File + outputBinding: + glob: "output.txt" diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 3253910b4..fce8cce32 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -50,7 +50,7 @@ from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, CWL_REQUIREMENT_INIT_WORKDIR, ProcessSchema from weaver.processes.types import ProcessType from weaver.status import Status -from weaver.utils import get_any_value +from weaver.utils import get_any_value, fetch_file, load_file from weaver.wps.utils import get_wps_output_dir, map_wps_output_location if TYPE_CHECKING: @@ -713,6 +713,20 @@ def test_deploy_merge_literal_io_from_package_and_offering(self): "Additional detail only within WPS output", \ "Additional details defined only in WPS matching CWL I/O by ID should be preserved." + def test_deploy_resolve_complex_io_format_directory(self): + """ + Test that directory complex type is resolved from CWL. + + .. versionadded:: 4.26 + """ + body = self.retrieve_payload("DirectoryProcess", "deploy", local=True) + pkg = self.retrieve_payload("DirectoryProcess", "package", local=True) + # remove definitions in deploy body to evaluate auto-resolution from CWL 'type: Directory' + body["processDescription"].pop("inputs") + body["executionUnit"] = [{"unit": pkg}] + desc, _ = self.deploy_process(body, describe_schema=ProcessSchema.OGC) + assert desc["inputs"]["input_dir"]["formats"][0]["mediaType"] == ContentType.APP_DIR + def test_deploy_merge_complex_io_format_references(self): """ Test validates that known `WPS` I/O formats (i.e.: `MIME-type`) considered as valid, but not corresponding to @@ -2179,6 +2193,132 @@ def test_execute_job_with_custom_file_name(self): f"Original: [{tmp_name_random}]" ) + def test_execute_with_browsable_directory(self): + """ + Test that HTTP browsable directory-like structure retrieves children files recursively for the process. + + .. versionadded:: 4.26 + """ + body = self.retrieve_payload("DirectoryProcess", "deploy", local=True) + pkg = self.retrieve_payload("DirectoryProcess", "package", local=True) + body["executionUnit"] = [{"unit": pkg}] + desc, _ = self.deploy_process(body, describe_schema=ProcessSchema.OGC) + assert desc["inputs"]["input_dir"]["formats"][0]["mediaType"] == ContentType.APP_DIR + + with contextlib.ExitStack() as stack: + tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowSelectCopyNestedOutDir.json' + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) + stack.enter_context(mocked_file_server(tmp_dir, tmp_host, self.settings)) + test_http_dir = f"{tmp_host}/dir/" + test_http_dir_files = [ + "dir/file.txt", + "dir/sub/file.txt", + "dir/sub/nested/file.txt", + "other/file.txt", + "root.file.txt" + ] + for file in test_http_dir_files: + path = os.path.join(tmp_dir, file) + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, mode="w", encoding="utf-8") as f: + f.write("test data") + + exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": [ + {"id": "input_dir", "href": test_http_dir}, + ], + "outputs": [ + {"id": "output_file", "transmissionMode": ExecuteTransmissionMode.REFERENCE}, + ] + } + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + proc_url = f"/processes/DirectoryProcess/jobs" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + status_url = resp.json["location"] + + results = self.monitor_job(status_url) + assert "output_file" in results + stack.enter_context(mocked_wps_output(self.settings)) + tmpdir = stack.enter_context(tempfile.TemporaryDirectory()) + output_file = fetch_file(results["output_file"]["href"], tmpdir, self.settings) + output_data = load_file(output_file, text=True) + + # because files under dir are fetched and mounted in stage dir, random sub-dir from CWL is generated + # ignore this part of the paths for testing invariant results + # ignore prefixed file metadata generated by the process listing + cwl_stage_dir = "/var/lib/cwl/" # /stg/ + output_listing = [file.rsplit(" ")[-1] for file in output_data.split("\n")] + expect_http_files = "\n".join([file for file in test_http_dir_files if file.startswith("dir/")]) + assert len(output_listing) == len(expect_http_files) + assert all(file.startswith(cwl_stage_dir) for file in output_listing) + assert all(any(file.endswith(dir_file) for file in output_listing) for dir_file in expect_http_files) + + @mocked_aws_credentials + @mocked_aws_s3 + def test_execute_with_bucket_directory(self): + """ + Test that directory pointing at a S3 bucket downloads all children files recursively for the process. + + .. versionadded:: 4.26 + """ + body = self.retrieve_payload("DirectoryProcess", "deploy", local=True) + pkg = self.retrieve_payload("DirectoryProcess", "package", local=True) + body["executionUnit"] = [{"unit": pkg}] + desc, _ = self.deploy_process(body, describe_schema=ProcessSchema.OGC) + assert desc["inputs"]["input_dir"]["formats"][0]["mediaType"] == ContentType.APP_DIR + + test_bucket_files = [ + "dir/file.txt", + "dir/sub/file.txt", + "dir/sub/nested/file.txt", + "other/file.txt", + "root.file.txt" + ] + test_bucket_ref = "wps-process-test-bucket" + for file in test_bucket_files: + mocked_aws_s3_bucket_test_file(test_bucket_ref, file) + test_bucket_dir = f"s3://{test_bucket_ref}/dir/" + exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": [ + {"id": "input_dir", "href": test_bucket_dir}, + ], + "outputs": [ + {"id": "output_file", "transmissionMode": ExecuteTransmissionMode.REFERENCE}, + ] + } + with contextlib.ExitStack() as stack: + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + proc_url = f"/processes/DirectoryProcess/jobs" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + status_url = resp.json["location"] + + results = self.monitor_job(status_url) + assert "output_file" in results + stack.enter_context(mocked_wps_output(self.settings)) + tmpdir = stack.enter_context(tempfile.TemporaryDirectory()) + output_file = fetch_file(results["output_file"]["href"], tmpdir, self.settings) + output_data = load_file(output_file, text=True) + + # because files under dir are fetched and mounted in stage dir, random sub-dir from CWL is generated + # ignore this part of the paths for testing invariant results + # ignore prefixed file metadata generated by the process listing + cwl_stage_dir = "/var/lib/cwl/" # /stg/ + output_listing = [file.rsplit(" ")[-1] for file in output_data.split("\n")] + expect_bucket_files = "\n".join([file for file in test_bucket_files if file.startswith("dir/")]) + assert len(output_listing) == len(expect_bucket_files) + assert all(file.startswith(cwl_stage_dir) for file in output_listing) + assert all(any(file.endswith(dir_file) for file in output_listing) for dir_file in expect_bucket_files) + # FIXME: create a real async test (threading/multiprocess) to evaluate this correctly def test_dismiss_job(self): """ @@ -2688,6 +2828,10 @@ def test_deploy_multi_outputs_file_from_wps_xml_reference(self): @pytest.mark.functional class WpsPackageAppWithS3BucketTest(WpsConfigBase): + """ + Test with output results uploaded to S3 bucket. + """ + @classmethod def setUpClass(cls): cls.settings = { @@ -2704,7 +2848,7 @@ def setUpClass(cls): @mocked_aws_credentials @mocked_aws_s3 - def test_execute_application_package_process_with_bucket(self): + def test_execute_application_package_process_with_bucket_results(self): """ Test validates: diff --git a/tests/test_formats.py b/tests/test_formats.py index eb24d2b11..3d09d6ccb 100644 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -20,6 +20,11 @@ def test_get_extension(): assert f.get_extension("application/unknown") == ".unknown" +def test_get_extension_directory(): + assert f.get_extension(f.ContentType.APP_DIR, dot=True) == "/" + assert f.get_extension(f.ContentType.APP_DIR, dot=False) == "/" + + def test_get_extension_glob_any(): assert f.get_extension(f.ContentType.ANY) == ".*" @@ -32,6 +37,10 @@ def test_get_content_type(): assert f.get_content_type(".yaml") == f.ContentType.APP_YAML +def test_get_content_type_directory(): + assert f.get_content_type("/") == f.ContentType.APP_DIR + + def test_get_content_type_extra_parameters(): assert f.get_content_type(".unknown") is None assert f.get_content_type(".unknown", default=f.ContentType.TEXT_PLAIN) == f.ContentType.TEXT_PLAIN diff --git a/tests/utils.py b/tests/utils.py index be3a8d185..1d4bfa2e0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -81,6 +81,15 @@ # [WPS1-URL, GetCapPathXML, [DescribePathXML], [ExecutePathXML]] MockConfigWPS1 = Sequence[str, str, Optional[Sequence[str]], Optional[Sequence[str]]] MockReturnType = TypeVar("MockReturnType") + MockHttpMethod = Union[ + responses.HEAD, + responses.GET, + responses.POST, + responses.PATCH, + responses.PUT, + responses.DELETE, + responses.OPTIONS, + ] CommandType = Callable[[Union[str, Tuple[str]]], int] @@ -758,14 +767,16 @@ def get_xml(ref): # type: (str) -> str all_request.add((responses.GET, getcap_with_proc_id_url + version_query, get_cap_xml)) def apply_mocks(_mock_resp, _requests): + # type: (responses.RequestsMock, Iterable[Tuple[MockHttpMethod, str, str]]) -> None xml_header = {"Content-Type": ContentType.APP_XML} for meth, url, body in _requests: _mock_resp.add(meth, url, body=body, headers=xml_header) def mocked_remote_server_wrapper(test): - # type: (Callable) -> Callable + # type: (Callable[[..., Any], Any]) -> Callable[[..., Any], Any] @functools.wraps(test) def mock_requests_wps1(*args, **kwargs): + # type: (*Any, **Any) -> Any """ Mock ``requests`` responses fetching ``test_server_wps`` WPS reference. """ @@ -1028,6 +1039,7 @@ def mocked_aws_credentials(test_func): mistakenly overriding real bucket files. """ def wrapped(*args, **kwargs): + # type: (*Any, **Any) -> Any with mock.patch.dict(os.environ, { "AWS_ACCESS_KEY_ID": "testing", "AWS_SECRET_ACCESS_KEY": "testing", @@ -1048,6 +1060,7 @@ def mocked_aws_s3(test_func): attempt writing to real bucket. """ def wrapped(*args, **kwargs): + # type: (*Any, **Any) -> Any with moto.mock_s3(): return test_func(*args, **kwargs) return wrapped @@ -1092,12 +1105,14 @@ def mocked_http_file(test_func): - :func:`mocked_reference_test_file` """ def mocked_file_request(file_reference, file_outdir, **kwargs): + # type: (str, str, **Any) -> str if file_reference and file_reference.startswith(MOCK_HTTP_REF): file_reference = file_reference.replace(MOCK_HTTP_REF, "") file_path = fetch_file(file_reference, file_outdir, **kwargs) return file_path def wrapped(*args, **kwargs): + # type: (*Any, **Any) -> Any with mock.patch("weaver.processes.wps_package.fetch_file", side_effect=mocked_file_request): return test_func(*args, **kwargs) return wrapped diff --git a/weaver/formats.py b/weaver/formats.py index b62890a0d..a96c4daa7 100644 --- a/weaver/formats.py +++ b/weaver/formats.py @@ -43,6 +43,7 @@ class ContentType(Constants): "/" [x- | "."] ["+" suffix] *[";" parameter=value] """ + APP_DIR = "application/directory" APP_CWL = "application/cwl" APP_CWL_JSON = "application/cwl+json" APP_CWL_YAML = "application/cwl+yaml" @@ -204,7 +205,8 @@ class SchemaRole(Constants): ContentType.APP_TAR_GZ: ".tar.gz", ContentType.APP_YAML: ".yml", ContentType.IMAGE_TIFF: ".tif", # common alternate to .tiff - ContentType.ANY: ".*", # any for glob + ContentType.ANY: ".*", # any for glob + ContentType.APP_DIR: "/", # force href to finish with explicit '/' to mark directory ContentType.APP_OCTET_STREAM: "", ContentType.APP_FORM: "", ContentType.MULTI_PART_FORM: "", @@ -440,6 +442,8 @@ def _handle_dot(_ext): fmt = _CONTENT_TYPE_FORMAT_MAPPING.get(mime_type) if fmt: + if not fmt.extension.startswith("."): + return fmt.extension return _handle_dot(fmt.extension) ext = _CONTENT_TYPE_EXTENSION_MAPPING.get(mime_type) if ext: @@ -462,11 +466,15 @@ def get_content_type(extension, charset=None, default=None): :param default: Default Content-Type to return if no extension is matched. :return: Matched or default Content-Type. """ + ctype = None if not extension: return default if not extension.startswith("."): - extension = f".{extension}" - ctype = _EXTENSION_CONTENT_TYPES_MAPPING.get(extension) + ctype = _EXTENSION_CONTENT_TYPES_MAPPING.get(extension) + if not ctype: + extension = f".{extension}" + if not ctype: + ctype = _EXTENSION_CONTENT_TYPES_MAPPING.get(extension) if not ctype: return default return add_content_type_charset(ctype, charset) diff --git a/weaver/processes/constants.py b/weaver/processes/constants.py index f3fb44e47..5661584e6 100644 --- a/weaver/processes/constants.py +++ b/weaver/processes/constants.py @@ -111,7 +111,7 @@ class OpenSearchField(Constants): PACKAGE_EXTENSIONS = frozenset(["yaml", "yml", "json", "cwl", "job"]) PACKAGE_SIMPLE_TYPES = frozenset(["string", "boolean", "float", "int", "integer", "long", "double"]) PACKAGE_LITERAL_TYPES = frozenset(PACKAGE_SIMPLE_TYPES | {"null", "Any"}) -PACKAGE_COMPLEX_TYPES = frozenset(["File"]) # FIXME: type "Directory" not supported +PACKAGE_COMPLEX_TYPES = frozenset(["File", "Directory"]) PACKAGE_ENUM_BASE = "enum" PACKAGE_CUSTOM_TYPES = frozenset([PACKAGE_ENUM_BASE]) # can be anything, but support "enum" which is more common PACKAGE_ARRAY_BASE = "array" diff --git a/weaver/processes/convert.py b/weaver/processes/convert.py index 15dfcb0e2..c1fd863fb 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -53,6 +53,7 @@ PACKAGE_ARRAY_ITEMS, PACKAGE_ARRAY_MAX_SIZE, PACKAGE_ARRAY_TYPES, + PACKAGE_COMPLEX_TYPES, PACKAGE_CUSTOM_TYPES, PACKAGE_ENUM_BASE, PACKAGE_LITERAL_TYPES, @@ -109,7 +110,6 @@ AnySettingsContainer, AnyValueType, CWL, - CWL_IO_BaseType, CWL_IO_ComplexType, CWL_IO_EnumSymbols, CWL_IO_FileValue, @@ -1101,7 +1101,8 @@ def get_cwl_io_type(io_info, strict=True): io_min_occurs = 0 is_null = True - io_type = any2cwl_literal_datatype(io_type) + if io_type not in PACKAGE_COMPLEX_TYPES: + io_type = any2cwl_literal_datatype(io_type) io_def = CWLIODefinition( name=io_name, type=io_type, @@ -1200,7 +1201,10 @@ def cwl2wps_io(io_info, io_select): else: # we need to minimally add 1 format, otherwise empty list is evaluated as None by pywps # when "supported_formats" is None, the process's json property raises because of it cannot iterate formats - kw["supported_formats"] = [DEFAULT_FORMAT] + if io_def.type == "File": + kw["supported_formats"] = [DEFAULT_FORMAT] + if io_def.type == "Directory": + kw["supported_formats"] = [get_format(ContentType.APP_DIR)] kw["mode"] = MODE.NONE # don't validate anything as default is only raw text if is_output: if io_def.type == "Directory":