diff --git a/CHANGES.rst b/CHANGES.rst index a739cafc0..68ddbc669 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,8 @@ Changes: -------- - Add support of YAML format for loading ``weaver.data_sources`` definition. - Pre-install ``Docker`` CLI in ``worker`` image to avoid bad practice of mounting it from the host. +- Adjust WPS request dispatching such that process jobs get executed by ``Celery`` worker as intended + (see `#21 `_ and `#126 `_). Fixes: ------ diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 6cd11c3fe..e92b44f86 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -10,6 +10,7 @@ import contextlib import logging import os +import tempfile import time import unittest from copy import deepcopy @@ -47,7 +48,7 @@ IANA_NAMESPACE, get_cwl_file_format ) -from weaver.processes.constants import CWL_REQUIREMENT_APP_BUILTIN +from weaver.processes.constants import CWL_REQUIREMENT_APP_BUILTIN, CWL_REQUIREMENT_APP_DOCKER from weaver.status import STATUS_RUNNING, STATUS_SUCCEEDED from weaver.utils import get_any_value from weaver.visibility import VISIBILITY_PUBLIC @@ -1502,3 +1503,117 @@ def test_execute_application_package_process_with_bucket(self): assert not os.path.exists(os.path.join(wps_outdir, job_id, out_file)) assert not os.path.exists(os.path.join(wps_outdir, wps_uuid, out_file)) assert os.path.isfile(os.path.join(wps_outdir, "{}.xml".format(job_id))) + + +@pytest.mark.functional +class WpsPackageDockerAppTest(WpsPackageConfigBase): + @classmethod + def setUpClass(cls): + cls.settings = { + "weaver.wps": True, + "weaver.wps_output": True, + "weaver.wps_output_path": "/wpsoutputs", + "weaver.wps_output_dir": "/tmp", # nosec: B108 # don't care hardcoded for test + "weaver.wps_path": "/ows/wps", + "weaver.wps_restapi_path": "/", + } + super(WpsPackageDockerAppTest, cls).setUpClass() + + def test_execute_application_package_process_docker(self): + """ + Test validates that basic Docker application runs successfully, fetching the reference as needed. + + + """ + out_key = "output" + out_file = "output.txt" + cwl = { + "cwlVersion": "v1.0", + "class": "CommandLineTool", + "baseCommand": "cat", + "requirements": { + CWL_REQUIREMENT_APP_DOCKER: { + "dockerPull": "debian:stretch-slim" + } + }, + "inputs": [ + {"id": "file", "type": "File", "inputBinding": {"position": 1}}, + ], + "outputs": [ + {"id": out_key, "type": "File", "outputBinding": {"glob": out_file}}, + ] + } + body = { + "processDescription": { + "process": {"id": self._testMethodName} + }, + "deploymentProfileName": "http://www.opengis.net/profiles/eoc/dockerizedApplication", + "executionUnit": [{"unit": cwl}], + } + self.deploy_process(body) + + test_content = "Test file in Docker" + with contextlib.ExitStack() as stack_proc: + # setup + dir_name = tempfile.gettempdir() + tmp_file = stack_proc.enter_context(tempfile.NamedTemporaryFile(dir=dir_name, mode="w", suffix=".txt")) + tmp_file.write(test_content) + tmp_file.seek(0) + exec_body = { + "mode": EXECUTE_MODE_ASYNC, + "response": EXECUTE_RESPONSE_DOCUMENT, + "inputs": [ + {"id": "file", "href": tmp_file.name}, + ], + "outputs": [ + {"id": out_key, "transmissionMode": "reference"}, + ] + } + for process in mocked_execute_process(): + stack_proc.enter_context(process) + + # execute + proc_url = "/processes/{}/jobs".format(self._testMethodName) + resp = mocked_sub_requests(self.app, "post_json", proc_url, + params=exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], "Failed with: [{}]\nReason:\n{}".format(resp.status_code, resp.json) + status_url = resp.json["location"] + job_id = resp.json["jobID"] + + # job monitoring + monitor_timeout = 60 + time.sleep(1) # small delay to ensure process started + while monitor_timeout >= 0: + resp = self.app.get(status_url, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.json["status"] in [STATUS_RUNNING, STATUS_SUCCEEDED] + if resp.json["status"] == STATUS_SUCCEEDED: + break + time.sleep(1) + assert resp.json["status"] == STATUS_SUCCEEDED + resp = self.app.get("{}/result".format(status_url), headers=self.json_headers) + assert resp.status_code == 200 + + # check that output is HTTP reference to file + output_values = {out["id"]: get_any_value(out) for out in resp.json["outputs"]} + assert len(output_values) == 1 + wps_uuid = self.job_store.fetch_by_id(job_id).wps_id + wps_out_ref = "localhost/{}/{}".format(self.settings["weaver.wps_output_path"], wps_uuid) + wps_output = "{}/{}".format(wps_out_ref, wps_uuid, out_file) + assert output_values[out_key] == wps_output + + # check that actual output file was created in expected location along with XML job status + wps_outdir = self.settings["weaver.wps_output_dir"] + wps_out_file = os.path.join(wps_outdir, job_id, out_file) + assert not os.path.exists(os.path.join(wps_outdir, out_file)), "File must be in job subdir, not wps out dir." + # job log, XML status and output directory can be retrieved with both Job UUID and underlying WPS UUID reference + assert os.path.isfile(os.path.join(wps_outdir, "{}.log".format(wps_uuid))) + assert os.path.isfile(os.path.join(wps_outdir, "{}.xml".format(wps_uuid))) + assert os.path.isfile(os.path.join(wps_outdir, wps_uuid, out_file)) + assert os.path.isfile(os.path.join(wps_outdir, "{}.log".format(job_id))) + assert os.path.isfile(os.path.join(wps_outdir, "{}.xml".format(job_id))) + assert os.path.isfile(wps_out_file) + + # validate content + with open(wps_out_file) as res_file: + assert res_file.read() == test_content diff --git a/tests/utils.py b/tests/utils.py index a970cb5d6..6ae98a83a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -360,8 +360,8 @@ def id(self): task = MockTask() - def mock_execute_process(job_id, url, headers, notification_email): - execute_process(job_id, url, headers, notification_email) + def mock_execute_process(job_id, url, headers): + execute_process(job_id, url, headers) return task return ( diff --git a/weaver/datatype.py b/weaver/datatype.py index 603f9e044..acd8ee9a4 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING import six +import lxml.etree from dateutil.parser import parse as dt_parse # noqa from owslib.wps import WPSException from pywps import Process as ProcessWPS @@ -545,25 +546,29 @@ def access(self, visibility): @property def request(self): # type: () -> Optional[AnyStr] - """XML request for WPS execution submission as string.""" + """XML request for WPS execution submission as string (binary).""" return self.get("request", None) @request.setter def request(self, request): # type: (Optional[AnyStr]) -> None - """XML request for WPS execution submission as string.""" + """XML request for WPS execution submission as string (binary).""" + if isinstance(request, lxml.etree._Element): # noqa + request = lxml.etree.tostring(request) self["request"] = request @property def response(self): # type: () -> Optional[AnyStr] - """XML status response from WPS execution submission as string.""" + """XML status response from WPS execution submission as string (binary).""" return self.get("response", None) @response.setter def response(self, response): # type: (Optional[AnyStr]) -> None - """XML status response from WPS execution submission as string.""" + """XML status response from WPS execution submission as string (binary).""" + if isinstance(response, lxml.etree._Element): # noqa + response = lxml.etree.tostring(response) self["response"] = response def _job_url(self, settings): diff --git a/weaver/processes/wps1_process.py b/weaver/processes/wps1_process.py index 85984ae8f..cdce08913 100644 --- a/weaver/processes/wps1_process.py +++ b/weaver/processes/wps1_process.py @@ -125,7 +125,7 @@ def execute(self, workflow_inputs, out_dir, expected_outputs): if num_retries >= max_retries: raise Exception("Could not read status document after {} retries. Giving up.".format(max_retries)) try: - execution = check_wps_status(url=execution.statusLocation, verify=self.verify, + execution = check_wps_status(location=execution.statusLocation, verify=self.verify, sleep_secs=wait_secs(run_step)) job_id = execution.statusLocation.replace(".xml", "").split("/")[-1] LOGGER.debug(get_log_monitor_msg(job_id, status.map_status(execution.getStatus()), diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index 2537aa105..523efd071 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -1953,6 +1953,12 @@ def update_status(self, message, progress, status): pywps_status = map_status(status, STATUS_COMPLIANT_PYWPS) pywps_status_id = STATUS_PYWPS_IDS[pywps_status] + # NOTE: + # When running process in sync (because executed within celery worker already async), + # pywps reverts status file output flag. Re-enforce it for our needs. + # (see: 'wevaer.wps.WorkerService.execute_job') + self.response.store_status_file = True + # pywps overrides 'status' by 'accepted' in 'update_status', so use the '_update_status' to enforce the status # using protected method also avoids weird overrides of progress percent on failure and final 'success' status self.response._update_status(pywps_status_id, message, self.percent) # noqa: W0212 diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 9b25388a6..186c343bc 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -377,6 +377,7 @@ def save_job(self, service=None, # type: Optional[AnyStr] inputs=None, # type: Optional[List[Any]] is_workflow=False, # type: bool + is_local=False, # type: bool user_id=None, # type: Optional[int] execute_async=True, # type: bool custom_tags=None, # type: Optional[List[AnyStr]] @@ -409,6 +410,7 @@ def save_job(self, "status": map_status(STATUS_ACCEPTED), "execute_async": execute_async, "is_workflow": is_workflow, + "is_local": is_local, "created": now(), "tags": list(set(tags)), "access": access, diff --git a/weaver/utils.py b/weaver/utils.py index b906cc196..37bc8e654 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -748,9 +748,7 @@ def fetch_file(file_reference, file_outdir, settings=None, **request_kwargs): file_path = os.path.join(file_outdir, file_name) if file_reference.startswith("file://"): file_reference = file_reference[7:] - LOGGER.debug("Fetch file resolved:\n" - " Reference: [%s]\n" - " File Path: [%s]", file_href, file_path) + LOGGER.debug("Fetching file reference: [%s]", file_href) if os.path.isfile(file_reference): LOGGER.debug("Fetch file resolved as local reference.") # NOTE: @@ -760,7 +758,8 @@ def fetch_file(file_reference, file_outdir, settings=None, **request_kwargs): # Do symlink operation by hand instead of with argument to have Python-2 compatibility. if os.path.islink(file_reference): os.symlink(os.readlink(file_reference), file_path) - else: + # otherwise copy the file if not already available + elif not os.path.isfile(file_path) or os.path.realpath(file_path) != os.path.realpath(file_reference): shutil.copyfile(file_reference, file_path) elif file_reference.startswith("s3://"): LOGGER.debug("Fetch file resolved as S3 bucket reference.") @@ -798,7 +797,9 @@ def fetch_file(file_reference, file_outdir, settings=None, **request_kwargs): scheme = "" if len(scheme) < 2 else scheme[0] raise ValueError("Unresolved fetch file scheme: '{!s}', supported: {}" .format(scheme, list(SUPPORTED_FILE_SCHEMES))) - LOGGER.debug("Fetch file written") + LOGGER.debug("Fetch file resolved:\n" + " Reference: [%s]\n" + " File Path: [%s]", file_href, file_path) return file_path diff --git a/weaver/wps.py b/weaver/wps.py index c72e87a74..5aa21999e 100644 --- a/weaver/wps.py +++ b/weaver/wps.py @@ -15,6 +15,7 @@ from pyramid.wsgi import wsgiapp2 from pyramid_celery import celery_app as app from pywps import configuration as pywps_config +from pywps.app import WPSRequest from pywps.app.Service import Service from six.moves.configparser import ConfigParser from six.moves.urllib.parse import urlparse @@ -129,12 +130,12 @@ def get_wps_local_status_location(url_status_location, container, must_exist=Tru return out_path -def check_wps_status(url=None, response=None, sleep_secs=2, verify=True, settings=None): +def check_wps_status(location=None, response=None, sleep_secs=2, verify=True, settings=None): # type: (Optional[AnyStr], Optional[etree.ElementBase], int, bool, Optional[AnySettingsContainer]) -> WPSExecution """ Run :func:`owslib.wps.WPSExecution.checkStatus` with additional exception handling. - :param url: job URL where to look for job status. + :param location: job URL or file path where to look for job status. :param response: WPS response document of job status. :param sleep_secs: number of seconds to sleep before returning control to the caller. :param verify: Flag to enable SSL verification. @@ -143,9 +144,9 @@ def check_wps_status(url=None, response=None, sleep_secs=2, verify=True, setting """ def _retry_file(): LOGGER.warning("Failed retrieving WPS status-location, attempting with local file.") - out_path = get_wps_local_status_location(url, settings) + out_path = get_wps_local_status_location(location, settings) if not out_path: - raise HTTPNotFound("Could not find file resource from [{}].".format(url)) + raise HTTPNotFound("Could not find file resource from [{}].".format(location)) LOGGER.info("Resolved WPS status-location using local file reference.") return open(out_path, "r").read() @@ -153,11 +154,11 @@ def _retry_file(): if response: LOGGER.debug("Retrieving WPS status from XML response document...") xml = response - elif url: + elif location: xml_resp = HTTPNotFound() try: LOGGER.debug("Attempt to retrieve WPS status-location from URL...") - xml_resp = request_extra("get", url, verify=verify, settings=settings) + xml_resp = request_extra("get", location, verify=verify, settings=settings) xml = xml_resp.content except Exception as ex: LOGGER.debug("Got exception during get status: [%r]", ex) @@ -276,14 +277,58 @@ def load_pywps_config(container, config=None): return pywps_config.CONFIG -# @app.task(bind=True) -@wsgiapp2 -def pywps_view(environ, start_response): - """ - * TODO: add xml response renderer +class WorkerService(Service): """ - LOGGER.debug("pywps env: %s", environ.keys()) + Dispatches PyWPS requests from *older* WPS-1/2 XML endpoint to WPS-REST as appropriate. + + When ``GetCapabilities`` or ``DescribeProcess`` requests are received, directly return to result + (no need to subprocess as Celery task that gets resolved quickly with only the process(es) details). + When receiving ``Execute`` request, convert the XML payload to corresponding JSON and + dispatch it to some Celery Worker to actually process it. + """ + def __init__(self, *_, is_worker=False, **__): + super(WorkerService, self).__init__(*_, **__) + self.is_worker = is_worker + + def execute(self, identifier, wps_request, uuid): + """ + Dispatch operation to WPS-REST endpoint, which it turn should call back the real Celery Worker for execution. + """ + # FIXME: !!! + # XML -> JSON + # submit_job_handler() + + def execute_job(self, process_id, wps_inputs, wps_outputs, mode, job_uuid): + """ + Real execution of the process by active Celery Worker. + """ + execution = WPSExecution(version="2.0", url="localhost") + xml_request = execution.buildRequest(process_id, wps_inputs, wps_outputs, mode=mode, lineage=True) + wps_request = WPSRequest() + wps_request.identifier = process_id + wps_request.set_version("2.0.0") + request_parser = wps_request._post_request_parser(wps_request.WPS.Execute().tag) + request_parser(xml_request) + # NOTE: + # Setting 'status = false' will disable async execution of 'pywps.app.Process.Process' + # but this is needed since this job is running within Celery already async + # (daemon process can't have children processes) + # Because if how the code in PyWPS is made, we have to re-enable creation of status file + wps_request.status = "false" + wps_response = super(WorkerService, self).execute(process_id, wps_request, job_uuid) + wps_response.store_status_file = True + # update execution status with actual status file and apply required references + execution = check_wps_status(location=wps_response.process.status_location) + execution.request = xml_request + return execution + + +def get_pywps_service(environ=None, is_worker=False): + """ + Generates the PyWPS Service that provides *older* WPS-1/2 XML endpoint. + """ + environ = environ or {} try: # get config file settings = get_settings(app) @@ -291,15 +336,25 @@ def pywps_view(environ, start_response): if not isinstance(pywps_cfg, ConfigParser) or not settings.get("weaver.wps_configured"): load_pywps_config(app, config=pywps_cfg) - # call pywps application with processes filtered according to the adapter"s definition + # call pywps application with processes filtered according to the adapter's definition process_store = get_db(app).get_store(StoreProcesses) processes_wps = [process.wps() for process in process_store.list_processes(visibility=VISIBILITY_PUBLIC, request=get_current_request())] - service = Service(processes_wps) + service = WorkerService(processes_wps, is_worker=is_worker) except Exception as ex: LOGGER.exception("Error occurred during PyWPS Service and/or Processes setup.") raise OWSNoApplicableCode("Failed setup of PyWPS Service and/or Processes. Error [{!r}]".format(ex)) + return service + +# @app.task(bind=True) +@wsgiapp2 +def pywps_view(environ, start_response): + """ + Served location for PyWPS Service that provides *older* WPS-1/2 XML endpoint. + """ + LOGGER.debug("pywps env: %s", environ.keys()) + service = get_pywps_service(environ) return service(environ, start_response) diff --git a/weaver/wps_restapi/processes/processes.py b/weaver/wps_restapi/processes/processes.py index 750fd86cf..41c6ac77f 100644 --- a/weaver/wps_restapi/processes/processes.py +++ b/weaver/wps_restapi/processes/processes.py @@ -45,6 +45,7 @@ from weaver.status import STATUS_ACCEPTED, STATUS_FAILED, STATUS_STARTED, STATUS_SUCCEEDED, map_status from weaver.store.base import StoreJobs, StoreProcesses, StoreServices from weaver.utils import ( + bytes2str, get_any_id, get_any_value, get_cookie_headers, @@ -57,9 +58,11 @@ from weaver.visibility import VISIBILITY_PUBLIC, VISIBILITY_VALUES from weaver.wps import ( check_wps_status, + get_pywps_service, get_wps_local_status_location, get_wps_output_path, get_wps_output_url, + get_wps_url, load_pywps_config ) from weaver.wps_restapi import swagger_definitions as sd @@ -90,7 +93,7 @@ @app.task(bind=True) -def execute_process(self, job_id, url, headers=None, notification_email=None): +def execute_process(self, job_id, url, headers=None): LOGGER.debug("Job execute process called.") settings = get_settings(app) task_logger = get_task_logger(__name__) @@ -154,14 +157,18 @@ def execute_process(self, job_id, url, headers=None, notification_email=None): # prepare outputs job.progress = JOB_PROGRESS_GET_OUTPUTS job.save_log(logger=task_logger, message="Fetching job output definitions.") - outputs = [(o.identifier, o.dataType == WPS_COMPLEX_DATA) for o in process.processOutputs] + wps_outputs = [(o.identifier, o.dataType == WPS_COMPLEX_DATA) for o in process.processOutputs] mode = EXECUTE_MODE_ASYNC if job.execute_async else EXECUTE_MODE_SYNC job.progress = JOB_PROGRESS_EXECUTE_REQUEST job.save_log(logger=task_logger, message="Starting job process execution.") job.save_log(logger=task_logger, message="Following updates could take a while until the Application Package answers...") - execution = wps.execute(job.process, inputs=wps_inputs, output=outputs, mode=mode, lineage=True) + + wps_worker = get_pywps_service(environ=settings, is_worker=True) + execution = wps_worker.execute_job(job.process, wps_inputs=wps_inputs, wps_outputs=wps_outputs, + mode=mode, job_uuid=job.id) + ###execution = wps.execute(job.process, inputs=wps_inputs, output=outputs, mode=mode, lineage=True) if not execution.process and execution.errors: raise execution.errors[0] @@ -178,7 +185,7 @@ def execute_process(self, job_id, url, headers=None, notification_email=None): job.status_message = execution.statusMessage or "{} initiation done.".format(str(job)) job.status_location = wps_status_path job.request = execution.request - job.response = etree.tostring(execution.response) + job.response = execution.response job.progress = JOB_PROGRESS_EXECUTE_MONITOR_START job.save_log(logger=task_logger, message="Starting monitoring of job execution.") job = store.update_job(job) @@ -195,9 +202,10 @@ def execute_process(self, job_id, url, headers=None, notification_email=None): # WPS execution logs can be inserted within the current job log and appear continuously. # Only update internal job fields in case they get referenced elsewhere. job.progress = JOB_PROGRESS_EXECUTE_MONITOR_LOOP - execution = check_wps_status(url=wps_status_path, settings=settings, sleep_secs=wait_secs(run_step)) + execution = check_wps_status(location=wps_status_path, settings=settings, + sleep_secs=wait_secs(run_step)) job_msg = (execution.statusMessage or "").strip() - job.response = etree.tostring(execution.response) + job.response = execution.response job.status = map_status(execution.getStatus()) job.status_message = "Job execution monitoring (progress: {}%, status: {})."\ .format(execution.percentCompleted, job_msg or "n/a") @@ -252,10 +260,10 @@ def execute_process(self, job_id, url, headers=None, notification_email=None): job.save_log(logger=task_logger) # Send email if requested - if notification_email is not None: + if job.notification_email is not None: job.progress = JOB_PROGRESS_NOTIFY try: - notify_job_complete(job, notification_email, settings) + notify_job_complete(job, job.notification_email, settings) message = "Notification email sent successfully." job.save_log(logger=task_logger, message=message) except Exception as exc: @@ -407,9 +415,7 @@ def submit_job_handler(request, service_url, is_workflow=False, visibility=None) result = execute_process.delay( job_id=job.id, url=clean_ows_url(service_url), - # Convert EnvironHeaders to a simple dict (should cherry-pick the required headers) - headers={k: v for k, v in request.headers.items()}, - notification_email=notification_email) + headers=dict(request.headers)) LOGGER.debug("Celery pending task [%s] for job [%s].", result.id, job.id) # local/provider process location