diff --git a/CHANGES.rst b/CHANGES.rst index a0cb4f786..39d136c54 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,16 @@ Changes Changes: -------- -- No change. +- Add `Job` log message size checks to better control what gets logged during the `Application Package` execution to + avoid large documents causing problems when attempting to save them to the storage database. +- Update documentation with examples for ``cwltool:CUDARequirement``, ``ResourceRequirement`` and ``NetworkAccess``. +- Improve schema definition of ``ResourceRequirement``. +- Deprecate ``DockerGpuRequirement``, with attempts to auto-convert it into corresponding ``DockerRequirement`` + combined with ``cwltool:CUDARequirement`` definitions. If this conversion does not work transparently for the user, + explicit `CWL` updates with those definitions should be made. +- Ensure that validation check finds exactly one provided `CWL` requirement or hint to represent the application type. + In case of missing requirement, the `Process` deployment will fail with a reported error that contains a documentation + link to guide the user in adjusting its `Application Package` accordingly. Fixes: ------ @@ -20,7 +29,6 @@ Fixes: Use ``packaging.version.Version`` substitute whenever possible, but preserve backward compatibility with ``distutils`` in case of older Python not supporting it. - Fix ``cli._update_files`` so there are no attempts to upload remote references to the `Vault`. -- No change. .. _changes_4.27.0: diff --git a/README.rst b/README.rst index c6bce0767..cfcd3bcb1 100644 --- a/README.rst +++ b/README.rst @@ -5,6 +5,9 @@ Weaver **Implementations** * |ogc-proc-long| +* |wps| +* |esgf| processes +* |cwl| for |ogc-apppkg|_ * |ems| for Workflows * |ades| @@ -108,7 +111,7 @@ the application definition provided by |cwl| configuration. 
It can then directly a registered process |ogc-apppkg|_ with received inputs from a WPS request to expose output results for a following `ADES` in a `EMS` workflow execution chain. -`Weaver` **extends** |ogc-proc-api|_ by providing additional functionalities such as more detailed job logs +`Weaver` **extends** |ogc-api-proc|_ by providing additional functionalities such as more detailed job logs endpoints, adding more process management and search request options than required by the standard, and supporting *remote providers* registration for dynamic process definitions, to name a few. Because of this, not all features offered in `Weaver` are guaranteed to be applicable on other similarly @@ -283,9 +286,9 @@ It is part of `PAVICS`_ and `Birdhouse`_ ecosystems and is available within the .. |wps| replace:: `Web Processing Services` (WPS) .. |ogc| replace:: Open Geospatial Consortium .. _ogc: https://www.ogc.org/ -.. |ogc-proc-api| replace:: `OGC API - Processes` -.. _ogc-proc-api: https://github.com/opengeospatial/ogcapi-processes -.. |ogc-proc-long| replace:: |ogc-proc-api|_ (WPS-REST bindings) +.. |ogc-api-proc| replace:: `OGC API - Processes` +.. _ogc-api-proc: https://github.com/opengeospatial/ogcapi-processes +.. |ogc-proc-long| replace:: |ogc-api-proc|_ (WPS-REST bindings) .. |ogc-tb14| replace:: OGC Testbed-14 .. _ogc-tb14: https://www.ogc.org/projects/initiatives/testbed14 .. 
|ogc-tb14-platform-er| replace:: ADES & EMS Results and Best Practices Engineering Report diff --git a/docs/examples/docker-python-script-report.cwl b/docs/examples/docker-python-script-report.cwl index 891ec73ad..70a7b9ad0 100644 --- a/docs/examples/docker-python-script-report.cwl +++ b/docs/examples/docker-python-script-report.cwl @@ -1,3 +1,4 @@ +#!/usr/bin/env cwl-runner cwlVersion: v1.0 class: CommandLineTool baseCommand: diff --git a/docs/examples/docker-shell-script-cat.cwl b/docs/examples/docker-shell-script-cat.cwl index 23463d9b6..1b2ba93ac 100644 --- a/docs/examples/docker-shell-script-cat.cwl +++ b/docs/examples/docker-shell-script-cat.cwl @@ -1,3 +1,4 @@ +#!/usr/bin/env cwl-runner cwlVersion: v1.0 class: CommandLineTool baseCommand: cat @@ -13,4 +14,5 @@ outputs: - id: output type: File outputBinding: - glob: stdout.log + glob: output.txt +stdout: output.txt diff --git a/docs/examples/requirement-cuda.cwl b/docs/examples/requirement-cuda.cwl new file mode 100644 index 000000000..28d30ca5b --- /dev/null +++ b/docs/examples/requirement-cuda.cwl @@ -0,0 +1,19 @@ +#!/usr/bin/env cwl-runner +cwlVersion: v1.2 +class: CommandLineTool +baseCommand: nvidia-smi +requirements: + cwltool:CUDARequirement: + cudaVersionMin: "11.2" + cudaComputeCapability: "7.5" + cudaDeviceCountMin: 1 + cudaDeviceCountMax: 4 +$namespaces: + cwltool: "http://commonwl.org/cwltool#" +inputs: {} +outputs: + output: + type: File + outputBinding: + glob: output.txt +stdout: output.txt diff --git a/docs/examples/requirement-network.cwl b/docs/examples/requirement-network.cwl new file mode 100644 index 000000000..8bb890206 --- /dev/null +++ b/docs/examples/requirement-network.cwl @@ -0,0 +1,16 @@ +#!/usr/bin/env cwl-runner +cwlVersion: v1.2 +class: CommandLineTool +baseCommand: curl +requirements: + NetworkAccess: + networkAccess: true +inputs: + url: + type: string +outputs: + output: + type: File + outputBinding: + glob: "output.txt" +stdout: "output.txt" diff --git 
a/docs/examples/requirement-resources.cwl b/docs/examples/requirement-resources.cwl new file mode 100644 index 000000000..1bac40882 --- /dev/null +++ b/docs/examples/requirement-resources.cwl @@ -0,0 +1,21 @@ +#!/usr/bin/env cwl-runner +cwlVersion: v1.2 +class: CommandLineTool +baseCommand: "" +requirements: + ResourceRequirement: + coresMin: 8 + coresMax: 16 + ramMin: 1024 + ramMax: 2048 + tmpdirMin: 128 + tmpdirMax: 1024 + outdirMin: 1024 + outdirMax: 2048 +inputs: {} +outputs: + output: + type: File + outputBinding: + glob: output.txt +stdout: output.txt diff --git a/docs/source/appendix.rst b/docs/source/appendix.rst index 98030d1d3..33fdecedb 100644 --- a/docs/source/appendix.rst +++ b/docs/source/appendix.rst @@ -180,7 +180,8 @@ Glossary input/output chaining between operations. .. seealso:: - Refer to :ref:`proc_workflow`, :ref:`proc_workflow_ops` and :ref:`CWL Workflow` sections for more details. + Refer to :ref:`proc_workflow`, :ref:`proc_workflow_ops` and :ref:`app_pkg_workflow` + sections for more details. WPS | Web Processing Service. @@ -193,6 +194,10 @@ Glossary WPS-REST Alias employed to refer to :term:`OGC API - Processes` endpoints for corresponding :term:`WPS` definitions. + WPS-T + Alias employed to refer to older revisions of :term:`OGC API - Processes` standard. + The name referred to :term:`WPS` *Transactional* operations introduced by the RESTful API. + XML | Extensible Markup Language | Alternative representation of some data object provided by the application. 
Requires appropriate ``Accept`` @@ -218,5 +223,5 @@ Useful Links - |iana-link|_ - |oas|_ - |ogc|_ (:term:`OGC`) -- |ogc-proc-api|_ +- |ogc-api-proc|_ - |weaver-issues|_ diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index 27a974fe5..59c84e37c 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -214,7 +214,7 @@ they are optional and which default value or operation is applied in each situat | (default: *path* ``/``) | | Endpoint that will be employed as prefix to refer to WPS-REST requests - | (including but not limited to |ogc-proc-api|_ schemas). + | (including but not limited to |ogc-api-proc|_ schemas). | | It can either be the explicit *full URL* to use or the *path* relative to ``weaver.url``. | Setting ``weaver.wps_restapi_path`` is ignored if its URL equivalent is defined. diff --git a/docs/source/faq.rst b/docs/source/faq.rst index 02c1fed96..5e8691006 100644 --- a/docs/source/faq.rst +++ b/docs/source/faq.rst @@ -105,7 +105,7 @@ Problem connecting workflow steps together .. seealso:: - - :ref:`CWL Workflow` + - :ref:`app_pkg_workflow` - :ref:`Output File Format` diff --git a/docs/source/package.rst b/docs/source/package.rst index b112ca9d3..afe59147f 100644 --- a/docs/source/package.rst +++ b/docs/source/package.rst @@ -30,12 +30,15 @@ definition available with |pkg-req|_ request. .. note:: The package request is a `Weaver`-specific implementation, and therefore, is not necessarily available on other - :term:`ADES`/:term:`EMS` implementation as this feature is not part of |ogc-proc-api|_ specification. + :term:`ADES`/:term:`EMS` implementation as this feature is not part of |ogc-api-proc|_ specification. +.. _app_pkg_types: Typical CWL Package Definition =========================================== +.. 
_app_pkg_cmd: + CWL CommandLineTool ------------------------ @@ -46,7 +49,7 @@ Following :term:`CWL` package definition represents the :py:mod:`weaver.processe :linenos: The first main components is the ``class: CommandLineTool`` that tells `Weaver` it will be an *atomic* process -(contrarily to `CWL Workflow`_ presented later). +(contrarily to :ref:`app_pkg_workflow` presented later). The other important sections are ``inputs`` and ``outputs``. These define which parameters will be expected and produced by the described application. `Weaver` supports most formats and types as specified by |cwl-spec|_. @@ -159,6 +162,128 @@ whenever required for launching new :term:`Job` executions. .. versionadded:: 4.5 Specification and handling of the ``X-Auth-Docker`` header for providing an authentication token. +.. _app_pkg_resources: + +GPU and Resource dependent Applications +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When an :term:`Application Package` requires GPU or any other minimal set of hardware capabilities, such as in the +case of machine learning or high-performance computing tasks, the submitted :term:`CWL` must explicitly indicate +those requirements to ensure they can be met for performing its execution. Similarly, an :term:`Application Package` +that must obtain external access to remote contents must not assume that the connection would be available, and +must therefore request network access. Below are examples where such requirements are demonstrated and how to +define them. + +.. literalinclude:: ../examples/requirement-cuda.cwl + :language: yaml + :caption: Sample CWL definition with CUDA capabilities + :name: example_app_pkg_cuda + +.. literalinclude:: ../examples/requirement-resources.cwl + :language: yaml + :caption: Sample CWL definition with computing resources + :name: example_app_pkg_resources + +.. 
literalinclude:: ../examples/requirement-network.cwl + :language: yaml + :caption: Sample CWL definition with network access + :name: example_app_pkg_network + +Above requirements can be combined in any fashion as needed. They can also be combined with any other requirements +employed to define the core components of the application. + +Whenever possible, requirements should be provided with values that best match the minimum and maximum amount of +resources that the :term:`Application Package` operation requires. More precisely, over-requesting resources should +be avoided as this could lead to failing :term:`Job` execution if the server or worker node processing it deems it +cannot fulfill the requirements because they are too broad to obtain proper resource allocation, because it has +insufficient computing resources, or simply for rate-limiting/fair-share reasons. + +Although definitions such as |cwl-resource-req|_ and |cwl-cuda-req|_ are usually applied for atomic operations, +they can also become relevant in the context of :ref:`app_pkg_workflow` execution. Effectively, providing the +required hardware capabilities for each atomic application can allow the :term:`Workflow` engine to better schedule +:term:`Job` steps. For example, if two computationally heavy steps happened to have no restriction for parallelization +based on the :term:`Workflow` steps definition alone, but running both of them simultaneously on the same machine +would necessarily end up causing an ``OutOfMemory`` error due to insufficient resources, those requirements could help +preemptively let the engine know to wait until *reserved* resources become available. As a result, execution of the +second task could be delayed until the first task is completed, therefore avoiding the error. + +.. versionadded:: 4.17 + Support of |cwl-resource-req|_. + +.. versionadded:: 4.27 + Support of |cwl-network-req|_ and |cwl-cuda-req|_. + +.. versionchanged:: + Deprecated ``DockerGpuRequirement``. 
+ +.. warning:: + Any :term:`Application Package` that was making use of ``DockerGpuRequirement`` should be updated to employ + the official |cwl-docker-req|_ in combination with |cwl-cuda-req|_. For backward compatibility, any detected + ``DockerGpuRequirement`` definition will be updated automatically with a minimalistic |cwl-cuda-req|_ definition + using a very lax set of CUDA capabilities. It is recommended to provide specific configurations for your needs. + +.. _app_pkg_remote: +.. _app_pkg_wps1: +.. _app_pkg_ogc_api: +.. _app_pkg_esgf_cwt: + +Remote Applications +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To define an application that refers to a :ref:`proc_remote_provider`, an :ref:`proc_wps_12`, an :ref:`proc_ogc_api` +or an :ref:`proc_esgf_cwt` endpoint, the corresponding `Weaver`-specific :term:`CWL`-like requirements must be employed +to indicate the URL where that remote resource is accessible. Once deployed, the contained :term:`CWL` +package and the resulting :term:`Process` will be exposed as a :ref:`proc_ogc_api` resource. + +Upon reception of a :ref:`Process Execution ` request, `Weaver` will take care of resolving +the indicated process URL from the :term:`CWL` requirement and will dispatch the execution to the resource +after applying any relevant I/O, parameter and Media-Type conversion to align with the target server standard +for submitting the :term:`Job` requests. + +Below are examples of the corresponding :term:`CWL` requirements employed for each type of remote application. + +.. code-block:: yaml + :caption: WPS-1/2 Package Definition + + cwlVersion: "v1.0" + class: CommandLineTool + hints: + WPS1Requirement: + provider: "https://example.com/ows/wps/catalog" + process: "getpoint" + +.. code-block:: yaml + :caption: OGC API Package Definition + + cwlVersion: "v1.0" + class: CommandLineTool + hints: + OGCAPIRequirement: + process: "https://example.com/ogcapi/processes/getpoint" + +.. 
code-block:: json + :caption: ESGF-CWT Package Definition + + { + "cwlVersion": "v1.0", + "class": "CommandLineTool", + "hints": { + "ESGF-CWTRequirement": { + "provider": "https://edas.nccs.nasa.gov/wps/cwt", + "process": "xarray.subset" + } + } + } + + +.. seealso:: + - :ref:`proc_remote_provider` + - :ref:`proc_wps_12` + - :ref:`proc_ogc_api` + - :ref:`proc_esgf_cwt` + +.. _app_pkg_workflow: + CWL Workflow ------------------------ @@ -671,10 +796,10 @@ validate the full process integrity before it can be executed, this means that o permitted in its context (providing many will raise a validation error when parsing the :term:`CWL` definition). To ensure compatibility with multiple *supported formats* outputs of :term:`WPS`, any output that has more that one -format will have its ``format`` field dropped in the corresponding :term:`CWL` definition. Without any ``format`` on -the :term:`CWL` side, the validation process will ignore this specification and will effectively accept any type of -file. This will not break any execution operation with :term:`CWL`, but it will remove the additional validation layer -of the format (which especially deteriorates process resolution when chaining processes inside a :ref:`CWL Workflow`). +format will have its ``format`` field dropped in the corresponding :term:`CWL` definition. Without any ``format`` on the +:term:`CWL` side, the validation process will ignore this specification and will effectively accept any type of file. +This will not break any execution operation with :term:`CWL`, but it will remove the additional validation layer of the +format (which especially deteriorates process resolution when chaining processes inside a :ref:`app_pkg_workflow`). If the :term:`WPS` output only specifies a single MIME-type, then the equivalent format (after being resolved to a valid ontology) will be preserved on the :term:`CWL` side since the result is ensured to be the unique one provided. 
For this diff --git a/docs/source/processes.rst b/docs/source/processes.rst index d5e9586a4..ec2e01042 100644 --- a/docs/source/processes.rst +++ b/docs/source/processes.rst @@ -17,19 +17,17 @@ Type of Processes `Weaver` supports multiple type of processes, as listed below. Each one of them are accessible through the same API interface, but they have different implications. -- `Builtin`_ -- `WPS-1/2`_ -- `WPS-REST`_ (a.k.a.: WPS-3, |ogc-proc-api|_) -- `ESGF-CWT`_ -- `Workflow`_ -- `Remote Provider`_ - +- :ref:`proc_builtin` +- :ref:`proc_wps_12` +- :ref:`OGC API - Processes <proc_ogc_api>` (formerly known as :term:`WPS-REST`, :term:`WPS-T` or `WPS-3`) +- :ref:`proc_esgf_cwt` +- :ref:`proc_workflow` +- :ref:`proc_remote_provider` .. seealso:: Section |examples|_ provides multiple concrete use cases of :ref:`Deploy ` and :ref:`Execute ` request payloads for diverse set of applications. - .. _proc_builtin: Builtin @@ -63,10 +61,11 @@ WPS-1/2 ------- This kind of process corresponds to a *traditional* :term:`WPS` :term:`XML` or :term:`JSON` endpoint -(depending of supported version) prior to `WPS-REST`_ specification. When the `WPS-REST`_ process is deployed -in `Weaver` using an URL reference to an WPS-1/2 process, `Weaver` parses and converts the :term:`XML` or :term:`JSON` -body of the response and registers the process locally using this definition. This allows a remote server offering -limited functionalities (e.g.: no REST bindings supported) to provide them through `Weaver`. +(depending on supported version) prior to :ref:`proc_wps_rest` specification. When an |ogc-api-proc|_ description is +deployed in `Weaver` using a URL reference to a WPS-1/2 process through the use of a :ref:`app_pkg_wps1` requirement, +`Weaver` parses and converts the :term:`XML` or :term:`JSON` body of the :term:`WPS` response and registers the process +locally. This allows a remote server offering limited functionalities (e.g.: no REST bindings supported) +to provide them through `Weaver`. 
A minimal :ref:`Deploy ` request body for this kind of process could be as follows: @@ -103,18 +102,22 @@ Please refer to :ref:`Configuration of WPS Processes` section for more details o .. seealso:: - `Remote Provider`_ +.. _proc_ogc_api: .. _proc_wps_rest: -WPS-REST --------- +OGC API - Processes (WPS-REST, WPS-T, WPS-3) +-------------------------------------------- This :term:`Process` type is the main component of `Weaver`. All other types are converted to this one either -through some parsing (e.g.: `WPS-1/2`_) or with some requirement indicators (e.g.: `Builtin`_, `Workflow`_) for -special handling. +through some parsing (e.g.: :ref:`proc_wps_12`) or with some requirement indicators +(e.g.: :ref:`proc_builtin`, :ref:`proc_workflow`) for +special handling. The represented :term:`Process` is aligned with |ogc-api-proc|_ specifications. When deploying one such :term:`Process` directly, it is expected to have a definition specified -with a :term:`CWL` `Application Package`_. -This is most of the time employed to wrap an operations packaged in a reference :term:`Docker` image. +with a :term:`CWL` `Application Package`_, which provides resources about one of the described :ref:`app_pkg_types`. + +This is most of the time employed to wrap operations packaged in a reference :term:`Docker` image, but it can also +wrap :ref:`app_pkg_remote` to be executed on another server (i.e.: :term:`ADES`). The reference package can be provided in multiple ways as presented below. .. note:: @@ -198,48 +201,24 @@ Where the referenced file hosted at ``"https://remote-file-server.com/my-package ESGF-CWT ---------- -For *traditional* WPS-1 process type, Weaver adds default values to :term:`CWL` definition. As we can see in -:mod:`weaver/processes/wps_package.py`, the following default values for the :term:`CWL` package are: - -.. 
code-block:: python - - cwl_package = OrderedDict([ - ("cwlVersion", "v1.0"), - ("class", "CommandLineTool"), - ("hints", { - CWL_REQUIREMENT_APP_WPS1: { - "provider": get_url_without_query(wps_service_url), - "process": process_id, - }}), - ]) - -In :term:`ESGF-CWT` processes, ``ESGF-CWTRequirement`` hint must be used instead of usual ``WPS1Requirement``, contained -in the :py:data:`weaver.processes.constants.CWL_REQUIREMENT_APP_WPS1` variable. The handling of this technicality is -handled in :mod:`weaver/processes/wps_package.py`. We can define :term:`ESGF-CWT` processes using this syntax: +For :term:`ESGF-CWT` processes, the ``ESGF-CWTRequirement`` hint must be used. +For an example :term:`CWL` using this definition, see :ref:`app_pkg_esgf_cwt` section. -.. code-block:: json - - { - "cwlVersion": "v1.0", - "class": "CommandLineTool", - "hints": { - "ESGF-CWTRequirement": { - "provider": "https://edas.nccs.nasa.gov/wps/cwt", - "process": "xarray.subset" - } - } - } +This kind of :term:`Process` allows for remote :ref:`Execution ` and +:ref:`Monitoring ` of a :term:`Job` dispatched to an instance that +implements |esgf-cwt-git|_ part of the |esgf|_. +Using `Weaver`, this :term:`Process` automatically obtains an :ref:`proc_ogc_api` representation. .. _proc_workflow: Workflow ---------- -Processes categorized as :term:`Workflow` are very similar to `WPS-REST`_ processes. From the API standpoint, they -actually look exactly the same as an atomic process when calling :ref:`DescribeProcess ` +Processes categorized as :term:`Workflow` are very similar to :ref:`proc_wps_rest` processes. From the API standpoint, +they actually look exactly the same as an atomic process when calling :ref:`DescribeProcess ` or :ref:`Execute ` requests. -The difference lies within the referenced :ref:`Application Package` which uses a :ref:`CWL Workflow` instead of -typical :ref:`CWL CommandLineTool`, and therefore, modifies how the :term:`Process` is internally executed. 
+The difference lies within the referenced :ref:`Application Package` which uses a :ref:`app_pkg_workflow` instead of +typical :ref:`app_pkg_cmd`, and therefore, modifies how the :term:`Process` is internally executed. For :term:`Workflow` processes to be deploy-able and executable, it is **mandatory** that `Weaver` is configured as :term:`EMS` or :term:`HYBRID` (see: :ref:`Configuration Settings`). This requirement is due to the nature @@ -292,21 +271,21 @@ responses, the parsing operation accomplished by `Weaver` makes theses services allows the user to use it as a central hub to keep references to all his remotely accessible services and dispatch :term:`Job` executions from a common location. -A *remote provider* differs from previously presented `WPS-1/2`_ processes such that the underlying processes of the -service are not registered locally. For example, if a remote service has two WPS processes, only top-level service URL -will be registered locally (in `Weaver`'s database) and the application will have no explicit knowledge of these remote -processes until requested. When calling :term:`Process`-specific requests +A *remote provider* differs from previously presented :ref:`proc_wps_12` processes such that the underlying processes +of the service are not registered locally. For example, if a remote service has two WPS processes, only top-level +service URL will be registered locally (in `Weaver`'s database) and the application will have no explicit knowledge +of these remote processes until requested. When calling :term:`Process`-specific requests (e.g.: :ref:`DescribeProcess ` or :ref:`Execute `), `Weaver` will re-send the corresponding request (with appropriate interface conversion) directly to the remote :term:`Provider` each time and -return the result accordingly. On the other hand, a `WPS-1/2`_ reference would be parsed and saved locally with the -response *at the time of deployment*. 
This means that a deployed `WPS-1/2`_ reference would act as a *snapshot* of the -reference :term:`Process` (which could become out-of-sync), while `Remote Provider`_ will dynamically update according -to the re-fetched response from the remote service each time, always keeping the obtained description in sync with the -remote :term:`Provider`. If our example remote service was extended to have a third :term:`WPS` process, it would -immediately and transparently be reflected in :ref:`GetCapabilities ` +return the result accordingly. On the other hand, a :ref:`proc_wps_12` reference would be parsed and saved locally with +the response *at the time of deployment*. This means that a deployed :ref:`proc_wps_12` reference would act as +a *snapshot* of the reference :term:`Process` (which could become out-of-sync), while :ref:`proc_remote_provider` will +dynamically update according to the re-fetched response from the remote service each time, always keeping the obtained +description in sync with the remote :term:`Provider`. If our example remote service was extended to have a third +:term:`WPS` process, it would immediately and transparently be reflected in :ref:`GetCapabilities ` and :ref:`DescribeProcess ` retrieved by `Weaver` on `Providers`_-scoped requests without any change -to the registered :term:`Provider` definition. This would not be the case for the `WPS-1/2`_ reference that would need -a manual update (i.e.: deploy the third :term:`Process` to register it in `Weaver`). +to the registered :term:`Provider` definition. This would not be the case for the :ref:`proc_wps_12` reference that +would need a manual update (i.e.: deploy the third :term:`Process` to register it in `Weaver`). .. _`Providers`: https://pavics-weaver.readthedocs.io/en/latest/api.html#tag/Providers @@ -340,7 +319,7 @@ following request (`DescribeProviderProcess`_): .. warning:: - API requests scoped under `Providers`_ are `Weaver`-specific implementation. 
These are not part of |ogc-proc-api|_ + API requests scoped under `Providers`_ are `Weaver`-specific implementation. These are not part of |ogc-api-proc|_ specification. @@ -382,11 +361,12 @@ result in this process to become available for following steps. After deployment and visibility preconditions have been met, the corresponding process should become available through :ref:`DescribeProcess ` requests and other routes that depend on an existing process. -Note that when a process is deployed using the `WPS-REST`_ interface, it also becomes available through the `WPS-1/2`_ -interface with the same identifier and definition. Because of compatibility limitations, some parameters in the -`WPS-1/2`_ side might not be perfectly mapped to the equivalent or adjusted `WPS-REST`_ interface, although this -concerns mostly only new features such as :term:`Job` status monitoring. For most traditional use cases, properties -are mapped between the two interfaces, but it is recommended to use the `WPS-REST`_ one because of the added features. +Note that when a process is deployed using the :ref:`proc_wps_rest` interface, it also becomes available through the +:ref:`proc_wps_12` interface with the same identifier and definition. Because of compatibility limitations, some +parameters in the :ref:`proc_wps_12` side might not be perfectly mapped to the equivalent or adjusted +:ref:`proc_wps_rest` interface, although this concerns mostly only new features such as :term:`Job` status monitoring. +For most traditional use cases, properties are mapped between the two interfaces, but it is recommended to use the +:ref:`proc_wps_rest` one because of the added features. .. 
seealso:: Please refer to :ref:`application-package` chapter for any additional parameters that can be @@ -877,8 +857,9 @@ In this case, it becomes the responsibility of this remote instance to handle th avoids potential problems such as if `Weaver` as :term:`EMS` doesn't have authorized access to a link that only the target :term:`ADES` would have access to. -When :term:`CWL` package defines ``WPS1Requirement`` under ``hints`` for corresponding `WPS-1/2`_ remote processes -being monitored by `Weaver`, it will skip fetching of |http_scheme|-based references since that would otherwise lead +When :term:`CWL` package defines ``WPS1Requirement`` under ``hints`` for corresponding :ref:`proc_wps_12` remote +processes being monitored by `Weaver` (see also :ref:`app_pkg_wps1`), +it will skip fetching of |http_scheme|-based references since that would otherwise lead to useless double downloads (one on `Weaver` and the other on the :term:`WPS` side). It is the same in situation for ``ESGF-CWTRequirement`` employed for `ESGF-CWT`_ processes. Because these processes do not always support :term:`S3` buckets, and because `Weaver` supports many variants of :term:`S3` reference formats, it will first fetch the :term:`S3` @@ -927,57 +908,57 @@ combinations. 
:name: table-file-type-handling :align: center - +-----------+-----------------------------------------+---------------+-------------------------------------------+ - | |cfg| | Process Type | File Scheme | Applied Operation | - +===========+=========================================+===============+===========================================+ - | |any| | |any| | |os_scheme| | Query and re-process [#openseach]_ | - +-----------+-----------------------------------------+---------------+-------------------------------------------+ - | |ADES| | - `WPS-1/2`_ | |file_scheme| | Convert to |http_scheme| [#file2http]_ | - | | - `ESGF-CWT`_ +---------------+-------------------------------------------+ - | | - `WPS-REST`_ (remote) [#wps3]_ | |http_scheme| | Nothing (unmodified) | - | | - :ref:`proc_remote_provider` +---------------+-------------------------------------------+ - | | | |s3_scheme| | Fetch and convert to |http_scheme| [#s3]_ | - | | +---------------+-------------------------------------------+ - | | | |vault_ref| | Convert to |http_scheme| [#vault2http]_ | - | +-----------------------------------------+---------------+-------------------------------------------+ - | | - `WPS-REST`_ (`CWL`) [#wps3]_ | |file_scheme| | Nothing (file already local) | - | | +---------------+-------------------------------------------+ - | | | |http_scheme| | Fetch and convert to |file_scheme| | - | | +---------------+ | - | | | |s3_scheme| | | - | | +---------------+-------------------------------------------+ - | | | |vault_ref| | Convert to |file_scheme| | - +-----------+-----------------------------------------+---------------+-------------------------------------------+ - | |EMS| | - |any| (types listed above for |ADES|) | |file_scheme| | Convert to |http_scheme| [#file2http]_ | - | | - `Workflow`_ (`CWL`) [#wf]_ +---------------+-------------------------------------------+ - | | | |http_scheme| | Nothing (unmodified, step will handle it) | - | | +---------------+ | - | | | 
|s3_scheme| | | - | | +---------------+ | - | | | |vault_ref| | | - +-----------+-----------------------------------------+---------------+-------------------------------------------+ - | |HYBRID| | - `WPS-1/2`_ | |file_scheme| | Convert to |http_scheme| [#file2http]_ | - | | - `ESGF-CWT`_ +---------------+-------------------------------------------+ - | | - `WPS-REST`_ (remote) [#wps3]_ | |http_scheme| | Nothing (unmodified) | - | | - :ref:`proc_remote_provider` +---------------+-------------------------------------------+ - | | | |s3_scheme| | Fetch and convert to |http_scheme| [#s3]_ | - | | *Note*: |HYBRID| assumes |ADES| role +---------------+-------------------------------------------+ - | | (remote processes) | |vault_ref| | Convert to |http_scheme| [#vault2http]_ | - | +-----------------------------------------+---------------+-------------------------------------------+ - | | - `WPS-REST`_ (`CWL`) [#wps3]_ | |file_scheme| | Nothing (unmodified) | - | | +---------------+-------------------------------------------+ - | | | |http_scheme| | Fetch and convert to |file_scheme| | - | | *Note*: |HYBRID| assumes |ADES| role +---------------+-------------------------------------------+ - | | (local processes) | |vault_ref| | Convert to |file_scheme| [#vault2file]_ | - | +-----------------------------------------+---------------+-------------------------------------------+ - | | - `Workflow`_ (`CWL`) [#wf]_ | |file_scheme| | Convert to |http_scheme| [#file2http]_ | - | | +---------------+-------------------------------------------+ - | | | |http_scheme| | Nothing (unmodified, step will handle it) | - | | +---------------+ | - | | | |s3_scheme| | | - | | +---------------+ | - | | *Note*: |HYBRID| assumes |EMS| role | |vault_ref| | | - +-----------+-----------------------------------------+---------------+-------------------------------------------+ + +-----------+------------------------------------------+---------------+-------------------------------------------+ + 
| |cfg| | Process Type | File Scheme | Applied Operation | + +===========+==========================================+===============+===========================================+ + | |any| | |any| | |os_scheme| | Query and re-process [#openseach]_ | + +-----------+------------------------------------------+---------------+-------------------------------------------+ + | |ADES| | - :ref:`proc_wps_12` | |file_scheme| | Convert to |http_scheme| [#file2http]_ | + | | - :ref:`proc_esgf_cwt` +---------------+-------------------------------------------+ + | | - :ref:`proc_wps_rest` (remote) [#wps3]_ | |http_scheme| | Nothing (unmodified) | + | | - :ref:`proc_remote_provider` +---------------+-------------------------------------------+ + | | | |s3_scheme| | Fetch and convert to |http_scheme| [#s3]_ | + | | +---------------+-------------------------------------------+ + | | | |vault_ref| | Convert to |http_scheme| [#vault2http]_ | + | +------------------------------------------+---------------+-------------------------------------------+ + | | - :ref:`proc_wps_rest` (`CWL`) [#wps3]_ | |file_scheme| | Nothing (file already local) | + | | +---------------+-------------------------------------------+ + | | | |http_scheme| | Fetch and convert to |file_scheme| | + | | +---------------+ | + | | | |s3_scheme| | | + | | +---------------+-------------------------------------------+ + | | | |vault_ref| | Convert to |file_scheme| | + +-----------+------------------------------------------+---------------+-------------------------------------------+ + | |EMS| | - |any| (types listed above for |ADES|) | |file_scheme| | Convert to |http_scheme| [#file2http]_ | + | | - :ref:`proc_workflow` (`CWL`) [#wf]_ +---------------+-------------------------------------------+ + | | | |http_scheme| | Nothing (unmodified, step will handle it) | + | | +---------------+ | + | | | |s3_scheme| | | + | | +---------------+ | + | | | |vault_ref| | | + 
+-----------+------------------------------------------+---------------+-------------------------------------------+ + | |HYBRID| | - :ref:`proc_wps_12` | |file_scheme| | Convert to |http_scheme| [#file2http]_ | + | | - :ref:`proc_esgf_cwt` +---------------+-------------------------------------------+ + | | - :ref:`proc_wps_rest` (remote) [#wps3]_ | |http_scheme| | Nothing (unmodified) | + | | - :ref:`proc_remote_provider` +---------------+-------------------------------------------+ + | | | |s3_scheme| | Fetch and convert to |http_scheme| [#s3]_ | + | | *Note*: |HYBRID| assumes |ADES| role +---------------+-------------------------------------------+ + | | (remote processes) | |vault_ref| | Convert to |http_scheme| [#vault2http]_ | + | +------------------------------------------+---------------+-------------------------------------------+ + | | - :ref:`proc_wps_rest` (`CWL`) [#wps3]_ | |file_scheme| | Nothing (unmodified) | + | | +---------------+-------------------------------------------+ + | | | |http_scheme| | Fetch and convert to |file_scheme| | + | | *Note*: |HYBRID| assumes |ADES| role +---------------+-------------------------------------------+ + | | (local processes) | |vault_ref| | Convert to |file_scheme| [#vault2file]_ | + | +------------------------------------------+---------------+-------------------------------------------+ + | | - :ref:`proc_workflow` (`CWL`) [#wf]_ | |file_scheme| | Convert to |http_scheme| [#file2http]_ | + | | +---------------+-------------------------------------------+ + | | | |http_scheme| | Nothing (unmodified, step will handle it) | + | | +---------------+ | + | | | |s3_scheme| | | + | | +---------------+ | + | | *Note*: |HYBRID| assumes |EMS| role | |vault_ref| | | + +-----------+------------------------------------------+---------------+-------------------------------------------+ .. |any| replace:: ** .. |cfg| replace:: Configuration @@ -1009,12 +990,12 @@ combinations. 
where `Weaver` is hosted or another service takes care of this task. .. [#wps3] - When the process refers to a remote :ref:`WPS-REST` process (i.e.: remote :term:`WPS` instance that supports + When the process refers to a remote :ref:`proc_wps_rest` process (i.e.: remote :term:`WPS` instance that supports REST bindings but that is not necessarily an :term:`ADES`), `Weaver` simply *wraps* and monitors its remote execution, therefore files are handled just as for any other type of remote :term:`WPS`-like servers. When the process contains an actual :term:`CWL` :ref:`Application Package` that defines a ``CommandLineTool`` class (including applications with :term:`Docker` image requirement), files are fetched as it will be executed locally. - See :ref:`CWL CommandLineTool`, :ref:`WPS-REST` and :ref:`Remote Provider` for further details. + See :ref:`CWL CommandLineTool`, :ref:`proc_wps_rest` and :ref:`Remote Provider` for further details. .. [#s3] When an |s3_scheme| file is fetched, is gets downloaded to a temporary |file_scheme| location, which is **NOT** @@ -1022,9 +1003,9 @@ combinations. support :term:`S3` references, only then the file gets converted as in [#file2http]_. .. [#vault2file] - When a |vault_ref| file is specified, the local :ref:`WPS-REST` process can make use of it directly. The file is - therefore retrieved from the :term:`Vault` using the provided UUID and access token to be passed to the application. - See :ref:`file_vault_inputs` and :ref:`vault_upload` for more details. + When a |vault_ref| file is specified, the local :ref:`proc_wps_rest` process can make use of it directly. + The file is therefore retrieved from the :term:`Vault` using the provided UUID and access token to be passed + to the application. See :ref:`file_vault_inputs` and :ref:`vault_upload` for more details. .. [#vault2http] When a |vault_ref| file is specified, the remote process needs to access it using the hosted :term:`Vault` endpoint. @@ -1035,7 +1016,7 @@ combinations. .. 
[#wf] Workflows are only available on :term:`EMS` and :term:`HYBRID` instances. Since they chain processes, no fetch is needed as the sub-step process will do it instead as needed. See :ref:`Workflow` process as well - as :ref:`CWL Workflow` for more details. + as :ref:`app_pkg_workflow` for more details. .. todo:: method to indicate explicit fetch to override these? (https://github.com/crim-ca/weaver/issues/183) @@ -1521,7 +1502,7 @@ and/or ``logging`` operation for scripts or :term:`Docker` images executed throu .. note:: :term:`Job` logs and exceptions are a `Weaver`-specific implementation. - They are not part of traditional |ogc-proc-api|_. + They are not part of traditional |ogc-api-proc|_. A minimalistic example of logging output is presented below. This can be retrieved using |log-req|_ request, at any moment during :term:`Job` execution (with logs up to that point in time) or after its completion (for full output). @@ -1684,6 +1665,6 @@ Workflow (Chaining Step Processes) .. seealso:: - - :ref:`CWL Workflow` + - :ref:`app_pkg_workflow` - :ref:`proc_workflow_ops` - :ref:`Workflow` process type diff --git a/docs/source/references.rst b/docs/source/references.rst index 65b5068d6..ac73dabb1 100644 --- a/docs/source/references.rst +++ b/docs/source/references.rst @@ -36,6 +36,14 @@ .. _cwl-workdir-ex: https://www.commonwl.org/user_guide/15-staging/ .. |cwl-docker-req| replace:: DockerRequirement .. _cwl-docker-req: https://www.commonwl.org/v1.1/CommandLineTool.html#DockerRequirement +.. FIXME apply official CWL specification location +.. https://github.com/common-workflow-language/cwl-v1.2/issues/212 +.. |cwl-cuda-req| replace:: cwltool:CUDARequirement +.. _cwl-cuda-req: https://doc.arvados.org/v2.4/user/cwl/cwl-extensions.html#CUDARequirement +.. |cwl-resource-req| replace:: ResourceRequirement +.. _cwl-resource-req: https://www.commonwl.org/v1.2/CommandLineTool.html#ResourceRequirement +.. |cwl-network-req| replace:: NetworkAccess +.. 
_cwl-network-req: https://www.commonwl.org/v1.2/CommandLineTool.html#NetworkAccess .. |cwl-io-map| replace:: CWL Mapping .. _cwl-io-map: https://www.commonwl.org/v1.1/CommandLineTool.html#map .. |cwl-io-type| replace:: CWLType Symbols @@ -60,8 +68,8 @@ .. _ogc: https://www.ogc.org/ .. |ogc-home| replace:: |ogc| Homepage .. _ogc-home: `ogc`_ -.. |ogc-proc-api| replace:: OGC API - Processes -.. _ogc-proc-api: https://github.com/opengeospatial/ogcapi-processes +.. |ogc-api-proc| replace:: OGC API - Processes +.. _ogc-api-proc: https://github.com/opengeospatial/ogcapi-processes .. |ogc-exec-sync-responses| replace:: OGC API - Processes, Responses (sync) .. _ogc-exec-sync-responses: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response .. |ogc-exec-async-responses| replace:: OGC API - Processes, Responses (async) diff --git a/setup.cfg b/setup.cfg index c82adcb70..ca693b92d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,23 +5,23 @@ tag = True tag_name = {new_version} [bumpversion:file:CHANGES.rst] -search = +search = `Unreleased `_ (latest) ======================================================================== -replace = +replace = `Unreleased `_ (latest) ======================================================================== - + Changes: -------- - No change. - + Fixes: ------ - No change. - + .. _changes_{new_version}: - + `{new_version} `_ ({now:%%Y-%%m-%%d}) ======================================================================== @@ -42,14 +42,14 @@ search = LABEL version="{current_version}" replace = LABEL version="{new_version}" [tool:pytest] -addopts = +addopts = --strict-markers --tb=native weaver/ log_cli = false log_level = DEBUG python_files = test_*.py -markers = +markers = cli: mark test as related to CLI operations testbed14: mark test as 'testbed14' validation functional: mark test as functionality validation @@ -80,7 +80,7 @@ targets = . 
[flake8] ignore = E126,E226,E402,F401,W503,W504 max-line-length = 120 -exclude = +exclude = src, .git, __pycache__, @@ -112,14 +112,14 @@ add_select = D201,D213 branch = true source = ./ include = weaver/* -omit = +omit = setup.py docs/* tests/* *_mako [coverage:report] -exclude_lines = +exclude_lines = pragma: no cover raise OSError raise AssertionError @@ -143,5 +143,6 @@ exclude_lines = raise PackageAuthenticationError raise PackageExecutionError raise PackageNotFound + raise PackageParsingError raise PackageRegistrationError raise PackageTypeError diff --git a/tests/functional/application-packages/ReadFile/deploy.yml b/tests/functional/application-packages/ReadFile/deploy.yml index b5828ed8e..05d4703fb 100644 --- a/tests/functional/application-packages/ReadFile/deploy.yml +++ b/tests/functional/application-packages/ReadFile/deploy.yml @@ -19,5 +19,5 @@ outputTransmission: - reference executionUnit: # note: This does not work by itself! The test suite injects the file dynamically. - - href: "tests/functional/application-packages/CatValue/cat.cwl" + - href: "tests/functional/application-packages/ReadValue/package.cwl" deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication" diff --git a/tests/functional/application-packages/SimulateResourceUsage/deploy.yml b/tests/functional/application-packages/SimulateResourceUsage/deploy.yml new file mode 100644 index 000000000..f3ffd0d5c --- /dev/null +++ b/tests/functional/application-packages/SimulateResourceUsage/deploy.yml @@ -0,0 +1,15 @@ +# YAML representation supported by WeaverClient +processDescription: + id: SimulateResourceUsage + title: Gradually allocate RAM to simulate a process load. + version: "1.0" + keywords: + - test +jobControlOptions: + - async-execute +outputTransmission: + - reference +executionUnit: + # note: This does not work by itself! The test suite injects the file dynamically. 
+ - href: "tests/functional/application-packages/SimulateResourceUsage/package.cwl" +deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication" diff --git a/tests/functional/application-packages/SimulateResourceUsage/job.yml b/tests/functional/application-packages/SimulateResourceUsage/job.yml new file mode 100644 index 000000000..4e1653283 --- /dev/null +++ b/tests/functional/application-packages/SimulateResourceUsage/job.yml @@ -0,0 +1,4 @@ +ram_amount: 4 +ram_chunks: 16 +time_duration: 1 +time_interval: 0.25 diff --git a/tests/functional/application-packages/SimulateResourceUsage/package.cwl b/tests/functional/application-packages/SimulateResourceUsage/package.cwl new file mode 100644 index 000000000..29339c0af --- /dev/null +++ b/tests/functional/application-packages/SimulateResourceUsage/package.cwl @@ -0,0 +1,45 @@ +#!/usr/bin/env cwl-runner +# WARNING: +# This process can generate a large memory load and a very large output file that captures the generated data. +# Even with default ResourceRequirement values, the output can become substantial rapidly. +cwlVersion: "v1.0" +class: CommandLineTool +baseCommand: + - bash + - script.sh +requirements: + DockerRequirement: + dockerPull: debian:stretch-slim + InitialWorkDirRequirement: + listing: + # below script is generated dynamically in the working directory, and then called by the base command + # reference: https://unix.stackexchange.com/a/254976/288952 + - entryname: script.sh + entry: | + echo "Will allocate RAM chunks of $(inputs.ram_chunks) MiB." + echo "Will allocate RAM chunks in increments up to $(inputs.ram_amount) times." + echo "Will maintain allocated RAM load for $(inputs.time_duration)s for each increment." + echo "Will wait $(inputs.time_interval)s between each allocation." + echo "Begin allocating memory..." + for index in \$(seq $(inputs.ram_amount)); do + echo "Allocating \$index x $(inputs.ram_chunks) MiB for $(inputs.time_duration)s..." 
+ cat <( None @@ -2028,7 +2031,7 @@ def test_execute_job_with_inline_input_values(self): except Exception as exc: print(exc) raise - """) + """) } ] } @@ -2499,6 +2502,106 @@ def test_execute_with_directory_output(self): output_dir_files = {os.path.join(root, file) for root, _, files in os.walk(out_dir) for file in files} assert output_dir_files == expect_out_files + @parameterized.expand([ + # all values in MiB / seconds accordingly + (False, 48, 96, 16, 3, 0.25, 0.25, {}), + (False, 48, 36, 4, 4, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"ramMax": 52}}), + # FIXME: ensure ResourceRequirements are effective (https://github.com/crim-ca/weaver/issues/138) + # (True, 48, 96, 4, 2, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"ramMax": 2}}), # FIXME: hangs forever + # (True, 48, 96, 4, 2, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"outdirMax": 2}}), # FIXME: not failing + (False, 48, 12, 4, 2, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"outdirMax": 16}}), + ]) + def test_execute_with_resource_requirement(self, + expect_fail, # type: bool + expect_ram_min_mb, # type: int + expect_size_min_mb, # type: int + ram_chunks_mb, # type: int + ram_amount_mb, # type: int + time_duration_s, # type: Number + time_interval_s, # type: Number + resource_requirement, # type: CWL_RequirementsDict + ): # type: (...) -> None + """ + Test that :data:`CWL_REQUIREMENT_RESOURCE` are considered for :term:`Process` execution. + + .. note:: + This test also conveniently serves for testing how large :term:`Job` logs are handled by the storage. + Because of the large output produced and captured in the logs, saving them directly to the database + is not supported. The :term:`Job` should therefore filter problematic entries to the log. 
+ """ + proc = "SimulateResourceUsage" + body = self.retrieve_payload(proc, "deploy", local=True) + pkg = self.retrieve_payload(proc, "package", local=True) + pkg["requirements"].update(resource_requirement) + body["executionUnit"] = [{"unit": pkg}] + self.deploy_process(body, describe_schema=ProcessSchema.OGC) + + exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": { + "ram_chunks": ram_chunks_mb, + "ram_amount": ram_amount_mb, + "time_duration": time_duration_s, + "time_interval": time_interval_s, + }, + "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.REFERENCE}] + } + out_dir = None + try: + with contextlib.ExitStack() as stack: + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + proc_url = f"/processes/{proc}/jobs" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + status_url = resp.json["location"] + job_id = resp.json["jobID"] + wps_dir = get_wps_output_dir(self.settings) + out_dir = os.path.join(wps_dir, job_id, "output") + + results = self.monitor_job(status_url, expect_failed=expect_fail) + assert "output" in results + out_log = os.path.join(out_dir, "stdout.log") + assert os.path.isfile(out_log) + assert os.stat(out_log).st_size >= expect_size_min_mb * 2**20 + with open(out_log, mode="r", encoding="utf-8") as out_file: + output = (line for line in out_file.readlines() if line[0] != "\0") + output = list(output) + assert all( + any(f"Allocating {i} x {ram_chunks_mb} MiB" in line for line in output) + for i in range(1, ram_amount_mb + 1) + ) + + log_url = f"{status_url}/logs" + log_resp = mocked_sub_requests(self.app, "get", log_url, timeout=5, + headers=self.json_headers, only_local=True) + job_logs = log_resp.json + assert all( + any(f"Allocating {i} x {ram_chunks_mb} MiB" in 
line for line in job_logs) + for i in range(1, ram_amount_mb + 1) + ) + assert all( + any( + f"" + in line for line in job_logs + ) + for i in range(1, ram_amount_mb + 1) + ) + + stat_url = f"{status_url}/statistics" + stat_resp = mocked_sub_requests(self.app, "get", stat_url, timeout=5, + headers=self.json_headers, only_local=True) + job_stats = stat_resp.json + assert all( + job_stats["process"][mem] > expect_ram_min_mb + for mem in ["rssBytes", "ussBytes", "vmsBytes"] + ) + finally: + if out_dir: + shutil.rmtree(out_dir, ignore_errors=True) + # FIXME: create a real async test (threading/multiprocess) to evaluate this correctly def test_dismiss_job(self): """ diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 76cc0fd80..04a80e3a0 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -29,6 +29,7 @@ from weaver.datatype import Job from weaver.formats import ContentType from weaver.processes.constants import ProcessSchema +from weaver.processes.wps_package import get_application_requirement from weaver.status import Status from weaver.utils import fully_qualified_name, load_file from weaver.visibility import Visibility @@ -314,19 +315,34 @@ def describe_process(cls, process_id, describe_schema=ProcessSchema.OGC): @classmethod @overload - def deploy_process(cls, payload, process_id=None, describe_schema=ProcessSchema.OGC, mock_requests_only_local=True): - # type: (JSON, Optional[str], Literal[ProcessSchema.OGC], bool) -> Tuple[ProcessDescriptionMapping, CWL] # noqa + def deploy_process(cls, + payload, # type: JSON + process_id=None, # type: Optional[str] + describe_schema=ProcessSchema.OGC, # type: Literal[ProcessSchema.OGC] # noqa + mock_requests_only_local=True, # type: bool + add_package_requirement=True, # type: bool + ): # type: (...) -> Tuple[ProcessDescriptionMapping, CWL] ... 
@classmethod @overload - def deploy_process(cls, payload, process_id=None, describe_schema=ProcessSchema.OGC, mock_requests_only_local=True): - # type: (JSON, Optional[str], Literal[ProcessSchema.OLD], bool) -> Tuple[ProcessDescriptionListing, CWL] # noqa + def deploy_process(cls, + payload, # type: JSON + process_id=None, # type: Optional[str] + describe_schema=ProcessSchema.OGC, # type: Literal[ProcessSchema.OLD] # noqa + mock_requests_only_local=True, # type: bool + add_package_requirement=True, # type: bool + ): # type: (...) -> Tuple[ProcessDescriptionListing, CWL] ... @classmethod - def deploy_process(cls, payload, process_id=None, describe_schema=ProcessSchema.OGC, mock_requests_only_local=True): - # type: (JSON, Optional[str], ProcessSchema, bool) -> Tuple[ProcessDescription, CWL] + def deploy_process(cls, + payload, # type: JSON + process_id=None, # type: Optional[str] + describe_schema=ProcessSchema.OGC, # type: ProcessSchema + mock_requests_only_local=True, # type: bool + add_package_requirement=True, # type: bool + ): # type: (...) -> Tuple[ProcessDescription, CWL] """ Deploys a process with :paramref:`payload`. @@ -348,6 +364,16 @@ def deploy_process(cls, payload, process_id=None, describe_schema=ProcessSchema. 
exec_unit = load_file(os.path.join(WEAVER_ROOT_DIR, exec_href)) exec_list[0]["unit"] = exec_unit exec_list[0].pop("href") + exec_unit = exec_list[0].get("unit") # type: CWL + if exec_unit and add_package_requirement: + app_req = get_application_requirement(exec_unit, validate=False, required=False) + if not app_req["class"]: + exec_unit.setdefault("requirements", {}) + reqs = exec_unit["requirements"] + if isinstance(reqs, list): + reqs.append({"class": "DockerRequirement", "dockerPull": "alpine:latest"}) + else: + reqs.update({"DockerRequirement": {"dockerPull": "alpine:latest"}}) resp = mocked_sub_requests(cls.app, "post_json", "/processes", data=payload, headers=cls.json_headers, only_local=mock_requests_only_local) assert resp.status_code == 201, f"Expected successful deployment.\nError:\n{resp.text}" diff --git a/tests/processes/test_constants.py b/tests/processes/test_constants.py new file mode 100644 index 000000000..9b8880243 --- /dev/null +++ b/tests/processes/test_constants.py @@ -0,0 +1,33 @@ +import pytest + +from weaver.processes.constants import CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS + + +def test_cuda_default_parameters_immutable(): + with pytest.raises(TypeError): + CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS["value"] = 1 + assert "value" not in CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS + + key = list(CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS)[0] + before = CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS[key] + with pytest.raises(TypeError): + CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS[key] = "test" + assert CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS[key] == before + + +@pytest.mark.parametrize("parameters_copy", [ + dict(CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS), + CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS.copy(), +]) +def test_cuda_default_parameters_copy_mutable(parameters_copy): + assert parameters_copy is not CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS + key = list(parameters_copy)[0] + try: + parameters_copy["value"] = "test" + parameters_copy.pop(key) + except 
TypeError: + pytest.fail("Only original mapping should be immutable, copy should be permitted updates.") + assert "value" not in CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS + assert parameters_copy["value"] == "test" + assert key in CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS + assert key not in parameters_copy diff --git a/tests/processes/test_convert.py b/tests/processes/test_convert.py index 5e13db9c1..0ede11a29 100644 --- a/tests/processes/test_convert.py +++ b/tests/processes/test_convert.py @@ -7,6 +7,7 @@ import tempfile from collections import OrderedDict from copy import deepcopy +from typing import TYPE_CHECKING import pytest import yaml @@ -17,6 +18,7 @@ from pywps.inout.outputs import ComplexOutput from pywps.validator.mode import MODE +from tests.utils import assert_equal_any_order from weaver.exceptions import PackageTypeError from weaver.formats import IANA_NAMESPACE_DEFINITION, OGC_MAPPING, OGC_NAMESPACE_DEFINITION, ContentType from weaver.processes.constants import ( @@ -56,6 +58,9 @@ ) from weaver.utils import null +if TYPE_CHECKING: + from typing import List + class ObjectWithEqProperty(object): """ @@ -737,16 +742,6 @@ def test_is_cwl_complex_type_not_files(test_type): assert not is_cwl_complex_type(io_info) -def assert_formats_equal_any_order(format_result, format_expect): - assert len(format_result) == len(format_expect), "Expected formats sizes mismatch" - for r_fmt in format_result: - for e_fmt in format_expect: - if r_fmt.json == e_fmt.json: - format_expect.remove(e_fmt) - break - assert not format_expect, f"Not all expected formats matched {[fmt.json for fmt in format_expect]}" - - def test_wps2json_io_default_format(): # must create object with matching data/supported formats or error otherwise wps_io = ComplexInput("test", "", supported_formats=[DEFAULT_FORMAT], data_format=DEFAULT_FORMAT) @@ -788,6 +783,13 @@ def test_merge_io_formats_no_wps(): assert res_fmt[0] is DEFAULT_FORMAT +def assert_formats_equal_any_order(result_formats, 
expect_formats): + # type: (List[Format], List[Format]) -> None + assert_equal_any_order(result_formats, expect_formats, + comparer=lambda res, exp: res.json == exp.json, + formatter=lambda fmt: str(fmt.json)) + + def test_merge_io_formats_with_wps_and_default_cwl(): wps_fmt = [Format(ContentType.APP_NETCDF)] cwl_fmt = [DEFAULT_FORMAT] diff --git a/tests/processes/test_wps_package.py b/tests/processes/test_wps_package.py index f45bf5620..e8399cda5 100644 --- a/tests/processes/test_wps_package.py +++ b/tests/processes/test_wps_package.py @@ -12,14 +12,30 @@ import shutil import sys import tempfile +from typing import TYPE_CHECKING import pytest +from tests.utils import assert_equal_any_order from weaver.datatype import Process from weaver.exceptions import PackageExecutionError -from weaver.processes.wps_package import WpsPackage +from weaver.processes.constants import ( + CWL_REQUIREMENT_APP_DOCKER, + CWL_REQUIREMENT_APP_DOCKER_GPU, + CWL_REQUIREMENT_CUDA, + CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS +) +from weaver.processes.wps_package import WpsPackage, _update_package_compatibility from weaver.wps.service import WorkerRequest +if TYPE_CHECKING: + from typing import Dict, TypeVar + + KT = TypeVar("KT") + VT_co = TypeVar("VT_co", covariant=True) + + from weaver.typedefs import CWL + # pylint: disable=R1729 # ignore non-generator representation employed for displaying test log results @@ -40,6 +56,9 @@ def status_location(self): def status_location(self, value): pass + def setup_docker_image(self): + return None + class MockWpsRequest(WorkerRequest): def __init__(self, process_id=None, with_message_input=True): @@ -86,6 +105,7 @@ def __init__(self, shell_command, arguments=None, with_message_input=True): "cwlVersion": "v1.0", "class": "CommandLineTool", "baseCommand": shell_command, + "requirements": {"DockerRequirement": {"dockerPull": "alpine:latest"}}, "inputs": {}, "outputs": {} } @@ -149,7 +169,7 @@ def 
test_stdout_stderr_logging_for_commandline_tool_success(caplog): # cwltool call with reference to the command and stdout/stderr redirects assert re.match( r".*" - rf"cwltool.*job {process.id}.*\$ echo \\\n" + rf"cwltool:job.* \[job {process.id}\].*echo \\\n" r"\s+'Dummy message' \> [\w\-/\.]+/stdout\.log 2\> [\w\-/\.]+/stderr\.log\n" r".*", log_data, @@ -230,10 +250,138 @@ def test_stdout_stderr_logging_for_commandline_tool_exception(caplog): with open(expect_log, mode="r", encoding="utf-8") as file: job_err = file.read() log_err = stderr.getvalue() + "\n" + caplog.text + "\n" + job_err - assert "Could not retrieve any internal application log." in log_err, ( - "Since command did not run, nothing captured is expected" - ) + assert "Package completed with errors." in log_err assert "Traceback (most recent call last):" in log_err assert "weaver.processes.wps_package|mock-process" in log_err else: pytest.fail("\"wps_package._handler()\" was expected to throw \"PackageExecutionError\" exception") + + +def _combine(dict1, dict2): + # type: (Dict[KT, VT_co], Dict[KT, VT_co]) -> Dict[KT, VT_co] + dict1 = dict1.copy() + dict1.update(dict2) + return dict1 + + +def assert_equal_requirements_any_order(result, expected): + for field in ["hints", "requirements"]: + if field in expected: + if isinstance(expected[field], dict): + assert result[field] == expected[field] + else: + assert_equal_any_order(result[field], expected[field]) + else: + assert field not in result + + +@pytest.mark.parametrize("original, expected", [ + ( + {"requirements": {CWL_REQUIREMENT_APP_DOCKER_GPU: {"dockerPull": "python:3.7-alpine"}}}, + {"requirements": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}, + CWL_REQUIREMENT_CUDA: CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS}}, + ), + ( + {"requirements": {CWL_REQUIREMENT_APP_DOCKER_GPU: {"dockerPull": "python:3.7-alpine"}, + CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + {"requirements": 
{CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}, + CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + ), + ( + {"requirements": [{"class": CWL_REQUIREMENT_APP_DOCKER_GPU, "dockerPull": "python:3.7-alpine"}]}, + {"requirements": [{"class": CWL_REQUIREMENT_APP_DOCKER, "dockerPull": "python:3.7-alpine"}, + _combine({"class": CWL_REQUIREMENT_CUDA}, CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS)]}, + ), + ( + {"requirements": [{"class": CWL_REQUIREMENT_APP_DOCKER_GPU, "dockerPull": "python:3.7-alpine"}, + _combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + {"requirements": [{"class": CWL_REQUIREMENT_APP_DOCKER, "dockerPull": "python:3.7-alpine"}, + _combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + ), + ( + {"hints": {CWL_REQUIREMENT_APP_DOCKER_GPU: {"dockerPull": "python:3.7-alpine"}}}, + {"hints": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}, + CWL_REQUIREMENT_CUDA: CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS}}, + ), + ( + {"hints": {CWL_REQUIREMENT_APP_DOCKER_GPU: {"dockerPull": "python:3.7-alpine"}, + CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + {"hints": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}, + CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + ), + ( + {"hints": [{"class": CWL_REQUIREMENT_APP_DOCKER_GPU, "dockerPull": "python:3.7-alpine"}]}, + {"hints": [{"class": CWL_REQUIREMENT_APP_DOCKER, "dockerPull": "python:3.7-alpine"}, + _combine({"class": CWL_REQUIREMENT_CUDA}, CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS)]}, + ), + ( + {"hints": [{"class": CWL_REQUIREMENT_APP_DOCKER_GPU, "dockerPull": "python:3.7-alpine"}, + _combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + {"hints": [{"class": 
CWL_REQUIREMENT_APP_DOCKER, "dockerPull": "python:3.7-alpine"}, + _combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + ), + ( + {"requirements": {CWL_REQUIREMENT_APP_DOCKER_GPU: {"dockerPull": "python:3.7-alpine"}}, + "hints": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + {"requirements": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}, + "hints": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + ), + ( + {"hints": {CWL_REQUIREMENT_APP_DOCKER_GPU: {"dockerPull": "python:3.7-alpine"}}, + "requirements": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + {"hints": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}, + "requirements": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8}}}, + ), + ( + {"requirements": [{"class": CWL_REQUIREMENT_APP_DOCKER_GPU, "dockerPull": "python:3.7-alpine"}], + "hints": [_combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + {"requirements": [{"class": CWL_REQUIREMENT_APP_DOCKER, "dockerPull": "python:3.7-alpine"}], + "hints": [_combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + ), + ( + {"hints": [{"class": CWL_REQUIREMENT_APP_DOCKER_GPU, "dockerPull": "python:3.7-alpine"}], + "requirements": [_combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + {"hints": [{"class": CWL_REQUIREMENT_APP_DOCKER, "dockerPull": "python:3.7-alpine"}], + "requirements": [_combine({"class": CWL_REQUIREMENT_CUDA}, + {"custom": 1, "cudaVersionMin": "11.0", "cudaDeviceCountMin": 8})]}, + ), + # cases that should trigger no change in definition + ( + {"requirements": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": 
"python:3.7-alpine"}}}, + {"requirements": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}}, + ), + ( + {"requirements": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaDeviceCountMin": 8}}}, + {"requirements": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaDeviceCountMin": 8}}}, + ), + ( + {"requirements": [_combine({"class": CWL_REQUIREMENT_CUDA}, {"custom": 1, "cudaDeviceCountMin": 8})]}, + {"requirements": [_combine({"class": CWL_REQUIREMENT_CUDA}, {"custom": 1, "cudaDeviceCountMin": 8})]}, + ), + ( + {"hints": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}}, + {"hints": {CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}}, + ), + ( + {"hints": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaDeviceCountMin": 8}}}, + {"hints": {CWL_REQUIREMENT_CUDA: {"custom": 1, "cudaDeviceCountMin": 8}}}, + ), + ( + {"hints": [_combine({"class": CWL_REQUIREMENT_CUDA}, {"custom": 1, "cudaDeviceCountMin": 8})]}, + {"hints": [_combine({"class": CWL_REQUIREMENT_CUDA}, {"custom": 1, "cudaDeviceCountMin": 8})]}, + ) +]) +def test_update_package_compatibility(original, expected): + # type: (CWL, CWL) -> None + cwl_base = {"cwlVersion": "v1.2", "class": "CommandLineTool"} + original = _combine(cwl_base, original) + expected = _combine(cwl_base, expected) + test_cwl = _update_package_compatibility(original) + assert_equal_requirements_any_order(test_cwl, expected) diff --git a/tests/utils.py b/tests/utils.py index 45b8e8a2d..58495c9f4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -100,6 +100,8 @@ CommandType = Callable[[Union[str, Tuple[str]]], int] + CompareType = TypeVar("CompareType") + LOGGER = logging.getLogger(".".join([__package__, __name__])) MOCK_AWS_REGION = "ca-central-1" # type: RegionName @@ -1148,8 +1150,17 @@ def mocked_process_package(): """ Provides mocks that bypasses execution when calling :module:`weaver.processes.wps_package` functions. 
""" + from weaver.processes.wps_package import get_application_requirement as real_get_application_requirement + + def mock_get_app_req(package, **kwargs): + if package.get("class") == "test": + kwargs["required"] = False + kwargs["validate"] = False + return real_get_application_requirement(package, **kwargs) + return ( mock.patch("weaver.processes.utils.load_package_file", return_value={"class": "test"}), + mock.patch("weaver.processes.wps_package.get_application_requirement", side_effect=mock_get_app_req), mock.patch("weaver.processes.wps_package.load_package_file", return_value={"class": "test"}), mock.patch("weaver.processes.wps_package._load_package_content", return_value=(None, "test", None)), mock.patch("weaver.processes.wps_package._get_package_inputs_outputs", return_value=(None, None)), @@ -1415,3 +1426,26 @@ def setup_test_file_hierarchy(test_paths, test_root_dir, test_data="data"): listing.extend((os.path.join(path, dir_path) + "/" for dir_path in dirs)) listing.extend((os.path.join(path, file_name) for file_name in files)) return sorted(listing) + + +def assert_equal_any_order(result, # type: Iterable[Any] + expect, # type: Iterable[Any] + comparer=None, # type: Optional[Callable[[CompareType, CompareType], bool]] + formatter=str, # type: Optional[Callable[[CompareType], str]] + ): # type: (...) -> None + if not callable(comparer): + def comparer(_res, _exp): # pylint: disable=E0102 + return _res == _exp + + assert type(result) == type(expect), "Expected types mismatch between iterable containers." # pylint: disable=C0123 + # in case of exhaustible iterators, compute them to get a copy once + # also use the copy to remove items such that all must be matched + result = list(result) + expect = list(expect) + assert len(result) == len(expect), "Expected items sizes mismatch between iterable containers." 
+ for res in result: + for exp in expect: + if comparer(res, exp): + expect.remove(exp) + break + assert not expect, f"Not all expected items matched {[formatter(exp) for exp in expect]}" diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index 439dcdf67..e4b05ae71 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -753,7 +753,7 @@ def test_deploy_process_CWL_DockerRequirement_auth_header_format(self): process = self.process_store.fetch_by_id(proc_id) assert process.auth is not None assert process.auth.type == AuthenticationTypes.DOCKER - assert process.auth.token == token + assert process.auth.token == token # noqa assert process.auth.docker == docker def test_deploy_process_CWL_direct_raised_missing_id(self): @@ -783,21 +783,8 @@ def deploy_process_CWL_direct(self, process_id="test-direct-cwl-json", # type: str version=None, # type: Optional[AnyVersion] ): # type: (...) -> Tuple[CWL, JSON] - cwl_core = { - "id": process_id, - "class": "CommandLineTool", - "baseCommand": ["python3", "-V"], - "inputs": {}, - "outputs": { - "output": { - "type": "File", - "outputBinding": { - "glob": "stdout.log" - }, - } - }, - } cwl = {} + cwl_core = self.get_cwl_docker_python_version(cwl_version=None, process_id=process_id) cwl_base = {"cwlVersion": "v1.0"} cwl.update(cwl_base) if version: @@ -857,10 +844,14 @@ def test_deploy_process_CWL_direct_graph_multi_invalid(self): assert "Longer than maximum length 1" in error @staticmethod - def get_cwl_docker_python_version(): - # type: () -> CWL - return { - "cwlVersion": "v1.0", + def get_cwl_docker_python_version(cwl_version="v1.0", process_id=None): + # type: (Optional[str], Optional[str]) -> CWL + cwl = {} + if cwl_version: + cwl["cwlVersion"] = cwl_version + if process_id: + cwl["id"] = process_id + cwl.update({ "class": "CommandLineTool", "requirements": { CWL_REQUIREMENT_APP_DOCKER: { @@ -877,14 +868,15 @@ def get_cwl_docker_python_version(): }, } }, 
- } + }) + return cwl def test_deploy_process_CWL_DockerRequirement_href(self): with contextlib.ExitStack() as stack: stack.enter_context(mocked_wps_output(self.settings)) out_dir = self.settings["weaver.wps_output_dir"] out_url = self.settings["weaver.wps_output_url"] - assert out_url.startswith("http"), "test can run only if reference is a HTTP reference" # sanity check + assert out_url.startswith("http"), "test can run only if reference is an HTTP reference" # sanity check tmp_dir = stack.enter_context(tempfile.TemporaryDirectory(dir=out_dir)) tmp_file = os.path.join(tmp_dir, "docker-python.cwl") tmp_href = tmp_file.replace(out_dir, out_url, 1) @@ -926,7 +918,7 @@ def test_deploy_process_CWL_DockerRequirement_owsContext(self): stack.enter_context(mocked_wps_output(self.settings)) out_dir = self.settings["weaver.wps_output_dir"] out_url = self.settings["weaver.wps_output_url"] - assert out_url.startswith("http"), "test can run only if reference is a HTTP reference" # sanity check + assert out_url.startswith("http"), "test can run only if reference is an HTTP reference" # sanity check tmp_dir = stack.enter_context(tempfile.TemporaryDirectory(dir=out_dir)) tmp_file = os.path.join(tmp_dir, "docker-python.cwl") tmp_href = tmp_file.replace(out_dir, out_url, 1) @@ -1044,11 +1036,11 @@ def test_deploy_process_CWL_NetworkRequirement_executionUnit(self): stack.enter_context(mocked_wps_output(self.settings)) network_access_requirement = {"networkAccess": True} docker_requirement = {"dockerPull": "python:3.7-alpine"} - for type in ["hints", "requirements"]: + for req_type in ["hints", "requirements"]: cwl = { "class": "CommandLineTool", "cwlVersion": "v1.2", - type: { + req_type: { "NetworkAccess": network_access_requirement, "DockerRequirement": docker_requirement }, @@ -1063,7 +1055,7 @@ def test_deploy_process_CWL_NetworkRequirement_executionUnit(self): } } - p_id = "test-network-access-" + type + p_id = "test-network-access-" + req_type body = { "processDescription": 
{"process": {"id": p_id}}, "executionUnit": [{"unit": cwl}], @@ -1073,8 +1065,8 @@ def test_deploy_process_CWL_NetworkRequirement_executionUnit(self): pkg = self.get_application_package(p_id) assert desc["deploymentProfile"] == "http://www.opengis.net/profiles/eoc/dockerizedApplication" assert desc["process"]["id"] == p_id - assert pkg[type]["NetworkAccess"] == network_access_requirement - assert pkg[type]["DockerRequirement"] == docker_requirement + assert pkg[req_type]["NetworkAccess"] == network_access_requirement + assert pkg[req_type]["DockerRequirement"] == docker_requirement @mocked_remote_server_requests_wps1([ resources.TEST_REMOTE_SERVER_URL, @@ -1086,7 +1078,7 @@ def test_deploy_process_CWL_WPS1Requirement_href(self): stack.enter_context(mocked_wps_output(self.settings)) out_dir = self.settings["weaver.wps_output_dir"] out_url = self.settings["weaver.wps_output_url"] - assert out_url.startswith("http"), "test can run only if reference is a HTTP reference" # sanity check + assert out_url.startswith("http"), "test can run only if reference is an HTTP reference" # sanity check tmp_dir = stack.enter_context(tempfile.TemporaryDirectory(dir=out_dir)) tmp_file = os.path.join(tmp_dir, "wps1.cwl") tmp_href = tmp_file.replace(out_dir, out_url, 1) @@ -1174,7 +1166,7 @@ def test_deploy_process_CWL_WPS1Requirement_owsContext(self): stack.enter_context(mocked_wps_output(self.settings)) wps_dir = self.settings["weaver.wps_output_dir"] wps_url = self.settings["weaver.wps_output_url"] - assert wps_url.startswith("http"), "test can run only if reference is a HTTP reference" # sanity check + assert wps_url.startswith("http"), "test can run only if reference is an HTTP reference" # sanity check tmp_file = stack.enter_context(tempfile.NamedTemporaryFile(dir=wps_dir, mode="w", suffix=".cwl")) tmp_http = tmp_file.name.replace(wps_dir, wps_url, 1) json.dump(cwl, tmp_file) @@ -1761,7 +1753,7 @@ def test_replace_process_valid(self): body = resp.json assert 
body["processSummary"]["title"] == data["processDescription"]["process"]["title"], ( "Even though MAJOR update for CWL is accomplished, other fields that usually correspond to MINOR changes " - "should also applied at the same time since the operation replaces the new process definition (PUT)." + "should also be applied at the same time since the operation replaces the new process definition (PUT)." ) assert ( "description" not in body["processSummary"] or # if undefined, dropped from body @@ -2037,7 +2029,7 @@ def test_execute_process_no_error_not_required_params(self): execute_mock_data_tests.append((mock_execute, data_execute)) # apply modifications for testing - execute_mock_data_tests[0][1].pop("inputs") # no inputs is valid (although can be required for WPS process) + execute_mock_data_tests[0][1].pop("inputs") # no inputs valid (although it can be required for WPS process) execute_mock_data_tests[0][1]["outputs"][0].pop("transmissionMode") # should resolve to default value for mock_execute, data_execute in execute_mock_data_tests: diff --git a/weaver/datatype.py b/weaver/datatype.py index a4d52d0b7..459e3dbe1 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -44,6 +44,7 @@ from weaver.utils import localize_datetime # for backward compatibility of previously saved jobs not time-locale-aware from weaver.utils import ( VersionFormat, + apply_number_with_unit, as_version_major_minor_patch, extend_instance, fully_qualified_name, @@ -378,7 +379,7 @@ def wps(self, container=None, **kwargs): try: _wps = self.get("_wps") if _wps is None: - # client retrieval could also be cached if recently fetched an not yet invalidated + # client retrieval could also be cached if recently fetched and not yet invalidated self["_wps"] = _wps = get_wps_client(self.url, container=container, **kwargs) return _wps except (OWSServiceException, xml_util.ParseError) as exc: @@ -579,7 +580,7 @@ def check_accessible(self, settings, ignore=True): meth = "HEAD" url = self.url # - 
allow 500 for services that incorrectly handle invalid request params, but at least respond - # (should be acceptable in this case because the 'ping' request is not necessarily well formed) + # (should be acceptable in this case because the 'ping' request is not necessarily well-formed) # - allow 400/405 for bad request/method directly reported by the service for the same reasons # - enforce quick timeout (but don't allow 408 code) to avoid long pending connexions that never resolve allowed_codes = [200, 400, 405, 500] @@ -620,27 +621,38 @@ def __init__(self, *args, **kwargs): if not isinstance(self.id, (str, uuid.UUID)): raise TypeError(f"Type 'str' or 'UUID' is required for '{self.__name__}.id'") - def _get_log_msg(self, msg=None, status=None, progress=None): - # type: (Optional[str], Optional[AnyStatusType], Optional[Number]) -> str - if not msg: - msg = self.status_message + @staticmethod + def _get_message(message, size_limit=None): + # type: (str, Optional[int]) -> str + msg_len = len(message) + size_limit = size_limit if isinstance(size_limit, int) and size_limit > 0 else 1024**2 + if len(message) > size_limit: + msg_size = apply_number_with_unit(msg_len, binary=True, decimals=2) + return f"<message clipped due to large dimension ({msg_size})>" + return message + + def _get_log_msg(self, msg=None, status=None, progress=None, size_limit=None): + # type: (Optional[str], Optional[AnyStatusType], Optional[Number], Optional[int]) -> str + msg = self._get_message(msg or self.status_message, size_limit=size_limit) status = map_status(status or self.status) progress = max(0, min(100, progress or self.progress)) return get_job_log_msg(duration=self.duration_str, progress=progress, status=status, message=msg) @staticmethod - def _get_err_msg(error): - # type: (WPSException) -> str - return f"{error.text} - code={error.code} - locator={error.locator}" + def _get_err_msg(error, size_limit=None): + # type: (WPSException, Optional[int]) -> str + error_msg = Job._get_message(error.text, size_limit=size_limit) + return
f"{error_msg} - code={error.code} - locator={error.locator}" def save_log(self, - errors=None, # type: Optional[Union[str, Exception, WPSException, List[WPSException]]] - logger=None, # type: Optional[Logger] - message=None, # type: Optional[str] - level=INFO, # type: AnyLogLevel - status=None, # type: Optional[AnyStatusType] - progress=None, # type: Optional[Number] - ): # type: (...) -> None + errors=None, # type: Optional[Union[str, Exception, WPSException, List[WPSException]]] + logger=None, # type: Optional[Logger] + message=None, # type: Optional[str] + level=INFO, # type: AnyLogLevel + status=None, # type: Optional[AnyStatusType] + progress=None, # type: Optional[Number] + size_limit=None, # type: Optional[int] + ): # type: (...) -> None """ Logs the specified error and/or message, and adds the log entry to the complete job log. @@ -661,6 +673,10 @@ def save_log(self, :param progress: Override progress applied in the logged message entry, but does not set it to the job object. Uses the current :attr:`Job.progress` value if not specified. + :param size_limit: + Log message entries that individually exceed the limit will be clipped with a generic message. + The parameter is provided for convenience, but take note that setting a too large value could cause the + complete :term:`Job` to fail saving to the database if its total size exceeds the document limit. .. note:: The job object is updated with the log but still requires to be pushed to database to actually persist it. 
@@ -668,22 +684,30 @@ def save_log(self, if isinstance(errors, WPSException): errors = [errors] elif isinstance(errors, Exception): - errors = str(errors) + errors = self._get_message(str(errors), size_limit=size_limit) if isinstance(errors, str): - log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress))] + log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress, size_limit=size_limit))] self.exceptions.append(errors) elif isinstance(errors, list): log_msg = [ - (ERROR, self._get_log_msg(self._get_err_msg(error), status=status, progress=progress)) + ( + ERROR, + self._get_log_msg( + self._get_err_msg(error, size_limit=size_limit), + status=status, + progress=progress, + size_limit=size_limit, + ) + ) for error in errors ] self.exceptions.extend([{ "Code": error.code, "Locator": error.locator, - "Text": error.text + "Text": self._get_message(error.text, size_limit=size_limit), } for error in errors]) else: - log_msg = [(level, self._get_log_msg(message, status=status, progress=progress))] + log_msg = [(level, self._get_log_msg(message, status=status, progress=progress, size_limit=size_limit))] for lvl, msg in log_msg: fmt_msg = get_log_fmt() % dict(asctime=now().strftime(get_log_date_fmt()), levelname=getLevelName(lvl), @@ -1574,7 +1598,7 @@ def image(self): def registry(self): # type: () -> str """ - Obtains the registry entry that must used for ``docker login {registry}``. + Obtains the registry entry that must be used for ``docker login {registry}``. """ return dict.__getitem__(self, "registry") @@ -1847,7 +1871,7 @@ def name(self): def tag(self): # type: () -> str """ - Full identifier including the version for an unique reference. + Full identifier including the version for a unique reference. 
""" proc_id = self.split_version(self.id)[0] # bw-compat, if no version available, no update was applied (single deploy) @@ -2088,7 +2112,7 @@ def deployment_profile(self): base = "http://www.opengis.net/profiles/eoc/" pkg = self.package or {} cls = str(pkg.get("class", "")).lower() - req = get_application_requirement(pkg).get("class") + req = get_application_requirement(pkg, required=False).get("class") typ = self.type if cls == ProcessType.WORKFLOW: @@ -2648,7 +2672,7 @@ def status(self, status): (value == QuoteStatus.SUBMITTED and prev != QuoteStatus.SUBMITTED) or (value == QuoteStatus.PROCESSING and prev == QuoteStatus.COMPLETED) ): - LOGGER.error("Cannot revert back to previous quote status (%s => %s)", value, self.status) + LOGGER.error("Cannot revert to previous quote status (%s => %s)", value, self.status) LOGGER.debug(traceback.extract_stack()) return self["status"] = value diff --git a/weaver/exceptions.py b/weaver/exceptions.py index e022c5511..18f03f37c 100644 --- a/weaver/exceptions.py +++ b/weaver/exceptions.py @@ -235,6 +235,15 @@ class PackageTypeError(HTTPUnprocessableEntity, PackageException): """ +class PackageParsingError(HTTPUnprocessableEntity, PackageException): + """ + Error related to parsing of a package definition. + + Error indicating that a :term:`CWL` package could not be properly parsed + according to expected structures or unresolved definitions. + """ + + class PackageRegistrationError(HTTPInternalServerError, OWSNoApplicableCode, PackageException): """ Error related to a registration issue for a package. 
diff --git a/weaver/processes/builtin/__init__.py b/weaver/processes/builtin/__init__.py index 82326b84a..d65df0301 100644 --- a/weaver/processes/builtin/__init__.py +++ b/weaver/processes/builtin/__init__.py @@ -135,7 +135,7 @@ def register_builtin_processes(container): for process_id, process_data in builtin_apps_mapping.items(): process_path = process_data["package"] process_desc = process_data["payload"] - process_info = get_process_definition(process_desc, package=None, reference=process_path) + process_info = get_process_definition(process_desc, package=None, reference=process_path, builtin=True) process_url = "/".join([restapi_url, "processes", process_id]) process_package = _get_builtin_package(process_id, process_info["package"]) process_abstract = _get_builtin_metadata(process_id, process_path, "__doc__", clean=True) diff --git a/weaver/processes/constants.py b/weaver/processes/constants.py index 447e502b7..9873b2a5e 100644 --- a/weaver/processes/constants.py +++ b/weaver/processes/constants.py @@ -1,4 +1,5 @@ import sys +from types import MappingProxyType from typing import TYPE_CHECKING from weaver.base import Constants @@ -44,7 +45,7 @@ class OpenSearchField(Constants): # CWL package (requirements/hints) corresponding to `ProcessType.APPLICATION` CWL_REQUIREMENT_APP_BUILTIN = "BuiltinRequirement" CWL_REQUIREMENT_APP_DOCKER = "DockerRequirement" -CWL_REQUIREMENT_APP_DOCKER_GPU = "DockerGpuRequirement" +CWL_REQUIREMENT_APP_DOCKER_GPU = "DockerGpuRequirement" # backward compatibility CWL_REQUIREMENT_APP_ESGF_CWT = "ESGF-CWTRequirement" CWL_REQUIREMENT_APP_OGC_API = "OGCAPIRequirement" CWL_REQUIREMENT_APP_WPS1 = "WPS1Requirement" @@ -52,10 +53,7 @@ class OpenSearchField(Constants): CWL_REQUIREMENT_APP_TYPES = frozenset([ CWL_REQUIREMENT_APP_BUILTIN, CWL_REQUIREMENT_APP_DOCKER, - # FIXME: properly support GPU execution - # - https://github.com/crim-ca/weaver/issues/104 - # - https://github.com/crim-ca/weaver/issues/138 - # CWL_REQUIREMENT_APP_DOCKER_GPU, + 
CWL_REQUIREMENT_APP_DOCKER_GPU, # backward compatibility, use 'DockerRequirement+cwltool:CUDARequirement' instead CWL_REQUIREMENT_APP_ESGF_CWT, CWL_REQUIREMENT_APP_OGC_API, CWL_REQUIREMENT_APP_WPS1, @@ -67,6 +65,7 @@ class OpenSearchField(Constants): CWL_REQUIREMENT_APP_LOCAL = frozenset([ CWL_REQUIREMENT_APP_BUILTIN, CWL_REQUIREMENT_APP_DOCKER, + CWL_REQUIREMENT_APP_DOCKER_GPU, ]) """ Set of :term:`CWL` requirements that correspond to local execution of an :term:`Application Package`. @@ -82,6 +81,19 @@ class OpenSearchField(Constants): Set of :term:`CWL` requirements that correspond to remote execution of an :term:`Application Package`. """ +CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS = MappingProxyType({ + # use older minimal version/capability to allow more chances to match any available GPU + # if this causes an issue for an actual application, it must provide it explicitly anyway + "cudaVersionMin": "10.0", + "cudaComputeCapability": "3.0", + # use minimum defaults, single GPU + "cudaDeviceCountMin": 1, + "cudaDeviceCountMax": 1, +}) +""" +Parameters employed by default for updating :data:`CWL_REQUIREMENT_APP_DOCKER_GPU` into :data:`CWL_REQUIREMENT_CUDA`. 
+""" + # FIXME: convert to 'Constants' class CWL_REQUIREMENT_CUDA = "cwltool:CUDARequirement" CWL_REQUIREMENT_ENV_VAR = "EnvVarRequirement" @@ -184,8 +196,11 @@ class ProcessSchema(Constants): CWL_REQUIREMENT_APP_ESGF_CWT, CWL_REQUIREMENT_APP_OGC_API, CWL_REQUIREMENT_APP_WPS1, + CWL_REQUIREMENT_CUDA, CWL_REQUIREMENT_ENV_VAR, CWL_REQUIREMENT_INIT_WORKDIR, + CWL_REQUIREMENT_INLINE_JAVASCRIPT, + CWL_REQUIREMENT_NETWORK_ACCESS, CWL_REQUIREMENT_RESOURCE, CWL_REQUIREMENT_SCATTER, ] diff --git a/weaver/processes/convert.py b/weaver/processes/convert.py index 21139d9e1..005c24168 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -398,8 +398,8 @@ def ows2json_output_data(output, process_description, container=None): if output.reference: json_output["reference"] = output.reference - # Handle special case where we have a reference to a json array containing dataset reference - # Avoid reference to reference by fetching directly the dataset references + # Handle special case where we have a reference to a json array containing dataset reference. + # Avoid reference to reference by fetching directly the dataset references. json_array = _get_multi_json_references(output, container) if json_array and all(str(ref).startswith("http") for ref in json_array): json_output["data"] = json_array @@ -2262,7 +2262,7 @@ def oas2json_io_file(io_info, io_href=null): def oas2json_io_measure(io_info): # type: (OpenAPISchemaObject) -> Union[JSON_IO_TypedInfo, Type[null]] """ - Convert an unit of measure (``UoM``) I/O definition by :term:`OpenAPI` schema into :term:`JSON` representation. + Convert a unit of measure (``UoM``) I/O definition by :term:`OpenAPI` schema into :term:`JSON` representation. This conversion projects an object (normally complex type) into a literal type, considering that other provided parameters are all metadata information. 
@@ -2739,8 +2739,8 @@ def wps2json_io(io_wps, forced_fields=False): for io_format in io_wps_json["formats"]: transform_json(io_format, rename=rename, replace_values=replace_values, replace_func=replace_func) - # set 'default' format if it matches perfectly, or if only mime-type matches and it is the only available one - # (this avoid 'encoding' possibly not matching due to CWL not providing this information) + # set 'default' format if it matches perfectly, or if only mime-type matches, and it is the only available one + # (this avoids 'encoding' possibly not matching due to CWL not providing this information) io_default = get_field(io_wps_json, "default", search_variations=True) for io_format in io_wps_json["formats"]: io_format["default"] = (io_default != null and is_equal_formats(io_format, io_default)) @@ -2890,7 +2890,7 @@ def _are_different_and_set(item1, item2): Compares two value representations and returns ``True`` only if both are not ``null``, are of same ``type`` and of different representative value. By "representative", we consider here the visual representation of byte/unicode strings rather than literal values to support XML/JSON and Python 2/3 implementations. - Other non string-like types are verified with literal (usual) equality method. + Other non-string-like types are verified with literal (usual) equality method. """ if item1 is null or item2 is null: return False @@ -2946,7 +2946,7 @@ def normalize_ordered_io(io_section, order_hints=None): of parsers. This is merely *cosmetic* adjustments to ease readability of I/O to avoid always shuffling their order across multiple :term:`Application Package` and :term:`Process` reporting formats. - The important result of this function is to provide the I/O as a consistent list of objects so it is less + The important result of this function is to provide the I/O as a consistent list of objects, so it is less cumbersome to compare/merge/iterate over the elements with all functions that will follow. .. 
note:: @@ -2956,7 +2956,7 @@ def normalize_ordered_io(io_section, order_hints=None): :param io_section: Definition contained under the ``inputs`` or ``outputs`` fields. :param order_hints: Optional/partial I/O definitions hinting an order to sort unsorted-dict I/O. - :returns: I/O specified as list of dictionary definitions with preserved order (as best as possible). + :returns: I/O specified as list of dictionary definitions with preserved order (as good as possible). """ if isinstance(io_section, list): return io_section diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index 4ee5c2980..b9c6b72de 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -21,7 +21,7 @@ import tempfile import time import uuid -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, overload from urllib.parse import parse_qsl, urlparse import colander @@ -49,6 +49,7 @@ PackageException, PackageExecutionError, PackageNotFound, + PackageParsingError, PackageRegistrationError, PackageTypeError, PayloadNotFound @@ -58,12 +59,15 @@ from weaver.processes.constants import ( CWL_REQUIREMENT_APP_BUILTIN, CWL_REQUIREMENT_APP_DOCKER, + CWL_REQUIREMENT_APP_DOCKER_GPU, CWL_REQUIREMENT_APP_ESGF_CWT, CWL_REQUIREMENT_APP_LOCAL, CWL_REQUIREMENT_APP_OGC_API, CWL_REQUIREMENT_APP_REMOTE, CWL_REQUIREMENT_APP_TYPES, CWL_REQUIREMENT_APP_WPS1, + CWL_REQUIREMENT_CUDA, + CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS, CWL_REQUIREMENT_ENV_VAR, CWL_REQUIREMENT_RESOURCE, CWL_REQUIREMENTS_SUPPORTED, @@ -100,6 +104,7 @@ fetch_directory, fetch_file, fully_qualified_name, + generate_diff, get_any_id, get_header, get_job_log_msg, @@ -108,6 +113,7 @@ get_sane_name, get_settings, list_directory_recursive, + null, request_extra, setup_loggers ) @@ -150,12 +156,12 @@ CWL_Requirement, CWL_RequirementsDict, CWL_RequirementNames, - CWL_RequirementsList, CWL_Results, CWL_ToolPathObject, CWL_WorkflowStepPackage, CWL_WorkflowStepPackageMap, 
CWL_WorkflowStepReference, + Default, JSON, Literal, Number, @@ -312,25 +318,108 @@ def _get_process_payload(process_url): def _get_package_type(package_dict): # type: (CWL) -> Literal[ProcessType.APPLICATION, ProcessType.WORKFLOW] - return ProcessType.WORKFLOW if package_dict.get("class").lower() == "workflow" else ProcessType.APPLICATION + return ProcessType.WORKFLOW if package_dict.get("class", "").lower() == "workflow" else ProcessType.APPLICATION -def _get_package_requirements_as_class_list(requirements): - # type: (CWL_AnyRequirements) -> CWL_RequirementsList +def _get_package_requirements_normalized(requirements, as_dict=False): + # type: (CWL_AnyRequirements, bool) -> CWL_AnyRequirements """ - Converts `CWL` package ``requirements`` or ``hints`` into list representation. + Converts :term:`CWL` package ``requirements`` or ``hints`` into :class:`list` or :class:`dict` representation. - Uniformization `CWL` requirements into the list representation, whether the input definitions where - provided using the dictionary definition as ``{"<req-class>": {<params>}}`` or - the list of dictionary requirements ``[{<req-class+params>}]`` each with a ``class`` key. + Uniformization of :term:`CWL` ``requirements`` or ``hints`` into the :class:`list` representation (default) + or as :class:`dict` if requested, whether the input definitions were provided using the dictionary definition + as ``{"<req-class>": {<params>}}`` or the list of dictionary requirements ``[{<req-class+params>}]`` + each with a ``class`` key.
""" if isinstance(requirements, dict): + if as_dict: + return {req: dict(params) for req, params in requirements.items()} # ensure literals instead of dict-like reqs = [] for req in requirements: reqs.append({"class": req}) reqs[-1].update(requirements[req] or {}) return reqs - return [dict(req) for req in requirements] # ensure list-of-dict instead of sequence of dict-like + reqs = [dict(req) for req in requirements] # ensure list-of-dict instead of sequence of dict-like + if as_dict: + return {req.pop("class"): req for req in reqs} + return reqs + + +def _update_package_compatibility(package): + # type: (CWL) -> CWL + """ + Update a :term:`CWL` package with backward compatibility changes if applicable. + """ + package_original = copy.deepcopy(package) + package_type = _get_package_type(package) + if package_type == ProcessType.APPLICATION: + docker_req = get_application_requirement(package, validate=False, required=False) + if docker_req["class"].endswith(CWL_REQUIREMENT_APP_DOCKER_GPU): + # backup original for later compare and find requirements of interest + # requirements unrelated to update must be preserved in same locations and formats to preserve behavior + r_original = package.get("requirements", {}) + h_original = package.get("hints", {}) + r_list = _get_package_requirements_normalized(r_original) + h_list = _get_package_requirements_normalized(h_original) + r_no_docker = list(filter(lambda _req: not _req["class"].endswith(CWL_REQUIREMENT_APP_DOCKER_GPU), r_list)) + h_no_docker = list(filter(lambda _req: not _req["class"].endswith(CWL_REQUIREMENT_APP_DOCKER_GPU), h_list)) + r_cuda = list(filter(lambda _req: _req["class"].endswith(CWL_REQUIREMENT_CUDA), r_list)) + h_cuda = list(filter(lambda _req: _req["class"].endswith(CWL_REQUIREMENT_CUDA), h_list)) + docker_req["class"] = CWL_REQUIREMENT_APP_DOCKER # GPU to official Docker requirement + cuda_req = r_cuda or h_cuda + cuda_found = bool(cuda_req) + if not cuda_req: # if CUDA no explicitly provided along 
the older GPU requirement, define default + cuda_req = CWL_REQUIREMENT_CUDA_DEFAULT_PARAMETERS.copy() + cuda_req["class"] = CWL_REQUIREMENT_CUDA + # apply the change to the relevant list where it was originally found + if r_list == r_no_docker and h_list != h_no_docker: + h_list = h_no_docker + ([docker_req] if cuda_found else [docker_req, cuda_req]) + elif r_list != r_no_docker and h_list == h_no_docker: + r_list = r_no_docker + ([docker_req] if cuda_found else [docker_req, cuda_req]) + else: + raise PackageParsingError( + f"Expected to find a unique '{CWL_REQUIREMENT_APP_DOCKER_GPU}' definition in CWL " + "requirements or hints, but could not resolve it between mapping and listing representations." + ) + # revert list conversion if necessary + r_list = _get_package_requirements_normalized(r_list, as_dict=isinstance(r_original, dict)) + h_list = _get_package_requirements_normalized(h_list, as_dict=isinstance(h_original, dict)) + if r_list: + package["requirements"] = r_list + if h_list: + package["hints"] = h_list + LOGGER.warning( + "CWL package definition updated using '%s' backward-compatibility definition.\n%s", + CWL_REQUIREMENT_APP_DOCKER_GPU, + generate_diff(package_original, package, val_name="Original CWL", ref_name="Updated CWL") + ) + return package + + +@overload +def _load_package_content(package_dict, # type: CWL + package_name=PACKAGE_DEFAULT_FILE_NAME, # type: str + data_source=None, # type: Optional[str] + only_dump_file=False, # type: Literal[True] + tmp_dir=None, # type: Optional[str] + loading_context=None, # type: Optional[LoadingContext] + runtime_context=None, # type: Optional[RuntimeContext] + process_offering=None, # type: Optional[JSON] + ): # type: (...) -> None + ... 
+ + +@overload +def _load_package_content(package_dict, # type: CWL + package_name=PACKAGE_DEFAULT_FILE_NAME, # type: str + data_source=None, # type: Optional[str] + only_dump_file=False, # type: Literal[False] + tmp_dir=None, # type: Optional[str] + loading_context=None, # type: Optional[LoadingContext] + runtime_context=None, # type: Optional[RuntimeContext] + process_offering=None, # type: Optional[JSON] + ): # type: (...) -> Tuple[CWLFactoryCallable, str, CWL_WorkflowStepPackageMap] + ... def _load_package_content(package_dict, # type: CWL @@ -343,34 +432,37 @@ def _load_package_content(package_dict, # type: CWL process_offering=None, # type: Optional[JSON] ): # type: (...) -> Optional[Tuple[CWLFactoryCallable, str, CWL_WorkflowStepPackageMap]] """ - Loads `CWL` package definition using various contextual resources. + Loads :term:`CWL` package definition using various contextual resources. Following operations are accomplished to validate the package: - - Starts by resolving any intermediate sub-packages steps if the parent package is a `Workflow` (CWL class), + - Starts by resolving any intermediate sub-packages steps if the parent package is a :term:`Workflow` in order to recursively generate and validate their process and package, potentially using remote reference. - Each of those operations are applied to every step. + Each of the following operations are applied to every step individually. - Package I/O are reordered using any reference process offering hints if provided to generate consistent results. + - Perform backward compatibility checks and conversions to the package if applicable. - The resulting package definition is dumped to a temporary JSON file, to validate the content can be serialized. - - Optionally, the `CWL` factory is employed to create the application runner, validating any provided loading and - runtime contexts, and considering all Workflow steps if applicable, or the single application otherwise. 
- - :param package_dict: package content representation as a json dictionary. - :param package_name: name to use to create the package file. - :param data_source: identifier of the data source to map to specific ADES, or map to localhost if ``None``. - :param only_dump_file: specify if the :class:`CWLFactoryCallable` should be validated and returned. - :param tmp_dir: location of the temporary directory to dump files (deleted on exit). - :param loading_context: cwltool context used to create the cwl package (required if ``only_dump_file=False``) - :param runtime_context: cwltool context used to execute the cwl package (required if ``only_dump_file=False``) - :param process_offering: JSON body of the process description payload (used as I/O hint ordering) + - Optionally, the :term:`CWL` factory is employed to create the application runner, validating any provided loading + and runtime contexts, and considering all resolved :term:`Workflow` steps if applicable, or the atomic application + otherwise. + + :param package_dict: Package content representation as a dictionary. + :param package_name: Name to use to create the package file and :term:`CWL` identifiers. + :param data_source: + Identifier of the :term:`Data Source` to map to specific :term:`ADES`, or map to ``localhost`` if ``None``. + :param only_dump_file: Specify if the :class:`CWLFactoryCallable` should be validated and returned. + :param tmp_dir: Location of the temporary directory to dump files (deleted on exit). + :param loading_context: :mod:`cwltool` context used to create the :term:`CWL` package. + :param runtime_context: :mod:`cwltool` context used to execute the :term:`CWL` package. + :param process_offering: :term:`JSON` body of the process description payload (used as I/O hint ordering). :returns: If :paramref:`only_dump_file` is ``True``, returns ``None``. 
- Otherwise, tuple of: + Otherwise, :class:`tuple` of: - Instance of :class:`CWLFactoryCallable` - Package type (:attr:`ProcessType.WORKFLOW` or :attr:`ProcessType.APPLICATION`) - - Package sub-steps definitions if package is of type :attr:`ProcessType.WORKFLOW`. Otherwise, empty mapping. - Mapping of each step name contains their respective package ID and definition that must be run. + - Package sub-steps definitions if package represents a :attr:`ProcessType.WORKFLOW`. Otherwise, empty mapping. + Mapping of each step name contains their respective package ID and :term:`CWL` definition that must be run. .. warning:: Specified :paramref:`tmp_dir` will be deleted on exit. @@ -380,6 +472,7 @@ def _load_package_content(package_dict, # type: CWL tmp_json_cwl = os.path.join(tmp_dir, package_name) # for workflows, retrieve each 'sub-package' file + package_dict = _update_package_compatibility(package_dict) package_type = _get_package_type(package_dict) workflow_steps = get_package_workflow_steps(package_dict) step_packages = {} @@ -552,6 +645,10 @@ def _generate_process_with_cwl_from_reference(reference, process_hint=None): The resulting :term:`Process` and its :term:`CWL` will correspond to a remote instance to which execution should be dispatched and monitored, except if the reference was directly a :term:`CWL` file. + .. warning:: + Only conversion of the reference into a potential :term:`CWL` definition is accomplished by this function. + Further validations must still be applied to ensure the loaded definition is valid and meets all requirements. + .. 
seealso:: - :class:`weaver.processes.ogc_api_process.OGCAPIRemoteProcess` - :class:`weaver.processes.wps1_process.Wps1Process` @@ -648,24 +745,36 @@ def _generate_process_with_cwl_from_reference(reference, process_hint=None): return cwl_package, process_info -def get_application_requirement(package, search=None, default=None, validate=True): - # type: (CWL, Optional[CWL_RequirementNames], Optional[Any], bool) -> Union[CWL_Requirement, Any] +def get_application_requirement(package, # type: CWL + search=None, # type: Optional[CWL_RequirementNames] + default=null, # type: Optional[Union[CWL_Requirement, Default]] + validate=True, # type: bool + required=True, # type: bool + ): # type: (...) -> Union[CWL_Requirement, Default] """ Retrieves a requirement or hint from the :term:`CWL` package definition. - If no filter is specified (default), retrieve the *principal* requirement that allows mapping to the appropriate - :term:`Process` implementation. Obtains the first item in :term:`CWL` package ``requirements`` or ``hints`` - that corresponds to a `Weaver`-specific application type as defined in :py:data:`CWL_REQUIREMENT_APP_TYPES`. - If a filter is provided, this specific requirement or hint is looked for instead. - Regardless of the applied filter, only a unique item can be matched across requirements/hints containers, and - within a same container in case of listing representation to avoid ambiguity. When requirements/hints validation - is enabled, all requirements must also be defined amongst :data:`CWL_REQUIREMENTS_SUPPORTED` for the :term:`CWL` - package to be considered valid. + If no :paramref:`search` filter is specified (default), retrieve the *principal* requirement that allows + mapping to the appropriate :term:`Process` implementation. The *principal* requirement can be extracted + for an :term:`Application Package` of type :data:`ProcessType.APPLICATION` because only one is permitted + simultaneously amongst :data:`CWL_REQUIREMENT_APP_TYPES`. 
If the :term:`CWL` is not of type + :data:`ProcessType.APPLICATION`, the requirement check is skipped regardless of :paramref:`required`. + + If a :paramref:`search` filter is provided, this specific requirement or hint is looked for instead. + Regardless of the applied filter, only a unique item can be matched across ``requirements``/``hints`` mapping + and/or listing representations. + + When :paramref:`validate` is enabled, all ``requirements`` and ``hints`` must also be defined + within :data:`CWL_REQUIREMENTS_SUPPORTED` for the :term:`CWL` package to be considered valid. + + No conversion of backward compatibility definitions is performed by this function. + See :func:`_update_package_compatibility` for that operation. :param package: CWL definition to parse. :param search: Specific requirement/hint name to search and retrieve the definition if available. :param default: Default value to return if no match was found. If ``None``, returns an empty ``{"class": ""}``. :param validate: Validate supported requirements/hints definition while extracting requested one. + :param required: Validation will fail if no supported requirements/hints definition could be found. :returns: dictionary that minimally has ``class`` field, and optionally other parameters from that requirement. 
""" # package can define requirements and/or hints, @@ -673,7 +782,7 @@ def get_application_requirement(package, search=None, default=None, validate=Tru # workflow can have multiple, but they are not explicitly handled reqs = package.get("requirements", {}) hints = package.get("hints", {}) - all_hints = _get_package_requirements_as_class_list(reqs) + _get_package_requirements_as_class_list(hints) + all_hints = _get_package_requirements_normalized(reqs) + _get_package_requirements_normalized(hints) if search: app_hints = list(filter(lambda h: h["class"] == search, all_hints)) else: @@ -681,15 +790,31 @@ def get_application_requirement(package, search=None, default=None, validate=Tru if len(app_hints) > 1: raise PackageTypeError( f"Package 'requirements' and/or 'hints' define too many conflicting values: {list(app_hints)}, " - f"only one permitted amongst {list(CWL_REQUIREMENT_APP_TYPES)}." + f"only one requirement is permitted amongst {list(CWL_REQUIREMENT_APP_TYPES)}." ) - req_default = default if default is not None else {"class": ""} + req_default = default if default is not null else {"class": ""} requirement = app_hints[0] if app_hints else req_default if validate: - cwl_supported_reqs = list(CWL_REQUIREMENTS_SUPPORTED) - if not all(item.get("class") in cwl_supported_reqs for item in all_hints): - raise PackageTypeError(f"Invalid requirement, the requirements supported are {cwl_supported_reqs}") + all_classes = sorted(list(set(item.get("class") for item in all_hints))) + app_required = _get_package_type(package) == ProcessType.APPLICATION + if required and app_required: + cwl_impl_type_reqs = sorted(list(CWL_REQUIREMENT_APP_TYPES)) + if not all_classes or not any(cls in cwl_impl_type_reqs for cls in all_classes): + raise PackageTypeError( + f"Invalid package requirement. One supported requirement amongst {cwl_impl_type_reqs} is expected. " + f"Detected package specification {all_classes} did not provide any of the mandatory requirements. 
" + f"If a script definition is intended for this application, the '{CWL_REQUIREMENT_APP_DOCKER}' " + "requirement can be used to provide a suitable execution environment with needed dependencies. " + f"Refer to '{sd.DOC_URL}/package.html#script-application' for examples." + ) + cwl_supported_reqs = sorted(list(CWL_REQUIREMENTS_SUPPORTED)) + cwl_invalid_reqs = sorted(filter(lambda cls: cls not in cwl_supported_reqs, all_classes)) + if cwl_invalid_reqs: + raise PackageTypeError( + f"Invalid package requirement. Unknown requirement detected: {cwl_invalid_reqs}. " + f"Expected requirements and hints must be amongst the following definitions {cwl_supported_reqs}." + ) return requirement @@ -786,8 +911,13 @@ def get_process_identifier(process_info, package): return process_id -def get_process_definition(process_offering, reference=None, package=None, data_source=None, headers=None): - # type: (JSON, Optional[str], Optional[CWL], Optional[str], Optional[AnyHeadersContainer]) -> JSON +def get_process_definition(process_offering, # type: JSON + reference=None, # type: Optional[str] + package=None, # type: Optional[CWL] + data_source=None, # type: Optional[str] + headers=None, # type: Optional[AnyHeadersContainer] + builtin=False, # type: bool + ): # type: (...) -> JSON """ Resolve the process definition considering corresponding metadata from the offering, package and references. @@ -795,11 +925,12 @@ def get_process_definition(process_offering, reference=None, package=None, data_ and a package definition passed by ``reference`` or ``package`` `CWL` content. The returned process information can be used later on to load an instance of :class:`weaver.wps_package.WpsPackage`. - :param process_offering: `WPS REST-API` (`WPS-3`) process offering as `JSON`. - :param reference: URL to `CWL` package definition, `WPS-1 DescribeProcess` endpoint or `WPS-3 Process` endpoint. - :param package: literal `CWL` package definition (`YAML` or `JSON` format). 
- :param data_source: where to resolve process IDs (default: localhost if ``None``). + :param process_offering: `WPS REST-API` (`WPS-3`) process offering as :term:`JSON`. + :param reference: URL to :term:`CWL` package, `WPS-1 DescribeProcess` endpoint or `WPS-3 Process` endpoint. + :param package: Literal :term:`CWL` package definition (`YAML` or `JSON` format). + :param data_source: Where to resolve process IDs (default: localhost if ``None``). :param headers: Request headers provided during deployment to retrieve details such as authentication tokens. + :param builtin: Indicate if the package is expected to be a :data:`CWL_REQUIREMENT_APP_BUILTIN` definition. :return: Updated process definition with resolved/merged information from ``package``/``reference``. """ @@ -825,7 +956,8 @@ def try_or_raise_package_error(call, reason): if reference: package, process_info = try_or_raise_package_error( lambda: _generate_process_with_cwl_from_reference(reference, process_info), - reason="Loading package from reference") + reason="Loading package from reference", + ) process_info.update(process_offering) # override upstream details if not isinstance(package, dict): raise PackageRegistrationError("Cannot decode process package contents.") @@ -835,29 +967,34 @@ def try_or_raise_package_error(call, reason): LOGGER.debug("Using data source: '%s'", data_source) package_factory, process_type, _ = try_or_raise_package_error( lambda: _load_package_content(package, data_source=data_source, process_offering=process_info), - reason="Loading package content") + reason="Loading package content", + ) package_inputs, package_outputs = try_or_raise_package_error( lambda: _get_package_inputs_outputs(package_factory), - reason="Definition of package/process inputs/outputs") + reason="Definition of package/process inputs/outputs", + ) process_inputs = process_info.get("inputs", []) process_outputs = process_info.get("outputs", []) try_or_raise_package_error( lambda: 
_update_package_metadata(process_info, package), - reason="Metadata update") + reason="Metadata update", + ) process_inputs, process_outputs = try_or_raise_package_error( lambda: _merge_package_inputs_outputs(process_inputs, package_inputs, process_outputs, package_outputs), - reason="Merging of inputs/outputs") + reason="Merging of inputs/outputs", + ) app_requirement = try_or_raise_package_error( - lambda: get_application_requirement(package), - reason="Validate requirements and hints") + lambda: get_application_requirement(package, validate=True, required=not builtin), + reason="Validate requirements and hints", + ) auth_requirements = try_or_raise_package_error( lambda: get_auth_requirements(app_requirement, headers), - reason="Obtaining authentication requirements" + reason="Obtaining authentication requirements", ) # obtain any retrieved process id if not already provided from upstream process offering, and clean it @@ -874,7 +1011,7 @@ def try_or_raise_package_error(call, reason): "type": process_type, "inputs": process_inputs, "outputs": process_outputs, - "auth": auth_requirements + "auth": auth_requirements, }) return process_offering @@ -1303,9 +1440,10 @@ def setup_runtime(self): # when process is a docker image, memory monitoring information is obtained with CID file # this file is only generated when the below command is explicitly None (not even when '') "user_space_docker_cmd": None, - # if 'ResourceRequirement' is specified to limit RAM usage, below must be added to ensure it is applied + # if 'ResourceRequirement' is specified to limit RAM/CPU usage, below must be added to ensure it is applied # but don't enable it otherwise, since some defaults are applied which could break existing processes "strict_memory_limit": bool(res_req), + "strict_cpu_limit": bool(res_req), } return runtime_params @@ -1328,7 +1466,7 @@ def update_requirements(self): if req_cls != CWL_REQUIREMENT_APP_DOCKER: continue # remove build-related parameters because we forbid 
this in our case - # remove output directory since we must explicitly defined it to match with WPS + # remove output directory since we must explicitly define it to match with WPS for req_rm in ["dockerFile", "dockerOutputDirectory"]: is_rm = req_def.pop(req_rm, None) if is_rm: @@ -2056,7 +2194,7 @@ def get_job_process_definition(self, job_name, job_order, tool): # noqa: E811 input_value is one of `input_object` or `array [input_object]` input_object is one of `string` or `dict {class: File, location: string}` in our case input are expected to be File object - :param tool: Whole `CWL` config including hints requirement + :param tool: Whole :term:`CWL` config including hints and requirements (see: :py:data:`weaver.processes.constants.CWL_REQUIREMENT_APP_TYPES`) """ diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 4f7f22278..addf6a980 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -1144,7 +1144,7 @@ def _apply_duration_filter(pipeline, min_duration, max_duration): # duration is not directly stored in the database (as it can change), it must be computed inplace duration_field = { "$addFields": { - "duration": { # becomes 'null' if cannot be computed (e.g.: not started) + "duration": { # becomes 'null' if it cannot be computed (e.g.: not started) "$dateDiff": { # compute the same way as Job.duration "startDate": "$started", diff --git a/weaver/typedefs.py b/weaver/typedefs.py index f6c0336e2..171377ff0 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -72,6 +72,7 @@ Path = Union[os.PathLike, str, bytes] + Default = TypeVar("Default") # used for return value that is employed from a provided default value Params = ParamSpec("Params") # use with 'Callable[Params, Return]', 'Params.args' and 'Params.kwargs' Return = TypeVar("Return") # alias to identify the same return value as a decorated/wrapped function AnyCallable = TypeVar("AnyCallable", bound=Callable[..., Any]) # callable used for decorated/wrapped functions @@ 
-167,11 +168,11 @@ # 'requirements' includes 'hints' CWL_Requirement = TypedDict("CWL_Requirement", { - "class": CWL_RequirementNames, # type: ignore + "class": Required[CWL_RequirementNames], "provider": NotRequired[str], "process": NotRequired[str], }, total=False) - CWL_RequirementsDict = Dict[CWL_RequirementNames, Dict[str, str]] # {'': {: }} + CWL_RequirementsDict = Dict[CWL_RequirementNames, Dict[str, ValueType]] # {'': {: }} CWL_RequirementsList = List[CWL_Requirement] # [{'class': , : }] CWL_AnyRequirements = Union[CWL_RequirementsDict, CWL_RequirementsList] CWL_Class = Literal["CommandLineTool", "ExpressionTool", "Workflow"] diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 60d44bc04..84ce952a3 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -14,6 +14,7 @@ """ # pylint: disable=C0103,invalid-name import datetime +import inspect import os import re from copy import copy @@ -40,6 +41,7 @@ CWL_REQUIREMENT_INIT_WORKDIR, CWL_REQUIREMENT_INLINE_JAVASCRIPT, CWL_REQUIREMENT_NETWORK_ACCESS, + CWL_REQUIREMENT_RESOURCE, OAS_COMPLEX_TYPES, OAS_DATA_TYPES, PACKAGE_ARRAY_BASE, @@ -2164,7 +2166,7 @@ class OWSContactInfo(ExtendedMappingSchema, OWSNamespace): class OWSServiceContact(ExtendedMappingSchema, OWSNamespace): name = "ServiceContact" individual = OWSString(name="IndividualName", title="OWSIndividualName", example="John Smith", missing=drop) - position = OWSString(name="PositionName", title="OWSPositionName", example="One Man Team", missing=drop) + position = OWSString(name="PositionName", title="OWSPositionName", example="One-Man Team", missing=drop) contact = OWSContactInfo(missing=drop, default={}) @@ -3144,7 +3146,7 @@ class JobStatusInfo(ExtendedMappingSchema): description="Timestamp when the job will be canceled if not yet completed.") estimatedCompletion = ExtendedSchemaNode(DateTime(), missing=drop) nextPoll = ExtendedSchemaNode(DateTime(), 
missing=drop, - description="Timestamp when the job will prompted for updated status details.") + description="Timestamp when the job will be prompted for updated status details.") percentCompleted = NumberType(example=0, validator=Range(min=0, max=100), description="Completion percentage of the job as indicated by the process.") progress = ExtendedSchemaNode(Integer(), example=100, validator=Range(0, 100), @@ -3457,7 +3459,7 @@ class ExecuteInputOutputs(ExtendedMappingSchema): "Defines which outputs to be obtained from the execution (filtered or all), " "as well as the reporting method for each output according to 'transmissionMode', " "the 'response' type, and the execution 'mode' provided " - "(see for more details: https://pavics-weaver.readthedocs.io/en/latest/processes.html#execution-body)." + f"(see for more details: {DOC_URL}/processes.html#execution-body)." ), default={} ) @@ -3477,7 +3479,7 @@ class Execute(ExecuteInputOutputs): description=( "Desired execution mode specified directly. This is intended for backward compatibility support. " "To obtain more control over execution mode selection, employ the official Prefer header instead " - "(see for more details: https://pavics-weaver.readthedocs.io/en/latest/processes.html#execution-mode)." + f"(see for more details: {DOC_URL}/processes.html#execution-mode)." ), validator=OneOf(ExecuteMode.values()) ) @@ -3486,7 +3488,7 @@ class Execute(ExecuteInputOutputs): default=ExecuteResponse.DOCUMENT, description=( "Indicates the desired representation format of the response. " - "(see for more details: https://pavics-weaver.readthedocs.io/en/latest/processes.html#execution-body)." + f"(see for more details: {DOC_URL}/processes.html#execution-body)." 
), validator=OneOf(ExecuteResponse.values()) ) @@ -3617,6 +3619,15 @@ class CWLClass(ExtendedSchemaNode): ) +class CWLExpression(ExtendedSchemaNode): + # https://www.commonwl.org/v1.2/CommandLineTool.html#Expression + schema_type = String + description = ( + f"When combined with '{CWL_REQUIREMENT_INLINE_JAVASCRIPT}', " + "this field allows runtime parameter references." + ) + + class RequirementClass(ExtendedSchemaNode): # in this case it is ok to use 'name' because target fields receiving it will # never be able to be named 'class' because of Python reserved keyword @@ -3626,47 +3637,84 @@ class RequirementClass(ExtendedSchemaNode): description = "CWL requirement class specification." -class CudaRequirementSpecification(PermissiveMappingSchema): +class CUDAComputeCapability(ExtendedSchemaNode): + schema_type = String + example = "3.0" + title = "CUDA compute capability" + description = "The compute capability supported by the GPU hardware." + validator = SemanticVersion(regex=r"^\d+\.\d+$") + + +class CUDAComputeCapabilityArray(ExtendedSequenceSchema): + item = CUDAComputeCapability() + validator = Length(min=1) + + +class CUDAComputeCapabilitySchema(OneOfKeywordSchema): + # https://github.com/common-workflow-language/cwltool/blob/67a180/cwltool/extensions.yml#L178 + title = CUDAComputeCapability.title + description = inspect.cleandoc(""" + The compute capability supported by the GPU hardware. + + * If this is a single value, it defines only the minimum + compute capability. GPUs with higher capability are also + accepted. + * If it is an array value, then only select GPUs with compute + capabilities that explicitly appear in the array. + + See https://docs.nvidia.com/deploy/cuda-compatibility/#faq and + https://docs.nvidia.com/cuda/cuda-c-best-practices-guide/index.html#cuda-compute-capability for details. 
+ """) + _one_of = [ + CUDAComputeCapability, + CUDAComputeCapabilityArray, + ] + + +class CUDARequirementSpecification(PermissiveMappingSchema): + # https://github.com/common-workflow-language/cwltool/blob/67a180/cwltool/extensions.yml#L178 cudaVersionMin = ExtendedSchemaNode( String(), example="11.4", - title="Cuda version minimum", - description="The minimum Cuda version required.", - validator=SemanticVersion(regex=r"^\d+\.\d+$") - ) - cudaComputeCapability = ExtendedSchemaNode( - String(), - example="3.0", - title="Cuda compute capability", - description="The compute capability supported by the GPU.", - validator=SemanticVersion(regex=r"^\d+\.\d+$") + title="CUDA version minimum", + description=inspect.cleandoc(""" + The minimum CUDA version required to run the software. This corresponds to a CUDA SDK release. + + When run in a container, the container image should provide the CUDA runtime, and the host + driver is injected into the container. In this case, because CUDA drivers are backwards compatible, + it is possible to use an older SDK with a newer driver across major versions. + + See https://docs.nvidia.com/deploy/cuda-compatibility/ for details. + """), + validator=SemanticVersion(regex=r"^\d+\.\d+$"), ) + cudaComputeCapability = CUDAComputeCapabilitySchema() cudaDeviceCountMin = ExtendedSchemaNode( Integer(), example=1, default=1, validator=Range(min=1), - title="Cuda device count minimum", - description="The minimum amount of devices required." + title="CUDA device count minimum", + description="The minimum amount of devices required.", ) cudaDeviceCountMax = ExtendedSchemaNode( Integer(), example=8, default=1, validator=Range(min=1), - title="Cuda device count maximum", - description="The maximum amount of devices required." 
+ title="CUDA device count maximum", + description="The maximum amount of devices required.", ) -class CudaRequirementMap(ExtendedMappingSchema): - CudaRequirement = CudaRequirementSpecification( +class CUDARequirementMap(ExtendedMappingSchema): + CUDARequirement = CUDARequirementSpecification( name=CWL_REQUIREMENT_CUDA, - title=CWL_REQUIREMENT_CUDA + title=CWL_REQUIREMENT_CUDA, ) -class CudaRequirementClass(CudaRequirementSpecification): +class CUDARequirementClass(CUDARequirementSpecification): _class = RequirementClass(example=CWL_REQUIREMENT_CUDA, validator=OneOf([CWL_REQUIREMENT_CUDA])) @@ -3675,14 +3723,14 @@ class NetworkAccessRequirementSpecification(PermissiveMappingSchema): Boolean(), example=True, title="Network Access", - description="Indicate whether a process requires outgoing IPv4/IPv6 network access." + description="Indicate whether a process requires outgoing IPv4/IPv6 network access.", ) class NetworkAccessRequirementMap(ExtendedMappingSchema): NetworkAccessRequirement = NetworkAccessRequirementSpecification( name=CWL_REQUIREMENT_NETWORK_ACCESS, - title=CWL_REQUIREMENT_NETWORK_ACCESS + title=CWL_REQUIREMENT_NETWORK_ACCESS, ) @@ -3690,6 +3738,123 @@ class NetworkAccessRequirementClass(NetworkAccessRequirementSpecification): _class = RequirementClass(example=CWL_REQUIREMENT_NETWORK_ACCESS, validator=OneOf([CWL_REQUIREMENT_NETWORK_ACCESS])) +class ResourceRequirementValue(OneOfKeywordSchema): + _one_of = [ + ExtendedSchemaNode(Float(), validator=BoundedRange(min=0.0, exclusive_min=True)), + ExtendedSchemaNode(Integer(), validator=Range(min=1)), + CWLExpression, + ] + + +class ResourceRequirementSpecification(PermissiveMappingSchema): + # descriptions extracted from: https://www.commonwl.org/v1.2/CommandLineTool.html#ResourceRequirement + coresMin = ResourceRequirementValue( + missing=drop, + default=1, + title="Minimum reserved number of CPU cores.", + description=inspect.cleandoc(""" + Minimum reserved number of CPU cores. 
+ + May be a fractional value to indicate to a scheduling algorithm that one core can be allocated + to multiple jobs. For example, a value of 0.25 indicates that up to 4 jobs may run in parallel + on 1 core. A value of 1.25 means that up to 3 jobs can run on a 4 core system (4/1.25 ≈ 3). + + Processes can only share a core allocation if the sum of each of their 'ramMax', 'tmpdirMax', + and 'outdirMax' requests also do not exceed the capacity of the node. + + Processes sharing a core must have the same level of isolation (typically a container or VM) + that they would normally have. + + The reported number of CPU cores reserved for the process, which is available to expressions on the + 'CommandLineTool' as 'runtime.cores', must be a non-zero integer, and may be calculated by rounding up + the cores request to the next whole number. + + Scheduling systems may allocate fractional CPU resources by setting quotas or scheduling weights. + Scheduling systems that do not support fractional CPUs may round up the request to the next whole number. + """), + ) + coresMax = ResourceRequirementValue( + missing=drop, + title="Maximum reserved number of CPU cores.", + description=( + "Maximum reserved number of CPU cores. " + "See 'coresMin' for discussion about fractional CPU requests." + ), + ) + ramMin = ResourceRequirementValue( + missing=drop, + default=256, + title="Minimum reserved RAM in mebibytes.", + description=inspect.cleandoc(""" + Minimum reserved RAM in mebibytes (2**20). + + May be a fractional value. If so, the actual RAM request must be rounded up to the next whole number. + The reported amount of RAM reserved for the process, which is available to expressions on the + 'CommandLineTool' as 'runtime.ram', must be a non-zero integer. + """), + ) + ramMax = ResourceRequirementValue( + missing=drop, + title="Maximum reserved RAM in mebibytes.", + description=( + "Maximum reserved RAM in mebibytes (2**20). 
" + "See 'ramMin' for discussion about fractional RAM requests." + ), + ) + tmpdirMin = ResourceRequirementValue( + missing=drop, + default=1024, + title="Minimum reserved filesystem based storage for the designated temporary directory in mebibytes.", + description=inspect.cleandoc(""" + Minimum reserved filesystem based storage for the designated temporary directory in mebibytes (2**20). + + May be a fractional value. If so, the actual storage request must be rounded up to the next whole number. + The reported amount of storage reserved for the process, which is available to expressions on the + 'CommandLineTool' as 'runtime.tmpdirSize', must be a non-zero integer. + """), + ) + tmpdirMax = ResourceRequirementValue( + missing=drop, + title="Maximum reserved filesystem based storage for the designated temporary directory in mebibytes.", + description=( + "Maximum reserved filesystem based storage for the designated temporary directory in mebibytes (2**20). " + "See 'tmpdirMin' for discussion about fractional storage requests." + ), + ) + outdirMin = ResourceRequirementValue( + missing=drop, + default=1024, + title="Minimum reserved filesystem based storage for the designated output directory in mebibytes.", + description=inspect.cleandoc(""" + Minimum reserved filesystem based storage for the designated output directory in mebibytes (2**20). + + May be a fractional value. If so, the actual storage request must be rounded up to the next whole number. + The reported amount of storage reserved for the process, which is available to expressions on the + 'CommandLineTool' as runtime.outdirSize, must be a non-zero integer. + """), + ) + outdirMax = ResourceRequirementValue( + missing=drop, + default=1, + title="Maximum reserved filesystem based storage for the designated output directory in mebibytes.", + description=( + "Maximum reserved filesystem based storage for the designated output directory in mebibytes (2**20). 
" + "See 'outdirMin' for discussion about fractional storage requests." + ), + ) + + +class ResourceRequirementMap(ExtendedMappingSchema): + ResourceRequirement = ResourceRequirementSpecification( + name=CWL_REQUIREMENT_RESOURCE, + title=CWL_REQUIREMENT_RESOURCE, + ) + + +class ResourceRequirementClass(ResourceRequirementSpecification): + _class = RequirementClass(example=CWL_REQUIREMENT_RESOURCE, validator=OneOf([CWL_REQUIREMENT_RESOURCE])) + + class DockerRequirementSpecification(PermissiveMappingSchema): dockerPull = ExtendedSchemaNode( String(), @@ -3711,17 +3876,24 @@ class DockerRequirementClass(DockerRequirementSpecification): class DockerGpuRequirementSpecification(DockerRequirementSpecification): + deprecated = True description = ( "Docker requirement with GPU-enabled support (https://github.com/NVIDIA/nvidia-docker). " - "The instance must have the NVIDIA toolkit installed to use this feature." + "The instance must have the NVIDIA toolkit installed to use this feature. " + "\nWARNING:\n" + "This requirement is specific to Weaver and is preserved only for backward compatibility. " + f"Prefer the combined use of official '{CWL_REQUIREMENT_APP_DOCKER}' and '{CWL_REQUIREMENT_CUDA}' " + "for better support of GPU capabilities and portability to other CWL-supported platforms." 
) class DockerGpuRequirementMap(ExtendedMappingSchema): + deprecated = True req = DockerGpuRequirementSpecification(name=CWL_REQUIREMENT_APP_DOCKER_GPU) class DockerGpuRequirementClass(DockerGpuRequirementSpecification): + deprecated = True title = CWL_REQUIREMENT_APP_DOCKER_GPU _class = RequirementClass(example=CWL_REQUIREMENT_APP_DOCKER_GPU, validator=OneOf([CWL_REQUIREMENT_APP_DOCKER_GPU])) @@ -3848,6 +4020,10 @@ class WPS1RequirementClass(WPS1RequirementSpecification): _class = RequirementClass(example=CWL_REQUIREMENT_APP_WPS1, validator=OneOf([CWL_REQUIREMENT_APP_WPS1])) +class UnknownRequirementMap(PermissiveMappingSchema): + description = "Generic schema to allow alternative CWL requirements/hints not explicitly defined in schemas." + + class UnknownRequirementClass(PermissiveMappingSchema): _class = RequirementClass(example="UnknownRequirement") @@ -3859,7 +4035,8 @@ class CWLRequirementsMap(AnyOfKeywordSchema): InitialWorkDirRequirementMap(missing=drop), InlineJavascriptRequirementMap(missing=drop), NetworkAccessRequirementMap(missing=drop), - PermissiveMappingSchema(missing=drop), + ResourceRequirementMap(missing=drop), + UnknownRequirementMap(missing=drop), # allows anything, must be last ] @@ -3873,6 +4050,7 @@ class CWLRequirementsItem(OneOfKeywordSchema): InitialWorkDirRequirementClass(missing=drop), InlineJavascriptRequirementClass(missing=drop), NetworkAccessRequirementClass(missing=drop), + ResourceRequirementClass(missing=drop), UnknownRequirementClass(missing=drop), # allows anything, must be last ] @@ -3891,15 +4069,17 @@ class CWLRequirements(OneOfKeywordSchema): class CWLHintsMap(AnyOfKeywordSchema, PermissiveMappingSchema): _any_of = [ BuiltinRequirementMap(missing=drop), - CudaRequirementMap(missing=drop), + CUDARequirementMap(missing=drop), DockerRequirementMap(missing=drop), DockerGpuRequirementMap(missing=drop), InitialWorkDirRequirementMap(missing=drop), InlineJavascriptRequirementMap(missing=drop), 
NetworkAccessRequirementMap(missing=drop), + ResourceRequirementMap(missing=drop), ESGF_CWT_RequirementMap(missing=drop), OGCAPIRequirementMap(missing=drop), WPS1RequirementMap(missing=drop), + UnknownRequirementMap(missing=drop), # allows anything, must be last ] @@ -3909,12 +4089,13 @@ class CWLHintsItem(OneOfKeywordSchema, PermissiveMappingSchema): discriminator = "class" _one_of = [ BuiltinRequirementClass(missing=drop), - CudaRequirementClass(missing=drop), + CUDARequirementClass(missing=drop), DockerRequirementClass(missing=drop), DockerGpuRequirementClass(missing=drop), InitialWorkDirRequirementClass(missing=drop), InlineJavascriptRequirementClass(missing=drop), NetworkAccessRequirementClass(missing=drop), + ResourceRequirementClass(missing=drop), ESGF_CWT_RequirementClass(missing=drop), OGCAPIRequirementClass(missing=drop), WPS1RequirementClass(missing=drop), @@ -4046,7 +4227,7 @@ class CWLInputsDefinition(OneOfKeywordSchema): class OutputBinding(PermissiveMappingSchema): glob = ExtendedSchemaNode(String(), missing=drop, - description="Glob pattern the will find the output on disk or mounted docker volume.") + description="Glob pattern to find the output on disk or mounted docker volume.") class CWLOutputObject(PermissiveMappingSchema): @@ -4477,6 +4658,7 @@ class ExecutionUnitList(ExtendedSequenceSchema): title="ExecutionUnit", description="Definition of the Application Package to execute." 
) + validator = Length(min=1, max=1) class ProcessDeploymentWithContext(ProcessDeployment): @@ -4822,7 +5004,7 @@ class PostProcessJobsEndpointXML(LocalProcessPath): body = WPSExecutePost( # very important to override 'name' in this case # original schema uses it to specify the XML class name - # in this context, it is used to defined the 'in' location of this schema to form 'requestBody' in OpenAPI + # in this context, it is used to define the 'in' location of this schema to form 'requestBody' in OpenAPI name="body", examples={ "ExecuteXML": { @@ -5114,7 +5296,7 @@ class BadRequestResponseSchema(ExtendedMappingSchema): class ConflictRequestResponseSchema(ExtendedMappingSchema): - description = "Conflict between the affected entity an another existing definition." + description = "Conflict between the affected entity and another existing definition." header = ResponseHeaders() body = ErrorJsonResponseBodySchema()