From 48377455891cb309e275bb0453cdc01a045feacb Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 11 May 2020 14:09:35 -0400 Subject: [PATCH] better handling of WPS-REST JSON multiple-output file (relates to #25) --- CHANGES.rst | 15 ++++++ weaver/processes/utils.py | 57 +++++++++++++++-------- weaver/wps_restapi/processes/processes.py | 3 +- 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index f0edcb860..ce7ade163 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,6 +4,21 @@ Changes `Unreleased `_ (latest) ======================================================================== +Changes: +-------- + +- Improve handling of `WPS-REST` HREF output matching a JSON file with array of URL references corresponding to process + pseudo multiple-output format (relates to `#25 `_). The output is now + expanded to contain both the original JSON file reference and the extended contained URL references so that either + variation can be retrieved as result. + +Fixes: +------ + +- Fix handling of WPS-REST output matching a JSON file for multiple-output format specified with a relative local path + as specified by job output location. Only remote HTTP references were correctly parsed. Also avoid failing the job if + the reference JSON parsing fails. It will simply return the original reference URL in this case without expanded data. 
+ `1.6.0 `_ (2020-05-07) ======================================================================== diff --git a/weaver/processes/utils.py b/weaver/processes/utils.py index edb0bd58f..6a81dfd40 100644 --- a/weaver/processes/utils.py +++ b/weaver/processes/utils.py @@ -1,5 +1,6 @@ import json import logging +import os import warnings from copy import deepcopy from distutils.version import LooseVersion @@ -44,6 +45,7 @@ from weaver.processes.types import PROCESS_APPLICATION, PROCESS_WORKFLOW from weaver.store.base import StoreProcesses from weaver.utils import get_sane_name, get_settings, get_url_without_query +from weaver.wps import get_wps_output_dir from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.utils import get_wps_restapi_base_url @@ -84,29 +86,42 @@ def _read_reference(url): return None -def _get_multi_json_references(output): - # type: (owslib.wps.Output) -> Optional[List[JSON]] +def _get_multi_json_references(output, container): + # type: (owslib.wps.Output, Optional[AnySettingsContainer]) -> Optional[List[JSON]] """ Since WPS standard does not allow to return multiple values for a single output, a lot of process actually return a json array containing references to these outputs. - This function goal is to detect this particular format. - :return: An array of HTTP(S) references if the specified output is effectively a JSON containing that, - ``None`` otherwise. - """ + Because the multi-output references are contained within this JSON file, it is not very convenient to retrieve + the list of URLs as one always needs to open and read the file to get them. This function's goal is to detect this + particular format and expand the references to make them quickly available in the job output response. - # Check for the json datatype and mimetype + :return: + Array of HTTP(S) references if the specified output is effectively a JSON containing that, ``None`` otherwise. 
+ """ + # Check for the json datatype and mime-type if output.dataType == WPS_COMPLEX_DATA and output.mimeType == CONTENT_TYPE_APP_JSON: + try: + # If the json data is referenced read its content + if output.reference: + out_ref = output.reference + if container: + if out_ref.startswith("file://"): + out_ref = out_ref[7:] + if out_ref.startswith("/"): + wps_out_dir = get_wps_output_dir(container) + out_ref = os.path.join(wps_out_dir, out_ref) + if not os.path.isfile(out_ref): + out_ref = output.reference + json_data_str = _read_reference(out_ref) + # Else get the data directly + else: + json_data_str = _get_data(output) - # If the json data is referenced read it's content - if output.reference: - json_data_str = _read_reference(output.reference) - # Else get the data directly - else: - json_data_str = _get_data(output) - - # Load the actual json dict - json_data = json.loads(json_data_str) + # Load the actual json dict + json_data = json.loads(json_data_str) + except Exception: + return None if isinstance(json_data, list): for data_value in json_data: @@ -122,10 +137,14 @@ def map_progress(progress, range_min, range_max): return max(range_min, min(range_max, range_min + (progress * (range_max - range_min)) / 100)) -def jsonify_output(output, process_description): - # type: (owslib.wps.Output, owslib.wps.Process) -> JSON +def jsonify_output(output, process_description, container=None): + # type: (owslib.wps.Output, owslib.wps.Process, Optional[AnySettingsContainer]) -> JSON """ Utility method to jsonify an output element from a WPS1 process description. + + In the case that a reference JSON output is specified and that it refers to a file that contains an array list of + URL references to simulate a multiple-output, this specific output gets expanded to contain both the original + URL ``reference`` field and the loaded URL list under ``data`` field for easier access from the response body. 
""" if not output.dataType: @@ -144,7 +163,7 @@ def jsonify_output(output, process_description): # Handle special case where we have a reference to a json array containing dataset reference # Avoid reference to reference by fetching directly the dataset references - json_array = _get_multi_json_references(output) + json_array = _get_multi_json_references(output, container) if json_array and all(str(ref).startswith("http") for ref in json_array): json_output["data"] = json_array else: diff --git a/weaver/wps_restapi/processes/processes.py b/weaver/wps_restapi/processes/processes.py index 108e205b6..a5d962a2e 100644 --- a/weaver/wps_restapi/processes/processes.py +++ b/weaver/wps_restapi/processes/processes.py @@ -201,7 +201,8 @@ def execute_process(self, job_id, url, headers=None, notification_email=None): job.status_message = "Job succeeded{}.".format(msg_progress) wps_package.retrieve_package_job_log(execution, job) job.save_log(logger=task_logger) - job_results = [jsonify_output(output, process) for output in execution.processOutputs] + job_results = [jsonify_output(output, process, settings) + for output in execution.processOutputs] job.results = make_results_relative(job_results, settings) else: task_logger.debug("Job failed.")