Skip to content

Commit

Permalink
add job stdout/stderr captured log of cwl app (relates to #131)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed May 12, 2020
1 parent 4837745 commit cb71b43
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 35 deletions.
12 changes: 7 additions & 5 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ Changes
Changes:
--------

- Improve handling of `WPS-REST` HREF output matching a JSON file with array of URL references corresponding to process
pseudo multiple-output format (relates to `#25 <https://github.com/crim-ca/weaver/issues/25>`_). The output is now
expanded to contain both the original JSON file reference and the extended contained URL references so that either
variation can be retrieved as result.
- Add additional status log for ``EOImage`` input modification with `OpenSearch` during process execution.
- Add captured ``stderr/stdout`` logging of underlying `CWL` application being executed to resulting ``Job`` logs
(addresses first step of `#131 <https://github.com/crim-ca/weaver/issues/131>`_).

Fixes:
------

- Fix handling of WPS-REST output matching a JSON file for multiple-output format specified with a relative local path
as specified by job output location. Only remote HTTP references were correctly parsed. Also avoid failing the job if
the reference JSON parsing fails. It will simply return the original reference URL in this case without expanded data.
the reference JSON parsing fails. It will simply return the original reference URL in this case without expanded data
(relates to `#25 <https://github.com/crim-ca/weaver/issues/25>`_).
- Fix `CWL` job logs to be timezone aware, just like most other logs that will report UTC time.
- Fix JSON response parsing of remote provider processes.

`1.6.0 <https://github.com/crim-ca/weaver/tree/1.6.0>`_ (2020-05-07)
========================================================================
Expand Down
143 changes: 119 additions & 24 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import shutil
import tempfile
import uuid
from collections import Hashable, OrderedDict # pylint: disable=E0611,no-name-in-module # moved to .abc in Python 3
from copy import deepcopy
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -81,6 +82,8 @@
get_sane_name,
get_settings,
get_url_without_query,
localize_datetime,
now,
null,
str2bytes
)
Expand Down Expand Up @@ -143,15 +146,16 @@
PACKAGE_ARRAY_ITEMS = frozenset(list(PACKAGE_BASE_TYPES) + list(PACKAGE_COMPLEX_TYPES) + list(PACKAGE_CUSTOM_TYPES))
PACKAGE_ARRAY_TYPES = frozenset(["{}[]".format(item) for item in PACKAGE_ARRAY_ITEMS])
PACKAGE_DEFAULT_FILE_NAME = "package"
PACKAGE_LOG_FILE = "package_log_file"
PACKAGE_OUTPUT_HOOK_LOG_UUID = "PACKAGE_OUTPUT_HOOK_LOG_{}"

# process execution progress
PACKAGE_PROGRESS_PREP_LOG = 1
PACKAGE_PROGRESS_LAUNCHING = 2
PACKAGE_PROGRESS_LOADING = 5
PACKAGE_PROGRESS_GET_INPUT = 6
PACKAGE_PROGRESS_ADD_EO_IMAGES = 7
PACKAGE_PROGRESS_CONVERT_INPUT = 8
PACKAGE_PROGRESS_RUN_CWL = 10
PACKAGE_PROGRESS_CWL_RUN = 10
PACKAGE_PROGRESS_CWL_DONE = 95
PACKAGE_PROGRESS_PREP_OUT = 98
PACKAGE_PROGRESS_DONE = 100
Expand Down Expand Up @@ -1641,17 +1645,19 @@ def try_or_raise_package_error(call, reason):

class WpsPackage(Process):
# defined on __init__ call
package = None # type: Optional[CWL]
package = None # type: Optional[CWL]
# defined only after _handler is called (or sub-methods)
package_id = None # type: Optional[AnyStr]
percent = None # type: Optional[Number]
log_file = None # type: Optional[AnyStr]
log_level = logging.INFO # type: int
logger = None # type: Optional[logging.Logger]
step_packages = None # type: Optional[List[CWL]]
step_launched = None # type: Optional[List[AnyStr]]
request = None # type: Optional[WPSRequest]
response = None # type: Optional[ExecuteResponse]
package_id = None # type: Optional[AnyStr]
package_log_hook_stderr = None # type: Optional[AnyStr]
package_log_hook_stdout = None # type: Optional[AnyStr]
percent = None # type: Optional[Number]
log_file = None # type: Optional[AnyStr]
log_level = logging.INFO # type: int
logger = None # type: Optional[logging.Logger]
step_packages = None # type: Optional[List[CWL]]
step_launched = None # type: Optional[List[AnyStr]]
request = None # type: Optional[WPSRequest]
response = None # type: Optional[ExecuteResponse]

def __init__(self, **kw):
"""
Expand Down Expand Up @@ -1689,10 +1695,18 @@ def __init__(self, **kw):
)

def setup_logger(self):
"""
Configures useful loggers to catch most of the common output and/or error messages during package execution.
.. seealso::
:meth:`insert_package_log`
:func:`retrieve_package_job_log`
"""
# file logger for output
self.log_file = get_status_location_log_path(self.status_location)
log_file_handler = logging.FileHandler(self.log_file)
log_file_formatter = logging.Formatter(fmt=get_log_fmt(), datefmt=get_log_date_fmt())
log_file_formatter.converter = lambda *_: localize_datetime(now())
log_file_handler.setFormatter(log_file_formatter)

# prepare package logger
Expand All @@ -1708,11 +1722,82 @@ def setup_logger(self):
cwl_logger.addHandler(log_file_handler)
cwl_logger.setLevel(self.log_level)

# add stderr/stdout CWL hook to capture logs/prints/echos from subprocess execution
# using same file so all kinds of messages are kept in chronological order of generation
self.package_log_hook_stderr = PACKAGE_OUTPUT_HOOK_LOG_UUID.format(str(uuid.uuid4()))
self.package_log_hook_stdout = PACKAGE_OUTPUT_HOOK_LOG_UUID.format(str(uuid.uuid4()))
package_outputs = self.package.get("outputs")
if isinstance(package_outputs, list):
package_outputs.extend([{"id": self.package_log_hook_stderr, "type": "stderr"},
{"id": self.package_log_hook_stdout, "type": "stdout"}])
else:
package_outputs.update({self.package_log_hook_stderr: {"type": "stderr"},
self.package_log_hook_stdout: {"type": "stdout"}})
self.package.update({"stderr": "stderr.log", "stdout": "stdout.log"})

# add weaver Tweens logger to current package logger
weaver_tweens_logger = logging.getLogger("weaver.tweens")
weaver_tweens_logger.addHandler(log_file_handler)
weaver_tweens_logger.setLevel(self.log_level)

def insert_package_log(self, result):
    """
    Merges the captured ``stderr/stdout`` of the underlying `CWL` application into the package log file.

    The locations of the captured files are looked up in ``result`` using the output identifiers stored in
    ``package_log_hook_stdout`` and ``package_log_hook_stderr``. Each non-empty capture is preceded by a header line
    and the ``stdout`` section is placed before the ``stderr`` section, since a finer chronological merge of the two
    streams cannot be guaranteed from outside the `CWL` runner.

    The captured lines are inserted in ``log_file`` right before the last line that contains the
    ``[cwltool] [job <package_id>] completed`` marker. When that marker is not found, they are inserted right before
    the last line of the existing log.

    Any error raised during this operation is reported as a warning in the job log and is not propagated.

    :param result: output results returned from the `CWL` package instance execution.

    .. todo:: improve for realtime updates when using async routine (https://github.com/crim-ca/weaver/issues/131)

    .. seealso::
        :meth:`setup_logger`
        :func:`retrieve_package_job_log`
    """

    def _read_captured(path, header):
        """Returns the lines of ``path`` preceded by ``header``, or an empty list if the file has no content."""
        with open(path) as app_log_fd:
            lines = app_log_fd.readlines()
        if not lines:
            return []
        # Only the last line returned by 'readlines' can miss its line ending. Since 'writelines' does not add any,
        # leaving it as is would fuse that line with whichever line follows it in the merged log.
        if not lines[-1].endswith("\n"):
            lines[-1] += "\n"
        return [header] + lines

    try:
        stderr_file = result.get(self.package_log_hook_stderr, {}).get("location", "").replace("file://", "")
        stdout_file = result.get(self.package_log_hook_stdout, {}).get("location", "").replace("file://", "")
        with_stderr_file = os.path.isfile(stderr_file)
        with_stdout_file = os.path.isfile(stdout_file)
        if not with_stdout_file and not with_stderr_file:
            self.log_message(STATUS_RUNNING, "Could not retrieve any internal application log.",
                             level=logging.WARNING)
            return

        out_log = _read_captured(stdout_file, "----- Captured Log (stdout) -----\n") if with_stdout_file else []
        err_log = _read_captured(stderr_file, "----- Captured Log (stderr) -----\n") if with_stderr_file else []
        if not out_log and not err_log:
            self.log_message(STATUS_RUNNING, "Nothing captured from internal application logs.", level=logging.INFO)
            return

        with open(self.log_file, "r") as pkg_log_fd:
            pkg_log = pkg_log_fd.readlines()

        # search from the end for the runner completion entry (reported for both success and permanentFail)
        # when it is not found, the index remains -1 and the captured lines go right before the last log line
        cwl_end_index = -1
        cwl_end_search = "[cwltool] [job {}] completed".format(self.package_id)
        for i, line in reversed(list(enumerate(pkg_log))):
            if cwl_end_search in line:
                cwl_end_index = i
                break

        merged_log = pkg_log[:cwl_end_index] + out_log + err_log + pkg_log[cwl_end_index:]
        with open(self.log_file, "w") as pkg_log_fd:
            pkg_log_fd.writelines(merged_log)
    except Exception as exc:
        # log exception, but non-failing (the created exception is intentionally not raised)
        self.exception_message(PackageExecutionError, exception=exc, level=logging.WARNING, status=STATUS_RUNNING,
                               message="Error occurred when retrieving internal application log.")

def update_status(self, message, progress, status):
# type: (AnyStr, Number, AnyStatusType) -> None
"""Updates the `PyWPS` real job status from a specified parameters."""
Expand Down Expand Up @@ -1742,19 +1827,26 @@ def log_message(self, status, message, progress=None, level=logging.INFO):
message = get_job_log_msg(status=map_status(status), message=message, progress=progress)
self.logger.log(level, message, exc_info=level > logging.INFO)

def exception_message(self, exception_type, exception=None, message="no message"):
# type: (Type[Exception], Optional[Exception], AnyStr) -> Exception
def exception_message(self, exception_type, exception=None, message="no message",
status=STATUS_EXCEPTION, level=logging.ERROR):
# type: (Type[Exception], Optional[Exception], AnyStr, AnyStatusType, int) -> Exception
exception_msg = " [{}]".format(repr(exception)) if isinstance(exception, Exception) else ""
self.log_message(status=STATUS_EXCEPTION,
message="{0}: {1}{2}".format(exception_type.__name__, message, exception_msg),
level=logging.ERROR)
self.log_message(status=status, level=level,
message="{0}: {1}{2}".format(exception_type.__name__, message, exception_msg))
return exception_type("{0}{1}".format(message, exception_msg))

@classmethod
def map_step_progress(cls, step_index, steps_total):
# type: (int, int) -> Number
"""Calculates the percentage progression of a single step of the full process."""
return map_progress(100 * step_index / steps_total, PACKAGE_PROGRESS_RUN_CWL, PACKAGE_PROGRESS_CWL_DONE)
"""Calculates the percentage progression of a single step of the full process.
.. note::
The step progression is adjusted according to delimited start/end of the underlying `CWL` execution to
provide a continuous progress percentage over the complete execution. Otherwise, we would have values
that jump around according to whichever progress the underlying remote `WPS` or monitored `CWL` employs,
if any is provided.
"""
return map_progress(100 * step_index / steps_total, PACKAGE_PROGRESS_CWL_RUN, PACKAGE_PROGRESS_CWL_DONE)

@staticmethod
def make_location_input(input_type, input_definition):
Expand Down Expand Up @@ -1802,8 +1894,6 @@ def _handler(self, request, response):
try:
try:
self.setup_logger()
# self.response.outputs[PACKAGE_LOG_FILE].file = self.log_file
# self.response.outputs[PACKAGE_LOG_FILE].as_reference = True
self.update_status("Preparing package logs done.", PACKAGE_PROGRESS_PREP_LOG, STATUS_RUNNING)
except Exception as exc:
raise self.exception_message(PackageExecutionError, exc, "Failed preparing package logging.")
Expand All @@ -1821,8 +1911,8 @@ def _handler(self, request, response):
loading_context = None

wps_out_dir_prefix = os.path.join(get_wps_output_dir(settings), "tmp")
runtime_context = RuntimeContext(kwargs={
"no_read_only": True, "outdir": self.workdir, "tmp_outdir_prefix": wps_out_dir_prefix})
runtime_args = {"no_read_only": True, "outdir": self.workdir, "tmp_outdir_prefix": wps_out_dir_prefix}
runtime_context = RuntimeContext(kwargs=runtime_args)
try:
package_inst, _, self.step_packages = _load_package_content(self.package,
package_name=self.package_id,
Expand All @@ -1846,6 +1936,9 @@ def _handler(self, request, response):
request.inputs = opensearch.get_original_collection_id(self.payload, request.inputs)
eoimage_data_sources = opensearch.get_eo_images_data_sources(self.payload, request.inputs)
if eoimage_data_sources:
self.update_status("Found EOImage data-source definitions. "
"Updating inputs with OpenSearch sources.",
PACKAGE_PROGRESS_ADD_EO_IMAGES, STATUS_RUNNING)
accept_mime_types = opensearch.get_eo_images_mime_types(self.payload)
opensearch.insert_max_occurs(self.payload, request.inputs)
request.inputs = opensearch.query_eo_images_from_wps_inputs(request.inputs,
Expand Down Expand Up @@ -1887,7 +1980,7 @@ def _handler(self, request, response):
raise self.exception_message(PackageExecutionError, exc, "Failed to load package inputs.")

try:
self.update_status("Running package...", PACKAGE_PROGRESS_RUN_CWL, STATUS_RUNNING)
self.update_status("Running package...", PACKAGE_PROGRESS_CWL_RUN, STATUS_RUNNING)

# Inputs starting with file:// will be interpreted as ems local files
# If OpenSearch obtain file:// references that must be passed to the ADES use an uri starting
Expand All @@ -1897,6 +1990,8 @@ def _handler(self, request, response):
self.update_status("Package execution done.", PACKAGE_PROGRESS_CWL_DONE, STATUS_RUNNING)
except Exception as exc:
raise self.exception_message(PackageExecutionError, exc, "Failed package execution.")
# FIXME: this won't be necessary using async routine (https://github.com/crim-ca/weaver/issues/131)
self.insert_package_log(result)
try:
for output in request.outputs:
# TODO: adjust output for glob patterns (https://github.com/crim-ca/weaver/issues/24)
Expand Down
13 changes: 7 additions & 6 deletions weaver/wps_restapi/processes/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@
JOB_PROGRESS_GET_OUTPUTS = 6
JOB_PROGRESS_EXECUTE_REQUEST = 8
JOB_PROGRESS_EXECUTE_STATUS_LOCATION = 10
JOB_PROGRESS_EXECUTE_MONITOR_START = 20
JOB_PROGRESS_EXECUTE_MONITOR_LOOP = 30
JOB_PROGRESS_EXECUTE_MONITOR_ERROR = 80
JOB_PROGRESS_EXECUTE_MONITOR_START = 15
JOB_PROGRESS_EXECUTE_MONITOR_LOOP = 20
JOB_PROGRESS_EXECUTE_MONITOR_ERROR = 85
JOB_PROGRESS_EXECUTE_MONITOR_END = 90
JOB_PROGRESS_NOTIFY = 95
JOB_PROGRESS_DONE = 100
Expand Down Expand Up @@ -509,9 +509,10 @@ def get_processes(request):
response_body.update({"providers": providers})
for i, provider in enumerate(providers):
provider_id = get_any_id(provider)
processes = requests.request("GET", "{host}/providers/{provider_id}/processes"
.format(host=request.host_url, provider_id=provider_id),
headers=request.headers, cookies=request.cookies)
response = requests.request("GET", "{host}/providers/{provider_id}/processes"
.format(host=request.host_url, provider_id=provider_id),
headers=request.headers, cookies=request.cookies)
processes = response.json().get("processes", [])
response_body["providers"][i].update({
"processes": processes if detail else [get_any_id(p) for p in processes]
})
Expand Down

0 comments on commit cb71b43

Please sign in to comment.