Skip to content

Commit

Permalink
partially address issues #21 and #126 - celery worker running the job…
Browse files Browse the repository at this point in the history
…s instead of weaver api manager
  • Loading branch information
fmigneault committed Feb 2, 2021
1 parent 2c0dc95 commit cf397bf
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Changes:
--------
- Add support of YAML format for loading ``weaver.data_sources`` definition.
- Pre-install ``Docker`` CLI in ``worker`` image to avoid bad practice of mounting it from the host.
- Adjust WPS request dispatching such that process jobs get executed by ``Celery`` worker as intended
(see `#21 <https://github.com/crim-ca/weaver/issues/21>`_ and `#126 <https://github.com/crim-ca/weaver/issues/126>`_).

Fixes:
------
Expand Down
117 changes: 116 additions & 1 deletion tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import contextlib
import logging
import os
import tempfile
import time
import unittest
from copy import deepcopy
Expand Down Expand Up @@ -47,7 +48,7 @@
IANA_NAMESPACE,
get_cwl_file_format
)
from weaver.processes.constants import CWL_REQUIREMENT_APP_BUILTIN
from weaver.processes.constants import CWL_REQUIREMENT_APP_BUILTIN, CWL_REQUIREMENT_APP_DOCKER
from weaver.status import STATUS_RUNNING, STATUS_SUCCEEDED
from weaver.utils import get_any_value
from weaver.visibility import VISIBILITY_PUBLIC
Expand Down Expand Up @@ -1502,3 +1503,117 @@ def test_execute_application_package_process_with_bucket(self):
assert not os.path.exists(os.path.join(wps_outdir, job_id, out_file))
assert not os.path.exists(os.path.join(wps_outdir, wps_uuid, out_file))
assert os.path.isfile(os.path.join(wps_outdir, "{}.xml".format(job_id)))


@pytest.mark.functional
class WpsPackageDockerAppTest(WpsPackageConfigBase):
@classmethod
def setUpClass(cls):
cls.settings = {
"weaver.wps": True,
"weaver.wps_output": True,
"weaver.wps_output_path": "/wpsoutputs",
"weaver.wps_output_dir": "/tmp", # nosec: B108 # don't care hardcoded for test
"weaver.wps_path": "/ows/wps",
"weaver.wps_restapi_path": "/",
}
super(WpsPackageDockerAppTest, cls).setUpClass()

def test_execute_application_package_process_docker(self):
"""
Test validates that basic Docker application runs successfully, fetching the reference as needed.
"""
out_key = "output"
out_file = "output.txt"
cwl = {
"cwlVersion": "v1.0",
"class": "CommandLineTool",
"baseCommand": "cat",
"requirements": {
CWL_REQUIREMENT_APP_DOCKER: {
"dockerPull": "debian:stretch-slim"
}
},
"inputs": [
{"id": "file", "type": "File", "inputBinding": {"position": 1}},
],
"outputs": [
{"id": out_key, "type": "File", "outputBinding": {"glob": out_file}},
]
}
body = {
"processDescription": {
"process": {"id": self._testMethodName}
},
"deploymentProfileName": "http://www.opengis.net/profiles/eoc/dockerizedApplication",
"executionUnit": [{"unit": cwl}],
}
self.deploy_process(body)

test_content = "Test file in Docker"
with contextlib.ExitStack() as stack_proc:
# setup
dir_name = tempfile.gettempdir()
tmp_file = stack_proc.enter_context(tempfile.NamedTemporaryFile(dir=dir_name, mode="w", suffix=".txt"))
tmp_file.write(test_content)
tmp_file.seek(0)
exec_body = {
"mode": EXECUTE_MODE_ASYNC,
"response": EXECUTE_RESPONSE_DOCUMENT,
"inputs": [
{"id": "file", "href": tmp_file.name},
],
"outputs": [
{"id": out_key, "transmissionMode": "reference"},
]
}
for process in mocked_execute_process():
stack_proc.enter_context(process)

# execute
proc_url = "/processes/{}/jobs".format(self._testMethodName)
resp = mocked_sub_requests(self.app, "post_json", proc_url,
params=exec_body, headers=self.json_headers, only_local=True)
assert resp.status_code in [200, 201], "Failed with: [{}]\nReason:\n{}".format(resp.status_code, resp.json)
status_url = resp.json["location"]
job_id = resp.json["jobID"]

# job monitoring
monitor_timeout = 60
time.sleep(1) # small delay to ensure process started
while monitor_timeout >= 0:
resp = self.app.get(status_url, headers=self.json_headers)
assert resp.status_code == 200
assert resp.json["status"] in [STATUS_RUNNING, STATUS_SUCCEEDED]
if resp.json["status"] == STATUS_SUCCEEDED:
break
time.sleep(1)
assert resp.json["status"] == STATUS_SUCCEEDED
resp = self.app.get("{}/result".format(status_url), headers=self.json_headers)
assert resp.status_code == 200

# check that output is HTTP reference to file
output_values = {out["id"]: get_any_value(out) for out in resp.json["outputs"]}
assert len(output_values) == 1
wps_uuid = self.job_store.fetch_by_id(job_id).wps_id
wps_out_ref = "localhost/{}/{}".format(self.settings["weaver.wps_output_path"], wps_uuid)
wps_output = "{}/{}".format(wps_out_ref, wps_uuid, out_file)
assert output_values[out_key] == wps_output

# check that actual output file was created in expected location along with XML job status
wps_outdir = self.settings["weaver.wps_output_dir"]
wps_out_file = os.path.join(wps_outdir, job_id, out_file)
assert not os.path.exists(os.path.join(wps_outdir, out_file)), "File must be in job subdir, not wps out dir."
# job log, XML status and output directory can be retrieved with both Job UUID and underlying WPS UUID reference
assert os.path.isfile(os.path.join(wps_outdir, "{}.log".format(wps_uuid)))
assert os.path.isfile(os.path.join(wps_outdir, "{}.xml".format(wps_uuid)))
assert os.path.isfile(os.path.join(wps_outdir, wps_uuid, out_file))
assert os.path.isfile(os.path.join(wps_outdir, "{}.log".format(job_id)))
assert os.path.isfile(os.path.join(wps_outdir, "{}.xml".format(job_id)))
assert os.path.isfile(wps_out_file)

# validate content
with open(wps_out_file) as res_file:
assert res_file.read() == test_content
4 changes: 2 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ def id(self):

task = MockTask()

def mock_execute_process(job_id, url, headers, notification_email):
execute_process(job_id, url, headers, notification_email)
def mock_execute_process(job_id, url, headers):
execute_process(job_id, url, headers)
return task

return (
Expand Down
13 changes: 9 additions & 4 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import TYPE_CHECKING

import six
import lxml.etree
from dateutil.parser import parse as dt_parse # noqa
from owslib.wps import WPSException
from pywps import Process as ProcessWPS
Expand Down Expand Up @@ -545,25 +546,29 @@ def access(self, visibility):
@property
def request(self):
# type: () -> Optional[AnyStr]
"""XML request for WPS execution submission as string."""
"""XML request for WPS execution submission as string (binary)."""
return self.get("request", None)

@request.setter
def request(self, request):
# type: (Optional[AnyStr]) -> None
"""XML request for WPS execution submission as string."""
"""XML request for WPS execution submission as string (binary)."""
if isinstance(request, lxml.etree._Element): # noqa
request = lxml.etree.tostring(request)
self["request"] = request

@property
def response(self):
# type: () -> Optional[AnyStr]
"""XML status response from WPS execution submission as string."""
"""XML status response from WPS execution submission as string (binary)."""
return self.get("response", None)

@response.setter
def response(self, response):
# type: (Optional[AnyStr]) -> None
"""XML status response from WPS execution submission as string."""
"""XML status response from WPS execution submission as string (binary)."""
if isinstance(response, lxml.etree._Element): # noqa
response = lxml.etree.tostring(response)
self["response"] = response

def _job_url(self, settings):
Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/wps1_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def execute(self, workflow_inputs, out_dir, expected_outputs):
if num_retries >= max_retries:
raise Exception("Could not read status document after {} retries. Giving up.".format(max_retries))
try:
execution = check_wps_status(url=execution.statusLocation, verify=self.verify,
execution = check_wps_status(location=execution.statusLocation, verify=self.verify,
sleep_secs=wait_secs(run_step))
job_id = execution.statusLocation.replace(".xml", "").split("/")[-1]
LOGGER.debug(get_log_monitor_msg(job_id, status.map_status(execution.getStatus()),
Expand Down
6 changes: 6 additions & 0 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,12 @@ def update_status(self, message, progress, status):
pywps_status = map_status(status, STATUS_COMPLIANT_PYWPS)
pywps_status_id = STATUS_PYWPS_IDS[pywps_status]

# NOTE:
# When running process in sync (because executed within celery worker already async),
# pywps reverts status file output flag. Re-enforce it for our needs.
# (see: 'wevaer.wps.WorkerService.execute_job')
self.response.store_status_file = True

# pywps overrides 'status' by 'accepted' in 'update_status', so use the '_update_status' to enforce the status
# using protected method also avoids weird overrides of progress percent on failure and final 'success' status
self.response._update_status(pywps_status_id, message, self.percent) # noqa: W0212
Expand Down
2 changes: 2 additions & 0 deletions weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ def save_job(self,
service=None, # type: Optional[AnyStr]
inputs=None, # type: Optional[List[Any]]
is_workflow=False, # type: bool
is_local=False, # type: bool
user_id=None, # type: Optional[int]
execute_async=True, # type: bool
custom_tags=None, # type: Optional[List[AnyStr]]
Expand Down Expand Up @@ -409,6 +410,7 @@ def save_job(self,
"status": map_status(STATUS_ACCEPTED),
"execute_async": execute_async,
"is_workflow": is_workflow,
"is_local": is_local,
"created": now(),
"tags": list(set(tags)),
"access": access,
Expand Down
11 changes: 6 additions & 5 deletions weaver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,7 @@ def fetch_file(file_reference, file_outdir, settings=None, **request_kwargs):
file_path = os.path.join(file_outdir, file_name)
if file_reference.startswith("file://"):
file_reference = file_reference[7:]
LOGGER.debug("Fetch file resolved:\n"
" Reference: [%s]\n"
" File Path: [%s]", file_href, file_path)
LOGGER.debug("Fetching file reference: [%s]", file_href)
if os.path.isfile(file_reference):
LOGGER.debug("Fetch file resolved as local reference.")
# NOTE:
Expand All @@ -760,7 +758,8 @@ def fetch_file(file_reference, file_outdir, settings=None, **request_kwargs):
# Do symlink operation by hand instead of with argument to have Python-2 compatibility.
if os.path.islink(file_reference):
os.symlink(os.readlink(file_reference), file_path)
else:
# otherwise copy the file if not already available
elif not os.path.isfile(file_path) or os.path.realpath(file_path) != os.path.realpath(file_reference):
shutil.copyfile(file_reference, file_path)
elif file_reference.startswith("s3://"):
LOGGER.debug("Fetch file resolved as S3 bucket reference.")
Expand Down Expand Up @@ -798,7 +797,9 @@ def fetch_file(file_reference, file_outdir, settings=None, **request_kwargs):
scheme = "<none>" if len(scheme) < 2 else scheme[0]
raise ValueError("Unresolved fetch file scheme: '{!s}', supported: {}"
.format(scheme, list(SUPPORTED_FILE_SCHEMES)))
LOGGER.debug("Fetch file written")
LOGGER.debug("Fetch file resolved:\n"
" Reference: [%s]\n"
" File Path: [%s]", file_href, file_path)
return file_path


Expand Down
Loading

0 comments on commit cf397bf

Please sign in to comment.