diff --git a/.github/workflows/push-pr_workflow.yml b/.github/workflows/push-pr_workflow.yml
index 4b5de2373..1d3c9d958 100644
--- a/.github/workflows/push-pr_workflow.yml
+++ b/.github/workflows/push-pr_workflow.yml
@@ -9,12 +9,29 @@ jobs:
if: github.event_name == 'pull_request'
steps:
- - uses: actions/checkout@v1
+ - name: Checkout code
+ uses: actions/checkout@v2
+ with:
+          fetch-depth: 0 # Checkout the whole history, in case the target branch is far behind
+
+ - name: Check if target branch has been merged
+ run: |
+ if git merge-base --is-ancestor ${{ github.event.pull_request.base.sha }} ${{ github.sha }}; then
+ echo "Target branch has been merged into the source branch."
+ else
+ echo "Target branch has not been merged into the source branch. Please merge in target first."
+ exit 1
+ fi
- name: Check that CHANGELOG has been updated
run: |
# If this step fails, this means you haven't updated the CHANGELOG.md file with notes on your contribution.
- git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.sha }} | grep '^CHANGELOG.md$' && echo "Thanks for helping keep our CHANGELOG up-to-date!"
+ if git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.sha }} | grep -q '^CHANGELOG.md$'; then
+ echo "Thanks for helping keep our CHANGELOG up-to-date!"
+ else
+ echo "Please update the CHANGELOG.md file with notes on your contribution."
+ exit 1
+ fi
Lint:
runs-on: ubuntu-latest
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b69525389..21b4427b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,40 @@ All notable changes to Merlin will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [1.12.2b1]
+### Added
+- Conflict handler option to the `dict_deep_merge` function in `utils.py`
+- Ability to add module-specific pytest fixtures
+- Added fixtures specifically for testing status functionality
+- Added tests for reading and writing status files, and status conflict handling
+- Added tests for the `dict_deep_merge` function
+- Pytest-mock as a dependency for the test suite (necessary for using mocks and fixtures in the same test)
+- New GitHub Actions test to make sure the target branch has been merged into the source first, so we know histories are okay
+- Check in the status commands to make sure we're not pulling statuses from nested workspaces
+- Added `setuptools` as a requirement for python 3.12 to recognize the `pkg_resources` library
+- Patch to celery results backend to stop ChordErrors being raised and breaking workflows when a single task fails
+- New step return code `$(MERLIN_RAISE_ERROR)` to force an error to be raised by a task (mainly for testing)
+ - Added description of this to docs
+- New test to ensure a single failed task won't break a workflow
+
+### Changed
+- `merlin info` is cleaner and gives python package info
+- The Merlin version is now printed with every banner message
+- Applying filters for `merlin detailed-status` will now log debug statements instead of warnings
+- Modified the unit tests for the `merlin status` command to use pytest rather than unittest
+- Added fixtures for `merlin status` tests that copy the workspace to a temporary directory so you can see exactly what's run in a test
+- Batch block and workers now allow for variables to be used in node settings
+- Task id is now the path to the directory
+
+### Fixed
+- Bugfix for output of `merlin example openfoam_wf_singularity`
+- A bug with the CHANGELOG detection test when the target branch isn't in the CI runner history
+- Link to Merlin banner in readme
+- Issue with escape sequences in ascii art (caught by python 3.12)
+- Bug where Flux wasn't identifying total number of nodes on an allocation
+ - Not supporting Flux versions below 0.17.0
+
+
## [1.12.1]
### Added
- New Priority.RETRY value for the Celery task priorities. This will be the new highest priority.
diff --git a/Makefile b/Makefile
index cfd4cea29..08fb7d0f8 100644
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/README.md b/README.md
index e0f1ca4ff..e47f7744b 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
[](https://github.com/LLNL/merlin/pulls)
[](https://lgtm.com/projects/g/LLNL/merlin/context:python)
-
+
## A brief introduction to Merlin
Merlin is a tool for running machine learning based workflows. The goal of
diff --git a/docs/tutorial/4_run_simulation.md b/docs/tutorial/4_run_simulation.md
index d1f596d94..7d1287c08 100644
--- a/docs/tutorial/4_run_simulation.md
+++ b/docs/tutorial/4_run_simulation.md
@@ -72,8 +72,8 @@ In the `openfoam_wf_singularity` directory you should see the following:
`$(MERLIN_SOFT_FAIL)`| Mark this step as a failure, note in the warning log but keep executing the workflow. Unknown return codes get translated to soft fails, so that they can be logged. |
echo "Uh-oh, this sample didn't work"exit $(MERLIN_SOFT_FAIL)
|
| `$(MERLIN_HARD_FAIL)`| Something went terribly wrong and we need to stop the whole workflow. Raises a `HardFailException` and stops all workers connected to that step. Workers will stop after a 60 second delay to allow the step to be acknowledged by the server.
Note
Workers in isolated parts of the workflow not consuming from the bad step will continue. you can stop all workers with `$(MERLIN_STOP_WORKERS)`
echo "Oh no, we've created skynet! Abort!"exit $(MERLIN_HARD_FAIL)
|
| `$(MERLIN_STOP_WORKERS)`| Launch a task to stop all active workers. To allow the current task to finish and acknowledge the results to the server, will happen in 60 seconds. |
# send a signal to all workers to stopexit $(MERLIN_STOP_WORKERS)
|
+| `$(MERLIN_RAISE_ERROR)`| Purposefully raise a general exception. *This is intended to be used for testing, you'll likely want to use `$(MERLIN_SOFT_FAIL)` instead.* |
# send a signal to raise an exceptionexit $(MERLIN_RAISE_ERROR)
|
diff --git a/merlin/__init__.py b/merlin/__init__.py
index b765c71f2..a2d173a8f 100644
--- a/merlin/__init__.py
+++ b/merlin/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -38,7 +38,7 @@
import sys
-__version__ = "1.12.1"
+__version__ = "1.12.2b1"
VERSION = __version__
PATH_TO_PROJ = os.path.join(os.path.dirname(__file__), "")
diff --git a/merlin/ascii_art.py b/merlin/ascii_art.py
index 1f8b04229..5c90a4b12 100644
--- a/merlin/ascii_art.py
+++ b/merlin/ascii_art.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -30,6 +30,9 @@
# pylint: skip-file
+from merlin import VERSION
+
+
"""
Holds ascii art strings.
"""
@@ -93,11 +96,11 @@
__ __ _ _
- | \/ | | (_)
- | \ / | ___ _ __| |_ _ __
- | |\/| |/ _ \ '__| | | '_ \
+ | \\/ | | (_)
+ | \\ / | ___ _ __| |_ _ __
+ | |\\/| |/ _ \\ '__| | | '_ \\
| | | | __/ | | | | | | |
- |_| |_|\___|_| |_|_|_| |_|
+ |_| |_|\\___|_| |_|_|_| |_|
Machine Learning for HPC Workflows
@@ -127,6 +130,7 @@ def _make_banner():
for hat_line, name_line in zip(hat_lines, name_lines):
banner = banner + hat_line + name_line + "\n"
+ banner = banner + f" v. {VERSION}\n"
return banner
diff --git a/merlin/celery.py b/merlin/celery.py
index d35b0dccd..eb10f1a12 100644
--- a/merlin/celery.py
+++ b/merlin/celery.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -36,8 +36,10 @@
from typing import Dict, Optional, Union
import billiard
+import celery
import psutil
-from celery import Celery
+from celery import Celery, states
+from celery.backends.redis import RedisBackend # noqa: F401 ; Needed for celery patch
from celery.signals import worker_process_init
import merlin.common.security.encrypt_backend_traffic
@@ -50,6 +52,37 @@
LOG: logging.Logger = logging.getLogger(__name__)
+def patch_celery():
+ """
+ Patch redis backend so that errors in chords don't break workflows.
+ Celery has error callbacks but they do not work properly on chords that
+ are nested within chains.
+
+ Credit to this function goes to: https://danidee10.github.io/2019/07/09/celery-chords.html
+ """
+
+ def _unpack_chord_result(
+ self,
+ tup,
+ decode,
+ EXCEPTION_STATES=states.EXCEPTION_STATES,
+ PROPAGATE_STATES=states.PROPAGATE_STATES,
+ ):
+ _, tid, state, retval = decode(tup)
+
+ if state in EXCEPTION_STATES:
+ retval = self.exception_to_python(retval)
+ if state in PROPAGATE_STATES:
+ # retval is an Exception
+ retval = f"{retval.__class__.__name__}: {str(retval)}"
+
+ return retval
+
+ celery.backends.redis.RedisBackend._unpack_chord_result = _unpack_chord_result
+
+ return celery
+
+
# This function has to have specific args/return values for celery so ignore pylint
def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=W0613,R1710
"""
@@ -82,7 +115,7 @@ def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: dis
RESULTS_BACKEND_URI = None
# initialize app with essential properties
-app: Celery = Celery(
+app: Celery = patch_celery().Celery(
"merlin",
broker=BROKER_URI,
backend=RESULTS_BACKEND_URI,
diff --git a/merlin/common/__init__.py b/merlin/common/__init__.py
index c76918410..57477ea1f 100644
--- a/merlin/common/__init__.py
+++ b/merlin/common/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/abstracts/__init__.py b/merlin/common/abstracts/__init__.py
index c76918410..57477ea1f 100644
--- a/merlin/common/abstracts/__init__.py
+++ b/merlin/common/abstracts/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/abstracts/enums/__init__.py b/merlin/common/abstracts/enums/__init__.py
index 61fecf7a8..a95ba0872 100644
--- a/merlin/common/abstracts/enums/__init__.py
+++ b/merlin/common/abstracts/enums/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -48,3 +48,4 @@ class ReturnCode(IntEnum):
DRY_OK = 103
RETRY = 104
STOP_WORKERS = 105
+ RAISE_ERROR = 106
diff --git a/merlin/common/dumper.py b/merlin/common/dumper.py
index 457c62b49..96a940357 100644
--- a/merlin/common/dumper.py
+++ b/merlin/common/dumper.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1
+# This file is part of Merlin, Version: 1.12.2b1
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/openfilelist.py b/merlin/common/openfilelist.py
index 4030dfe75..16aa2f87c 100644
--- a/merlin/common/openfilelist.py
+++ b/merlin/common/openfilelist.py
@@ -8,7 +8,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/opennpylib.py b/merlin/common/opennpylib.py
index 6d719ea5e..872699ae1 100644
--- a/merlin/common/opennpylib.py
+++ b/merlin/common/opennpylib.py
@@ -8,7 +8,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/sample_index.py b/merlin/common/sample_index.py
index 8e42aeac3..c7808bd3b 100644
--- a/merlin/common/sample_index.py
+++ b/merlin/common/sample_index.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/sample_index_factory.py b/merlin/common/sample_index_factory.py
index a36a15ef0..eb4fbcc61 100644
--- a/merlin/common/sample_index_factory.py
+++ b/merlin/common/sample_index_factory.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/security/__init__.py b/merlin/common/security/__init__.py
index c76918410..57477ea1f 100644
--- a/merlin/common/security/__init__.py
+++ b/merlin/common/security/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/security/encrypt.py b/merlin/common/security/encrypt.py
index b0ad464b6..b1932cd28 100644
--- a/merlin/common/security/encrypt.py
+++ b/merlin/common/security/encrypt.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/security/encrypt_backend_traffic.py b/merlin/common/security/encrypt_backend_traffic.py
index c59c19ebd..d597f084b 100644
--- a/merlin/common/security/encrypt_backend_traffic.py
+++ b/merlin/common/security/encrypt_backend_traffic.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py
index 33afb3316..143d3bf12 100644
--- a/merlin/common/tasks.py
+++ b/merlin/common/tasks.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -41,6 +41,7 @@
from celery import chain, chord, group, shared_task, signature
from celery.exceptions import MaxRetriesExceededError, OperationalError, TimeoutError # pylint: disable=W0622
from filelock import FileLock, Timeout
+from redis.exceptions import TimeoutError as RedisTimeoutError
from merlin.common.abstracts.enums import ReturnCode
from merlin.common.sample_index import uniform_directories
@@ -49,7 +50,7 @@
from merlin.exceptions import HardFailException, InvalidChainException, RestartException, RetryException
from merlin.router import stop_workers
from merlin.spec.expansion import parameter_substitutions_for_cmd, parameter_substitutions_for_sample
-from merlin.study.status import read_status
+from merlin.study.status import read_status, status_conflict_handler
from merlin.utils import dict_deep_merge
@@ -62,6 +63,7 @@
RetryException,
RestartException,
FileNotFoundError,
+ RedisTimeoutError,
)
LOG = logging.getLogger(__name__)
@@ -181,6 +183,9 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
shutdown = shutdown_workers.s(None)
shutdown.set(queue=step.get_task_queue())
shutdown.apply_async(countdown=STOP_COUNTDOWN)
+ elif result == ReturnCode.RAISE_ERROR:
+ LOG.warning("*** Raising an error ***")
+ raise Exception("Exception raised by request from the user")
else:
LOG.warning(f"**** Step '{step_name}' in '{step_dir}' had unhandled exit code {result}. Continuing with workflow.")
@@ -309,6 +314,7 @@ def add_merlin_expanded_chain_to_chord( # pylint: disable=R0913,R0914
top_lvl_workspace=top_lvl_workspace,
)
new_step.set(queue=step.get_task_queue())
+ new_step.set(task_id=os.path.join(workspace, relative_paths[sample_id]))
new_chain.append(new_step)
all_chains.append(new_chain)
@@ -377,7 +383,12 @@ def add_simple_chain_to_chord(self, task_type, chain_, adapter_config):
# based off of the parameter substitutions and relative_path for
# a given sample.
- new_steps = [task_type.s(step, adapter_config=adapter_config).set(queue=step.get_task_queue())]
+ new_steps = [
+ task_type.s(step, adapter_config=adapter_config).set(
+ queue=step.get_task_queue(),
+ task_id=step.get_workspace(),
+ )
+ ]
all_chains.append(new_steps)
chain_1d = get_1d_chain(all_chains)
launch_chain(self, chain_1d)
@@ -484,7 +495,7 @@ def gather_statuses(
# Make sure the status for this sample workspace is in a finished state (not initialized or running)
if status[step_name][f"{condensed_workspace}/{path}"]["status"] not in ("INITIALIZED", "RUNNING"):
# Add the status data to the statuses we'll write to the condensed file and remove this status file
- dict_deep_merge(condensed_statuses, status)
+ dict_deep_merge(condensed_statuses, status, conflict_handler=status_conflict_handler)
files_to_remove.append(status_filepath)
files_to_remove.append(lock_filepath) # Remove the lock files as well as the status files
except KeyError:
@@ -556,7 +567,7 @@ def condense_status_files(self, *args: Any, **kwargs: Any) -> ReturnCode: # pyl
existing_condensed_statuses = json.load(condensed_status_file)
# Merging the statuses we're condensing into the already existing statuses
# because it's faster at scale than vice versa
- dict_deep_merge(existing_condensed_statuses, condensed_statuses)
+ dict_deep_merge(existing_condensed_statuses, condensed_statuses, conflict_handler=status_conflict_handler)
condensed_statuses = existing_condensed_statuses
# Write the condensed statuses to the condensed status file
diff --git a/merlin/common/util_sampling.py b/merlin/common/util_sampling.py
index 3ddc47d65..0a6c585cf 100644
--- a/merlin/common/util_sampling.py
+++ b/merlin/common/util_sampling.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/config/__init__.py b/merlin/config/__init__.py
index 6768ee58d..41645e249 100644
--- a/merlin/config/__init__.py
+++ b/merlin/config/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/config/broker.py b/merlin/config/broker.py
index e1c9b8cfc..dc8131c28 100644
--- a/merlin/config/broker.py
+++ b/merlin/config/broker.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py
index 652e76db8..5794599dc 100644
--- a/merlin/config/celeryconfig.py
+++ b/merlin/config/celeryconfig.py
@@ -10,7 +10,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/config/configfile.py b/merlin/config/configfile.py
index c01c1554a..1634b833f 100644
--- a/merlin/config/configfile.py
+++ b/merlin/config/configfile.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/config/results_backend.py b/merlin/config/results_backend.py
index 83de7e457..259e249a6 100644
--- a/merlin/config/results_backend.py
+++ b/merlin/config/results_backend.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/config/utils.py b/merlin/config/utils.py
index b7063577e..46672ba1f 100644
--- a/merlin/config/utils.py
+++ b/merlin/config/utils.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/data/celery/__init__.py b/merlin/data/celery/__init__.py
index c76918410..57477ea1f 100644
--- a/merlin/data/celery/__init__.py
+++ b/merlin/data/celery/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/display.py b/merlin/display.py
index 3a7cd66c3..a1af0ac28 100644
--- a/merlin/display.py
+++ b/merlin/display.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -32,9 +32,9 @@
Manages formatting for displaying information to the console.
"""
import logging
+import os
import pprint
import shutil
-import subprocess
import time
import traceback
from datetime import datetime
@@ -46,6 +46,7 @@
from merlin.ascii_art import banner_small
from merlin.study.status_renderers import status_renderer_factory
+from merlin.utils import get_package_versions
LOG = logging.getLogger("merlin")
@@ -212,9 +213,9 @@ def display_multiple_configs(files, configs):
# Might use args here in the future so we'll disable the pylint warning for now
def print_info(args): # pylint: disable=W0613
"""
- Provide version and location information about python and pip to
- facilitate user troubleshooting. 'merlin info' is a CLI tool only for
- developer versions of Merlin.
+ Provide version and location information about python and packages to
+ facilitate user troubleshooting. Also provides info about server connections
+ and configurations.
:param `args`: parsed CLI arguments
"""
@@ -225,14 +226,11 @@ def print_info(args): # pylint: disable=W0613
print("Python Configuration")
print("-" * 25)
print("")
- info_calls = ["which python3", "python3 --version", "which pip3", "pip3 --version"]
- info_str = ""
- for cmd in info_calls:
- info_str += 'echo " $ ' + cmd + '" && ' + cmd + "\n"
- info_str += "echo \n"
- info_str += r"echo \"echo \$PYTHONPATH\" && echo $PYTHONPATH"
- _ = subprocess.run(info_str, shell=True)
- print("")
+ package_list = ["pip", "merlin", "maestrowf", "celery", "kombu", "amqp", "redis"]
+ package_versions = get_package_versions(package_list)
+ print(package_versions)
+ pythonpath = os.environ.get("PYTHONPATH")
+ print(f"$PYTHONPATH: {pythonpath}")
def display_status_task_by_task(status_obj: "DetailedStatus", test_mode: bool = False): # noqa: F821
diff --git a/merlin/examples/__init__.py b/merlin/examples/__init__.py
index c76918410..57477ea1f 100644
--- a/merlin/examples/__init__.py
+++ b/merlin/examples/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/examples/examples.py b/merlin/examples/examples.py
index c50b11b99..371471550 100644
--- a/merlin/examples/examples.py
+++ b/merlin/examples/examples.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/examples/generator.py b/merlin/examples/generator.py
index 942ef9011..a553d703b 100644
--- a/merlin/examples/generator.py
+++ b/merlin/examples/generator.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/examples/workflows/openfoam_wf_singularity/openfoam_wf.yaml b/merlin/examples/workflows/openfoam_wf_singularity/openfoam_wf_singularity.yaml
similarity index 100%
rename from merlin/examples/workflows/openfoam_wf_singularity/openfoam_wf.yaml
rename to merlin/examples/workflows/openfoam_wf_singularity/openfoam_wf_singularity.yaml
diff --git a/merlin/exceptions/__init__.py b/merlin/exceptions/__init__.py
index 89fe89a13..572135aec 100644
--- a/merlin/exceptions/__init__.py
+++ b/merlin/exceptions/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -42,7 +42,6 @@
"HardFailException",
"InvalidChainException",
"RestartException",
- "DeepMergeException",
"NoWorkersException",
)
@@ -96,16 +95,6 @@ def __init__(self):
super().__init__()
-class DeepMergeException(Exception):
- """
- Exception to signal that there's a conflict when trying
- to merge two dicts together
- """
-
- def __init__(self, message):
- super().__init__(message)
-
-
class NoWorkersException(Exception):
"""
Exception to signal that no workers were started
diff --git a/merlin/log_formatter.py b/merlin/log_formatter.py
index b90660f9c..6cd6a745a 100644
--- a/merlin/log_formatter.py
+++ b/merlin/log_formatter.py
@@ -8,7 +8,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/main.py b/merlin/main.py
index 26274a729..4bb005985 100644
--- a/merlin/main.py
+++ b/merlin/main.py
@@ -8,7 +8,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/merlin_templates.py b/merlin/merlin_templates.py
index 8afb7b9c8..5253c79e4 100644
--- a/merlin/merlin_templates.py
+++ b/merlin/merlin_templates.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/router.py b/merlin/router.py
index ec3c83acf..d9114bbcd 100644
--- a/merlin/router.py
+++ b/merlin/router.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/server/__init__.py b/merlin/server/__init__.py
index 1a7410485..522d67d1d 100644
--- a/merlin/server/__init__.py
+++ b/merlin/server/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
diff --git a/merlin/server/server_commands.py b/merlin/server/server_commands.py
index 65d17c42b..be2b944a0 100644
--- a/merlin/server/server_commands.py
+++ b/merlin/server/server_commands.py
@@ -8,7 +8,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/server/server_config.py b/merlin/server/server_config.py
index f4d5d5174..f58c7567a 100644
--- a/merlin/server/server_config.py
+++ b/merlin/server/server_config.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/server/server_util.py b/merlin/server/server_util.py
index db19866e5..aa7c2765b 100644
--- a/merlin/server/server_util.py
+++ b/merlin/server/server_util.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/spec/__init__.py b/merlin/spec/__init__.py
index c76918410..57477ea1f 100644
--- a/merlin/spec/__init__.py
+++ b/merlin/spec/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/spec/all_keys.py b/merlin/spec/all_keys.py
index 10fc8646c..fbb70f8d7 100644
--- a/merlin/spec/all_keys.py
+++ b/merlin/spec/all_keys.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/spec/defaults.py b/merlin/spec/defaults.py
index 01ba8c743..6c9bd9c09 100644
--- a/merlin/spec/defaults.py
+++ b/merlin/spec/defaults.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/spec/expansion.py b/merlin/spec/expansion.py
index a8bf3ac43..ac514a369 100644
--- a/merlin/spec/expansion.py
+++ b/merlin/spec/expansion.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -57,6 +57,7 @@
"MERLIN_HARD_FAIL",
"MERLIN_RETRY",
"MERLIN_STOP_WORKERS",
+ "MERLIN_RAISE_ERROR",
}
MERLIN_RESERVED = STEP_AWARE | PROVENANCE_REPLACE
RESERVED = MAESTRO_RESERVED | MERLIN_RESERVED
@@ -215,6 +216,7 @@ def parameter_substitutions_for_cmd(glob_path, sample_paths):
substitutions.append(("$(MERLIN_HARD_FAIL)", str(int(ReturnCode.HARD_FAIL))))
substitutions.append(("$(MERLIN_RETRY)", str(int(ReturnCode.RETRY))))
substitutions.append(("$(MERLIN_STOP_WORKERS)", str(int(ReturnCode.STOP_WORKERS))))
+ substitutions.append(("$(MERLIN_RAISE_ERROR)", str(int(ReturnCode.RAISE_ERROR))))
return substitutions
diff --git a/merlin/spec/merlinspec.json b/merlin/spec/merlinspec.json
index 4b8ca3633..7e9c912e4 100644
--- a/merlin/spec/merlinspec.json
+++ b/merlin/spec/merlinspec.json
@@ -221,7 +221,8 @@
"nodes": {
"anyOf": [
{"type": "null"},
- {"type": "integer", "minimum": 1}
+ {"type": "integer", "minimum": 1},
+ {"type": "string", "pattern": "^\\$\\(\\w+\\)$"}
]
},
"batch": {
@@ -279,7 +280,12 @@
"launch_pre": {"type": "string", "minLength": 1},
"launch_args": {"type": "string", "minLength": 1},
"worker_launch": {"type": "string", "minLength": 1},
- "nodes": {"type": "integer", "minimum": 1},
+ "nodes": {
+ "anyOf": [
+ {"type": "integer", "minimum": 1},
+ {"type": "string","pattern": "^\\$\\(\\w+\\)$"}
+ ]
+ },
"walltime": {
"anyOf": [
{"type": "string", "minLength": 1},
diff --git a/merlin/spec/override.py b/merlin/spec/override.py
index 83a831c59..316e76f86 100644
--- a/merlin/spec/override.py
+++ b/merlin/spec/override.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/spec/specification.py b/merlin/spec/specification.py
index e5ebc858c..5ffffd959 100644
--- a/merlin/spec/specification.py
+++ b/merlin/spec/specification.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/study/__init__.py b/merlin/study/__init__.py
index c76918410..57477ea1f 100644
--- a/merlin/study/__init__.py
+++ b/merlin/study/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/study/batch.py b/merlin/study/batch.py
index 201e41ff3..16482f399 100644
--- a/merlin/study/batch.py
+++ b/merlin/study/batch.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -40,7 +40,7 @@
import subprocess
from typing import Dict, Optional, Union
-from merlin.utils import convert_timestring, get_flux_alloc, get_yaml_var
+from merlin.utils import convert_timestring, get_flux_alloc, get_flux_version, get_yaml_var
LOG = logging.getLogger(__name__)
@@ -126,7 +126,7 @@ def get_batch_type(scheduler_legend, default=None):
return default
-def get_node_count(default=1):
+def get_node_count(parsed_batch: Dict, default=1):
"""
Determine a default node count based on the environment.
@@ -134,6 +134,20 @@ def get_node_count(default=1):
the environment cannot be determined.
:param returns: (int) The number of nodes to use.
"""
+
+ # Flux version check
+ flux_ver = get_flux_version(parsed_batch["flux exe"], no_errors=True)
+ major, minor, _ = map(int, flux_ver.split("."))
+ if major < 1 and minor < 17:
+ raise ValueError("Flux version is too old. Supported versions are 0.17.0+.")
+
+ # If flux is the scheduler, we can get the size of the allocation with this
+ try:
+ get_size_proc = subprocess.run("flux getattr size", shell=True, capture_output=True, text=True)
+ return int(get_size_proc.stdout)
+ except Exception:
+ pass
+
if "SLURM_JOB_NUM_NODES" in os.environ:
return int(os.environ["SLURM_JOB_NUM_NODES"])
@@ -246,7 +260,7 @@ def batch_worker_launch(
# Get the number of nodes from the environment if unset
if nodes is None or nodes == "all":
- nodes = get_node_count(default=1)
+ nodes = get_node_count(parsed_batch, default=1)
elif not isinstance(nodes, int):
raise TypeError("Nodes was passed into batch_worker_launch with an invalid type (likely a string other than 'all').")
diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py
index 556953259..5b5bdd419 100644
--- a/merlin/study/celeryadapter.py
+++ b/merlin/study/celeryadapter.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/study/dag.py b/merlin/study/dag.py
index 098cdb92b..c1b9dff78 100644
--- a/merlin/study/dag.py
+++ b/merlin/study/dag.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/study/script_adapter.py b/merlin/study/script_adapter.py
index 69dea2b63..6380b1a9b 100644
--- a/merlin/study/script_adapter.py
+++ b/merlin/study/script_adapter.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -470,6 +470,8 @@ def submit(self, step, path, cwd, job_map=None, env=None): # pylint: disable=R0
step.restart = False
elif retcode == ReturnCode.STOP_WORKERS:
LOG.debug("Execution returned status STOP_WORKERS")
+ elif retcode == ReturnCode.RAISE_ERROR:
+ LOG.debug("Execution returned status RAISE_ERROR")
else:
LOG.warning(f"Unrecognized Merlin Return code: {retcode}, returning SOFT_FAIL")
submission_record.add_info("retcode", retcode)
diff --git a/merlin/study/status.py b/merlin/study/status.py
index d11a403e2..fbeb4d46d 100644
--- a/merlin/study/status.py
+++ b/merlin/study/status.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1
+# This file is part of Merlin, Version: 1.12.2b1
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -37,10 +37,11 @@
from datetime import datetime
from glob import glob
from traceback import print_exception
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from filelock import FileLock, Timeout
+from maestrowf.utils import get_duration
from tabulate import tabulate
from merlin.common.dumper import dump_handler
@@ -343,7 +344,8 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict
Given a step workspace and the name of the step, read in all the statuses
for the step and return them in a dict.
- :param `step_workspace`: The path to the step we're going to read statuses from
+ :param step_workspace: The path to the step we're going to read statuses from
+ :param started_step_name: The name of the step that we're gathering statuses for
:returns: A dict of statuses for the given step
"""
step_statuses = {}
@@ -353,7 +355,12 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict
# Traverse the step workspace and look for MERLIN_STATUS files
LOG.debug(f"Traversing '{step_workspace}' to find MERLIN_STATUS.json files...")
- for root, _, _ in os.walk(step_workspace):
+ for root, dirs, _ in os.walk(step_workspace, topdown=True):
+ # Look for nested workspaces and skip them
+ timestamp_regex = r"\d{8}-\d{6}$"
+ curr_dir = os.path.split(root)[1]
+ dirs[:] = [d for d in dirs if not re.search(timestamp_regex, curr_dir)]
+
# Search for a status file
status_filepath = os.path.join(root, "MERLIN_STATUS.json")
matching_files = glob(status_filepath)
@@ -363,7 +370,7 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict
statuses_read = read_status(status_filepath, f"{root}/status.lock")
# Merge the statuses we read with the dict tracking all statuses for this step
- dict_deep_merge(step_statuses, statuses_read)
+ dict_deep_merge(step_statuses, statuses_read, conflict_handler=status_conflict_handler)
# Add full step name to the tracker and count number of statuses we just read in
for full_step_name, status_info in statuses_read.items():
@@ -391,7 +398,7 @@ def load_requested_statuses(self):
for sstep in self.step_tracker["started_steps"]:
step_workspace = f"{self.workspace}/{sstep}"
step_statuses = self.get_step_statuses(step_workspace, sstep)
- dict_deep_merge(self.requested_statuses, step_statuses)
+ dict_deep_merge(self.requested_statuses, step_statuses, conflict_handler=status_conflict_handler)
# Calculate run time average and standard deviation for this step
self.get_runtime_avg_std_dev(step_statuses, sstep)
@@ -531,8 +538,9 @@ def format_status_for_csv(self) -> Dict:
# Loop through information for each step
for step_info_key, step_info_value in overall_step_info.items():
- # Skip the workers entry at the top level; this will be added in the else statement below on a task-by-task basis
- if step_info_key == "workers":
+ # Skip the workers entry at the top level;
+ # this will be added in the else statement below on a task-by-task basis
+ if step_info_key in ("workers", "worker_name"):
continue
# Format task queue entry
if step_info_key == "task_queue":
@@ -833,13 +841,15 @@ def apply_filters(self):
filtered_statuses = {}
for step_name, overall_step_info in self.requested_statuses.items():
filtered_statuses[step_name] = {}
- # Add the non-workspace keys to the filtered_status dict so we don't accidentally miss any of this information while filtering
+ # Add the non-workspace keys to the filtered_status dict so we
+ # don't accidentally miss any of this information while filtering
for non_ws_key in NON_WORKSPACE_KEYS:
try:
filtered_statuses[step_name][non_ws_key] = overall_step_info[non_ws_key]
except KeyError:
LOG.debug(
- f"Tried to add {non_ws_key} to filtered_statuses dict but it was not found in requested_statuses[{step_name}]"
+ f"Tried to add {non_ws_key} to filtered_statuses dict "
+ f"but it was not found in requested_statuses[{step_name}]"
)
# Go through the actual statuses and filter them as necessary
@@ -865,8 +875,7 @@ def apply_filters(self):
if matches_found == self.args.max_tasks:
break
else:
- # If our filters aren't a match for this task then delete it
- LOG.warning(f"No matching filter for '{sub_step_workspace}'.")
+ LOG.debug(f"No matching filter for '{sub_step_workspace}'.")
# If we've hit the limit set by args.max_tasks, break out of the outer loop
if matches_found == self.args.max_tasks:
@@ -916,7 +925,7 @@ def apply_max_tasks_limit(self):
self.args.max_tasks -= len(sub_step_workspaces)
# Merge in the task statuses that we're allowing
- dict_deep_merge(new_status_dict[step_name], overall_step_info)
+ dict_deep_merge(new_status_dict[step_name], overall_step_info, conflict_handler=status_conflict_handler)
LOG.info(f"Limited the number of tasks to display to {max_tasks} tasks.")
@@ -1099,6 +1108,95 @@ def display(self, test_mode: Optional[bool] = False):
LOG.warning("No statuses to display.")
+# Pylint complains that args is unused but we can ignore that
+def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613
+ """
+ The conflict handler function to apply to any status entries that have conflicting
+ values while merging two status files together.
+
+ kwargs should include:
+ - dict_a_val: The conflicting value from the dictionary that we're merging into
+ - dict_b_val: The conflicting value from the dictionary that we're pulling from
+ - key: The key into each dictionary that has a conflict
+ - path: The path down the dictionary tree that `dict_deep_merge` is currently at
+
+ When we're reading in status files, we're merging all of the statuses into one dictionary.
+ This function defines the merge rules in case there is a merge conflict. We ignore the list
+ and dictionary entries since `dict_deep_merge` from `utils.py` handles these scenarios already.
+
+ There are currently 4 rules:
+ - string-concatenate: take the two conflicting values and concatenate them in a string
+ - use-dict_b-and-log-debug: use the value from dict_b and log a debug message
+ - use-longest-time: use the longest time between the two conflicting values
+ - use-max: use the larger integer between the two conflicting values
+
+ :returns: The value to merge into dict_a at `key`
+ """
+ # Grab the arguments passed into this function
+ dict_a_val = kwargs.get("dict_a_val", None)
+ dict_b_val = kwargs.get("dict_b_val", None)
+ key = kwargs.get("key", None)
+ path = kwargs.get("path", None)
+
+ merge_rules = {
+ "task_queue": "string-concatenate",
+ "worker_name": "string-concatenate",
+ "status": "use-dict_b-and-log-debug",
+ "return_code": "use-dict_b-and-log-debug",
+ "elapsed_time": "use-longest-time",
+ "run_time": "use-longest-time",
+ "restarts": "use-max",
+ }
+
+ # TODO
+ # - make status tracking more modular (see https://lc.llnl.gov/gitlab/weave/merlin/-/issues/58)
+ # - once it's more modular, move the below code and the above merge_rules dict to a property in
+ # one of the new status classes (the one that has condensing maybe? or upstream from that?)
+
+ # params = self.spec.get_parameters()
+ # for token in params.parameters:
+ # merge_rules[token] = "use-dict_b-and-log-debug"
+
+ # Set parameter token key rules (commented for loop would be better but it's
+ # only possible if this conflict handler is contained within Status object; however,
+ # since this function needs to be imported outside of this file we can't do that)
+ if path is not None and "parameters" in path:
+ merge_rules[key] = "use-dict_b-and-log-debug"
+
+ try:
+ merge_rule = merge_rules[key]
+ except KeyError:
+ LOG.warning(f"The key '{key}' does not have a merge rule defined. Setting this merge to None.")
+ return None
+
+ merge_val = None
+
+ if merge_rule == "string-concatenate":
+ merge_val = f"{dict_a_val}, {dict_b_val}"
+ elif merge_rule == "use-dict_b-and-log-debug":
+ LOG.debug(
+ f"Conflict at key '{key}' while merging status files. Using the updated value. "
+ "This could lead to incorrect status information, you may want to re-run in debug mode and "
+ "check the files in the output directory for this task."
+ )
+ merge_val = dict_b_val
+ elif merge_rule == "use-longest-time":
+ if dict_a_val == "--:--:--":
+ merge_val = dict_b_val
+ elif dict_b_val == "--:--:--":
+ merge_val = dict_a_val
+ else:
+ dict_a_time = convert_to_timedelta(dict_a_val)
+ dict_b_time = convert_to_timedelta(dict_b_val)
+ merge_val = get_duration(max(dict_a_time, dict_b_time))
+ elif merge_rule == "use-max":
+ merge_val = max(dict_a_val, dict_b_val)
+ else:
+ LOG.warning(f"The merge_rule '{merge_rule}' was provided but it has no implementation.")
+
+ return merge_val
+
+
def read_status(
status_filepath: str, lock_file: str, display_fnf_message: bool = True, raise_errors: bool = False, timeout: int = 10
) -> Dict:
@@ -1112,6 +1210,8 @@ def read_status(
:param timeout: An integer representing how long to hold a lock for before timing out.
:returns: A dict of the contents in the status file
"""
+ statuses_read = {}
+
# Pylint complains that we're instantiating an abstract class but this is correct usage
lock = FileLock(lock_file) # pylint: disable=abstract-class-instantiated
try:
@@ -1122,25 +1222,24 @@ def read_status(
# Handle timeouts
except Timeout as to_exc:
LOG.warning(f"Timed out when trying to read status from '{status_filepath}'")
- statuses_read = {}
if raise_errors:
- raise Timeout from to_exc
+ raise to_exc
# Handle FNF errors
except FileNotFoundError as fnf_exc:
if display_fnf_message:
LOG.warning(f"Could not find '{status_filepath}'")
- statuses_read = {}
if raise_errors:
- raise FileNotFoundError from fnf_exc
+ raise fnf_exc
# Handle JSONDecode errors (this is likely due to an empty status file)
except json.decoder.JSONDecodeError as json_exc:
LOG.warning(f"JSONDecodeError raised when trying to read status from '{status_filepath}'")
if raise_errors:
- raise json.decoder.JSONDecodeError from json_exc
+ raise json_exc
# Catch all exceptions so that we don't crash the workers
except Exception as exc: # pylint: disable=broad-except
LOG.warning(
- f"An exception was raised while trying to read status from '{status_filepath}'!\n{print_exception(type(exc), exc, exc.__traceback__)}"
+ f"An exception was raised while trying to read status from '{status_filepath}'!\n"
+ f"{print_exception(type(exc), exc, exc.__traceback__)}"
)
if raise_errors:
raise exc
@@ -1167,5 +1266,6 @@ def write_status(status_to_write: Dict, status_filepath: str, lock_file: str, ti
# Catch all exceptions so that we don't crash the workers
except Exception as exc: # pylint: disable=broad-except
LOG.warning(
- f"An exception was raised while trying to write status to '{status_filepath}'!\n{print_exception(type(exc), exc, exc.__traceback__)}"
+ f"An exception was raised while trying to write status to '{status_filepath}'!\n"
+ f"{print_exception(type(exc), exc, exc.__traceback__)}"
)
diff --git a/merlin/study/status_constants.py b/merlin/study/status_constants.py
index 84af51e53..b7dfe7fa3 100644
--- a/merlin/study/status_constants.py
+++ b/merlin/study/status_constants.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1
+# This file is part of Merlin, Version: 1.12.2b1
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/study/status_renderers.py b/merlin/study/status_renderers.py
index b062a5d9b..02d6ab948 100644
--- a/merlin/study/status_renderers.py
+++ b/merlin/study/status_renderers.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1
+# This file is part of Merlin, Version: 1.12.2b1
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/study/step.py b/merlin/study/step.py
index 95f8e7622..26d737e14 100644
--- a/merlin/study/step.py
+++ b/merlin/study/step.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/study/study.py b/merlin/study/study.py
index 2cd2e0fc9..f30e36058 100644
--- a/merlin/study/study.py
+++ b/merlin/study/study.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/merlin/utils.py b/merlin/utils.py
index 070638c38..2e69f5779 100644
--- a/merlin/utils.py
+++ b/merlin/utils.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -37,17 +37,18 @@
import re
import socket
import subprocess
+import sys
from contextlib import contextmanager
from copy import deepcopy
from datetime import datetime, timedelta
from types import SimpleNamespace
-from typing import List, Optional, Union
+from typing import Callable, List, Optional, Union
import numpy as np
+import pkg_resources
import psutil
import yaml
-
-from merlin.exceptions import DeepMergeException
+from tabulate import tabulate
try:
@@ -559,33 +560,51 @@ def needs_merlin_expansion(
return False
-def dict_deep_merge(dict_a, dict_b, path=None):
+def dict_deep_merge(dict_a: dict, dict_b: dict, path: str = None, conflict_handler: Callable = None):
"""
This function recursively merges dict_b into dict_a. The built-in
merge of dictionaries in python (dict(dict_a) | dict(dict_b)) does not do a
deep merge so this function is necessary. This will only merge in new keys,
- it will NOT update existing ones.
+ it will NOT update existing ones, unless you specify a conflict handler function.
Credit to this stack overflow post: https://stackoverflow.com/a/7205107.
:param `dict_a`: A dict that we'll merge dict_b into
:param `dict_b`: A dict that we want to merge into dict_a
:param `path`: The path down the dictionary tree that we're currently at
+ :param `conflict_handler`: An optional function to handle conflicts between values at the same key.
+ The function should return the value to be used in the merged dictionary.
+ The default behavior without this argument is to log a warning.
"""
+
+ # Check to make sure we have valid dict_a and dict_b input
+ msgs = [
+ f"{name} '{actual_dict}' is not a dict"
+ for name, actual_dict in [("dict_a", dict_a), ("dict_b", dict_b)]
+ if not isinstance(actual_dict, dict)
+ ]
+ if len(msgs) > 0:
+ LOG.warning(f"Problem with dict_deep_merge: {', '.join(msgs)}. Ignoring this merge call.")
+ return
+
if path is None:
path = []
for key in dict_b:
if key in dict_a:
if isinstance(dict_a[key], dict) and isinstance(dict_b[key], dict):
- dict_deep_merge(dict_a[key], dict_b[key], path + [str(key)])
- elif key == "workers": # specifically for status merging
- all_workers = [dict_a[key], dict_b[key]]
- dict_a[key] = list(set().union(*all_workers))
+ dict_deep_merge(dict_a[key], dict_b[key], path=path + [str(key)], conflict_handler=conflict_handler)
elif isinstance(dict_a[key], list) and isinstance(dict_a[key], list):
dict_a[key] += dict_b[key]
elif dict_a[key] == dict_b[key]:
pass # same leaf value
else:
- raise DeepMergeException(f"Conflict at {'.'.join(path + [str(key)])}")
+ if conflict_handler is not None:
+ merged_val = conflict_handler(
+ dict_a_val=dict_a[key], dict_b_val=dict_b[key], key=key, path=path + [str(key)]
+ )
+ dict_a[key] = merged_val
+ else:
+ # Want to just output a warning instead of raising an exception so that the workflow doesn't crash
+ LOG.warning(f"Conflict at {'.'.join(path + [str(key)])}. Ignoring the update to key '{key}'.")
else:
dict_a[key] = dict_b[key]
@@ -619,6 +638,11 @@ def convert_to_timedelta(timestr: Union[str, int]) -> timedelta:
"""
# make sure it's a string in case we get an int
timestr = str(timestr)
+
+ # remove time unit characters (if any exist)
+ time_unit_chars = r"[dhms]"
+ timestr = re.sub(time_unit_chars, "", timestr)
+
nfields = len(timestr.split(":"))
if nfields > 4:
raise ValueError(f"Cannot convert {timestr} to a timedelta. Valid format: days:hours:minutes:seconds.")
@@ -723,3 +747,26 @@ def ws_time_to_dt(ws_time: str) -> datetime:
minute = int(ws_time[11:13])
second = int(ws_time[13:])
return datetime(year, month, day, hour=hour, minute=minute, second=second)
+
+
+def get_package_versions(package_list: List[str]) -> str:
+ """
+ Return a table of the versions and locations of installed packages, including python.
+ If the package is not installed says "Not installed"
+
+ :param `package_list`: A list of packages.
+ :returns: A string that's a formatted table.
+ """
+ table = []
+ for package in package_list:
+ try:
+ distribution = pkg_resources.get_distribution(package)
+ version = distribution.version
+ location = distribution.location
+ table.append([package, version, location])
+ except pkg_resources.DistributionNotFound:
+ table.append([package, "Not installed", "N/A"])
+
+ table.insert(0, ["python", sys.version.split()[0], sys.executable])
+ table_str = tabulate(table, headers=["Package", "Version", "Location"], tablefmt="simple")
+ return f"Python Packages\n\n{table_str}\n"
diff --git a/requirements/dev.txt b/requirements/dev.txt
index 6e8722b4b..3695c6164 100644
--- a/requirements/dev.txt
+++ b/requirements/dev.txt
@@ -12,3 +12,4 @@ alabaster
johnnydep
deepdiff
pytest-order
+pytest-mock
diff --git a/requirements/release.txt b/requirements/release.txt
index 11fd85129..dcdb9b81b 100644
--- a/requirements/release.txt
+++ b/requirements/release.txt
@@ -9,5 +9,6 @@ numpy
parse
psutil>=5.1.0
pyyaml>=5.1.2
+setuptools
tabulate
redis>=4.3.4
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 3fe594fc8..0ee113e0a 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/README.md b/tests/README.md
index a6bf7005a..22efc5470 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -58,17 +58,47 @@ not connected> quit
## The Fixture Process Explained
-Pytest fixtures play a fundamental role in establishing a consistent foundation for test execution,
-thus ensuring reliable and predictable test outcomes. This section will delve into essential aspects
-of these fixtures, including how to integrate fixtures into tests, the utilization of fixtures within other fixtures,
-their scope, and the yielding of fixture results.
+In the world of pytest testing, fixtures are like the building blocks that create a sturdy foundation for your tests.
+They ensure that every test starts from the same fresh ground, leading to reliable and consistent results. This section
+will dive into the nitty-gritty of these fixtures, showing you how they're architected in this test suite, how to use
+them in your tests here, how to combine them for more complex scenarios, how long they stick around during testing, and
+what it means to yield a fixture.
+
+### Fixture Architecture
+
+Fixtures can be defined in two locations:
+
+1. `tests/conftest.py`: This file located at the root of the test suite houses common fixtures that are utilized
+across various test modules
+2. `tests/fixtures/`: This directory contains specific test module fixtures. Each fixture file is named according
+to the module(s) that the fixtures defined within are for.
+
+Credit for this setup must be given to [this Medium article](https://medium.com/@nicolaikozel/modularizing-pytest-fixtures-fd40315c5a93).
+
+#### Fixture Naming Conventions
+
+For fixtures defined within the `tests/fixtures/` directory, the fixture name should be prefixed by the name of the
+fixture file they are defined in.
+
+#### Importing Fixtures as Plugins
+
+Fixtures located in the `tests/fixtures/` directory are technically plugins. Therefore, to use them we must
+register them as plugins within the `conftest.py` file (see the top of said file for the implementation).
+This allows them to be discovered and used by test modules throughout the suite.
+
+**You do not have to register the fixtures you define as plugins in `conftest.py` since the registration there
+uses `glob` to grab everything from the `tests/fixtures/` directory automatically.**
### How to Integrate Fixtures Into Tests
Probably the most important part of fixtures is understanding how to use them. Luckily, this process is very
-simple and can be dumbed down to 2 steps:
+simple and can be dumbed down to just a couple steps:
+
+0. **[Module-specific fixtures only]** If you're creating a module-specific fixture (i.e. a fixture that won't be used throughout the entire test
+suite), then create a file in the `tests/fixtures/` directory.
-1. Create a fixture in the `conftest.py` file by using the `@pytest.fixture` decorator. For example:
+1. Create a fixture in either the `conftest.py` file or the file you created in the `tests/fixtures/` directory
+by using the `@pytest.fixture` decorator. For example:
```
@pytest.fixture
@@ -131,10 +161,10 @@ scopes come to save the day.
### Fixture Scopes
-There are several different scopes that you can set for fixtures. The majority of our fixtures use a `session`
-scope so that we only have to create the fixtures one time (as some of them can take a few seconds to set up).
-The goal is to create fixtures with the most general use-case in mind so that we can re-use them for larger
-scopes, which helps with efficiency.
+There are several different scopes that you can set for fixtures. The majority of our fixtures in `conftest.py`
+use a `session` scope so that we only have to create the fixtures one time (as some of them can take a few seconds
+to set up). The goal is to create fixtures with the most general use-case in mind so that we can re-use them for
+larger scopes, which helps with efficiency.
For more info on scopes, see
[Pytest's Fixture Scope documentation](https://docs.pytest.org/en/6.2.x/fixture.html#scope-sharing-fixtures-across-classes-modules-packages-or-session).
diff --git a/tests/celery_test_workers.py b/tests/celery_test_workers.py
index d97229664..ad81d30e6 100644
--- a/tests/celery_test_workers.py
+++ b/tests/celery_test_workers.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/conftest.py b/tests/conftest.py
index e180ad910..bea07f64c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -33,6 +33,7 @@
"""
import os
import subprocess
+from glob import glob
from time import sleep
from typing import Dict
@@ -45,6 +46,14 @@
from tests.celery_test_workers import CeleryTestWorkersManager
+#######################################
+# Loading in Module Specific Fixtures #
+#######################################
+pytest_plugins = [
+ fixture_file.replace("/", ".").replace(".py", "") for fixture_file in glob("tests/fixtures/[!__]*.py", recursive=True)
+]
+
+
class RedisServerError(Exception):
"""
Exception to signal that the server wasn't pinged properly.
diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py
new file mode 100644
index 000000000..ab3e56590
--- /dev/null
+++ b/tests/fixtures/__init__.py
@@ -0,0 +1,16 @@
+"""
+This directory is to help modularize fixture definitions so that we don't have to
+store every single fixture in the `conftest.py` file.
+
+Fixture names must be prefixed with the name of the file they're defined in. For instance,
+if our fixture file was named `example.py` then our fixtures in this file would have
+to start with "example_":
+
+```title="example.py"
+import pytest
+
+@pytest.fixture
+def example_test_data():
+ return {"key": "val"}
+```
+"""
diff --git a/tests/fixtures/status.py b/tests/fixtures/status.py
new file mode 100644
index 000000000..f26cea37c
--- /dev/null
+++ b/tests/fixtures/status.py
@@ -0,0 +1,146 @@
+"""
+Fixtures specifically for help testing the functionality related to
+status/detailed-status.
+"""
+
+import os
+import shutil
+from argparse import Namespace
+from pathlib import Path
+
+import pytest
+import yaml
+
+from tests.unit.study.status_test_files import status_test_variables
+
+
+@pytest.fixture(scope="session")
+def status_testing_dir(temp_output_dir: str) -> str:
+ """
+ A pytest fixture to set up a temporary directory to write files to for testing status.
+
+ :param temp_output_dir: The path to the temporary output directory we'll be using for this test run
+ :returns: The path to the temporary testing directory for status testing
+ """
+ testing_dir = f"{temp_output_dir}/status_testing/"
+ if not os.path.exists(testing_dir):
+ os.mkdir(testing_dir)
+
+ return testing_dir
+
+
+@pytest.fixture(scope="session")
+def status_empty_file(status_testing_dir: str) -> str: # pylint: disable=W0621
+ """
+ A pytest fixture to create an empty status file.
+
+ :param status_testing_dir: A pytest fixture that defines a path to the output
+ directory we'll write to
+ :returns: The path to the empty status file
+ """
+ empty_file = Path(f"{status_testing_dir}/empty_status.json")
+ if not empty_file.exists():
+ empty_file.touch()
+
+ return empty_file
+
+
+@pytest.fixture(scope="session")
+def status_spec_path(status_testing_dir: str) -> str: # pylint: disable=W0621
+ """
+ Copy the test spec to the temp directory and modify the OUTPUT_PATH in the spec
+ to point to the temp location.
+
+ :param status_testing_dir: A pytest fixture that defines a path to the output
+ directory we'll write to
+ :returns: The path to the spec file
+ """
+ test_spec = f"{os.path.dirname(__file__)}/../unit/study/status_test_files/status_test_spec.yaml"
+ spec_in_temp_dir = f"{status_testing_dir}/status_test_spec.yaml"
+ shutil.copy(test_spec, spec_in_temp_dir) # copy test spec to temp directory
+
+ # Modify the OUTPUT_PATH variable to point to the temp directory
+ with open(spec_in_temp_dir, "r") as spec_file:
+ spec_contents = yaml.load(spec_file, yaml.Loader)
+ spec_contents["env"]["variables"]["OUTPUT_PATH"] = status_testing_dir
+ with open(spec_in_temp_dir, "w") as spec_file:
+ yaml.dump(spec_contents, spec_file)
+
+ return spec_in_temp_dir
+
+
+def set_sample_path(output_workspace: str):
+ """
+ Helper function to set the path to the samples file in the test spec.
+
+ :param output_workspace: The workspace that we'll pull the spec file to update from
+ """
+ temp_merlin_info_path = f"{output_workspace}/merlin_info"
+ expanded_spec_path = f"{temp_merlin_info_path}/status_test_spec.expanded.yaml"
+
+ # Read in the contents of the expanded spec
+ with open(expanded_spec_path, "r") as expanded_file:
+ expanded_contents = yaml.load(expanded_file, yaml.Loader)
+
+ # Modify the samples file path
+ expanded_contents["merlin"]["samples"]["file"] = f"{temp_merlin_info_path}/samples.csv"
+
+ # Write the new contents to the expanded spec
+ with open(expanded_spec_path, "w") as expanded_file:
+ yaml.dump(expanded_contents, expanded_file)
+
+
+@pytest.fixture(scope="session")
+def status_output_workspace(status_testing_dir: str) -> str: # pylint: disable=W0621
+ """
+ A pytest fixture to copy the test output workspace for status to the temporary
+ status testing directory.
+
+ :param status_testing_dir: A pytest fixture that defines a path to the output
+ directory we'll write to
+ :returns: The path to the output workspace in the temp status testing directory
+ """
+ output_workspace = f"{status_testing_dir}/{status_test_variables.VALID_WORKSPACE}"
+ shutil.copytree(status_test_variables.VALID_WORKSPACE_PATH, output_workspace) # copy over the files
+ set_sample_path(output_workspace) # set the path to the samples file in the expanded yaml
+ return output_workspace
+
+
+@pytest.fixture(scope="function")
+def status_args():
+ """
+ A pytest fixture to set up a namespace with all the arguments necessary for
+ the Status object.
+
+ :returns: The namespace with necessary arguments for the Status object
+ """
+ return Namespace(
+ subparsers="status",
+ level="INFO",
+ detailed=False,
+ output_path=None,
+ task_server="celery",
+ cb_help=False,
+ dump=None,
+ no_prompts=True, # We'll set this to True here since it's easier to test this way
+ )
+
+
+@pytest.fixture(scope="session")
+def status_nested_workspace(status_testing_dir: str) -> str: # pylint: disable=W0621
+ """
+ Create an output workspace that contains another output workspace within one of its
+ steps. In this case it will copy the status test workspace then within the 'just_samples'
+ step we'll copy the status test workspace again but with a different name.
+
+ :param status_testing_dir: A pytest fixture that defines a path to the output
+ directory we'll write to
+ :returns: The path to the top level workspace
+ """
+ top_level_workspace = f"{status_testing_dir}/status_test_study_nested_20240520-163524"
+ nested_workspace = f"{top_level_workspace}/just_samples/nested_workspace_20240520-163524"
+ shutil.copytree(status_test_variables.VALID_WORKSPACE_PATH, top_level_workspace) # copy over the top level workspace
+ shutil.copytree(status_test_variables.VALID_WORKSPACE_PATH, nested_workspace) # copy over the nested workspace
+ set_sample_path(top_level_workspace) # set the path to the samples file in the expanded yaml of the top level workspace
+ set_sample_path(nested_workspace) # set the path to the samples file in the expanded yaml of the nested workspace
+ return top_level_workspace
diff --git a/tests/integration/conditions.py b/tests/integration/conditions.py
index 535e350de..83f07aafe 100644
--- a/tests/integration/conditions.py
+++ b/tests/integration/conditions.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -307,8 +307,9 @@ class PathExists(Condition):
A condition for checking if a path to a file or directory exists
"""
- def __init__(self, pathname) -> None:
+ def __init__(self, pathname, negate=False) -> None:
self.pathname = pathname
+ self.negate = negate
def path_exists(self) -> bool:
"""Check if a path exists"""
@@ -319,7 +320,7 @@ def __str__(self) -> str:
@property
def passes(self):
- return self.path_exists()
+ return not self.path_exists() if self.negate else self.path_exists()
class FileHasRegex(Condition):
diff --git a/tests/integration/definitions.py b/tests/integration/definitions.py
index 28eac2d0e..59c1fa256 100644
--- a/tests/integration/definitions.py
+++ b/tests/integration/definitions.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -128,6 +128,7 @@ def define_tests(): # pylint: disable=R0914,R0915
lsf = f"{examples}/lsf/lsf_par.yaml"
mul_workers_demo = f"{dev_examples}/multiple_workers.yaml"
cli_substitution_wf = f"{test_specs}/cli_substitution_test.yaml"
+ chord_err_wf = f"{test_specs}/chord_err.yaml"
# Other shortcuts
black = "black --check --target-version py36"
@@ -294,6 +295,11 @@ def define_tests(): # pylint: disable=R0914,R0915
"conditions": [HasReturnCode(), HasRegex(r"default_worker", negate=True)],
"run type": "local",
},
+ "run-workers echo variable for worker nodes": {
+ "cmds": f"{workers_flux} {flux_native} --echo",
+ "conditions": [HasReturnCode(), HasRegex(r"-N 4")],
+ "run type": "local",
+ },
}
wf_format_tests = {
"local minimum_format": {
@@ -827,6 +833,30 @@ def define_tests(): # pylint: disable=R0914,R0915
"run type": "distributed",
},
}
+ distributed_error_checks = {
+ "check chord error continues wf": {
+ "cmds": [
+ f"{workers} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR}",
+ f"{run} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR}; sleep 40; tree {OUTPUT_DIR}",
+ ],
+ "conditions": [
+ HasReturnCode(),
+ PathExists( # Check that the sample that's supposed to raise an error actually raises an error
+ f"{OUTPUT_DIR}/process_samples/01/MERLIN_FINISHED",
+ negate=True,
+ ),
+ StepFileExists( # Check that step 3 is actually started and completes
+ "step_3",
+ "MERLIN_FINISHED",
+ "chord_err",
+ OUTPUT_DIR,
+ ),
+ ],
+ "run type": "distributed",
+ "cleanup": KILL_WORKERS,
+ "num procs": 2,
+ }
+ }
# combine and return test dictionaries
all_tests = {}
@@ -849,6 +879,7 @@ def define_tests(): # pylint: disable=R0914,R0915
stop_workers_tests,
query_workers_tests,
distributed_tests,
+ distributed_error_checks,
]:
all_tests.update(test_dict)
diff --git a/tests/integration/run_tests.py b/tests/integration/run_tests.py
index e54e5d603..ef2bef882 100644
--- a/tests/integration/run_tests.py
+++ b/tests/integration/run_tests.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/integration/samples_files/samples.csv b/tests/integration/samples_files/samples.csv
new file mode 100644
index 000000000..38ef6a076
--- /dev/null
+++ b/tests/integration/samples_files/samples.csv
@@ -0,0 +1,3 @@
+SUCCESS_1
+RAISE
+SUCCESS_2
diff --git a/tests/integration/test_specs/chord_err.yaml b/tests/integration/test_specs/chord_err.yaml
new file mode 100644
index 000000000..3da99ae03
--- /dev/null
+++ b/tests/integration/test_specs/chord_err.yaml
@@ -0,0 +1,54 @@
+description:
+ name: chord_err
+ description: test the chord err problem
+
+env:
+ variables:
+ OUTPUT_PATH: ./studies
+
+global.parameters:
+ TEST_PARAM:
+ values: [2, 4]
+ label: TEST_PARAM.%%
+
+study:
+- name: process_samples
+ description: Process samples. Purposefully try to trigger the chord err
+ run:
+ cmd: |
+ if [ $(SAMPLE) == "RAISE" ];
+ then
+ exit $(MERLIN_RAISE_ERROR)
+ else
+ echo "Success for sample $(SAMPLE)"
+ fi
+- name: samples_and_params
+ description: step with samples and parameters
+ run:
+ cmd: |
+ echo "sample: $(SAMPLE); param: $(TEST_PARAM)"
+ if [ -f $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)/MERLIN_FINISHED ];
+ then
+ echo "MERLIN finished file found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
+ else
+ echo "MERLIN finished file NOT found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
+ fi
+ depends: [process_samples_*]
+- name: step_3
+ description: funnel step
+ run:
+ cmd: |
+ echo "Running step_3"
+ depends: [samples_and_params_*]
+
+merlin:
+ samples:
+ column_labels: [SAMPLE]
+ file: $(MERLIN_INFO)/samples.csv
+ generate:
+ cmd: cp $(SPECROOT)/../samples_files/samples.csv $(MERLIN_INFO)/samples.csv
+ resources:
+ workers:
+ merlin_test_worker:
+ args: -l INFO --concurrency 1 --prefetch-multiplier 1 -Ofair
+ steps: [process_samples, samples_and_params, step_3]
diff --git a/tests/integration/test_specs/flux_par_native_test.yaml b/tests/integration/test_specs/flux_par_native_test.yaml
index 8eaf4b024..6fd9021a4 100644
--- a/tests/integration/test_specs/flux_par_native_test.yaml
+++ b/tests/integration/test_specs/flux_par_native_test.yaml
@@ -14,6 +14,7 @@ env:
OUTPUT_PATH: ./studies
N_SAMPLES: 10
SCRIPTS: $(SPECROOT)/../../../merlin/examples/workflows/flux/scripts
+ WORKER_NODES: 4
study:
- description: Build the code
@@ -71,6 +72,7 @@ merlin:
simworkers:
args: -l INFO --concurrency 1 --prefetch-multiplier 1 -Ofair
steps: [runs, data]
+ nodes: $(WORKER_NODES)
samples:
column_labels: [V1, V2]
file: $(MERLIN_INFO)/samples.npy
diff --git a/tests/unit/study/__init__.py b/tests/unit/study/__init__.py
index c76918410..57477ea1f 100644
--- a/tests/unit/study/__init__.py
+++ b/tests/unit/study/__init__.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/unit/study/status_test_files/combine_status_files.py b/tests/unit/study/status_test_files/combine_status_files.py
index 7d5413e6a..f7021a97a 100644
--- a/tests/unit/study/status_test_files/combine_status_files.py
+++ b/tests/unit/study/status_test_files/combine_status_files.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/unit/study/status_test_files/shared_tests.py b/tests/unit/study/status_test_files/shared_tests.py
index b003b993e..fb31b96a7 100644
--- a/tests/unit/study/status_test_files/shared_tests.py
+++ b/tests/unit/study/status_test_files/shared_tests.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/unit/study/status_test_files/status_test_variables.py b/tests/unit/study/status_test_files/status_test_variables.py
index c941ad0c5..ffb3cba31 100644
--- a/tests/unit/study/status_test_files/status_test_variables.py
+++ b/tests/unit/study/status_test_files/status_test_variables.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py
index d94f16047..0572d6c66 100644
--- a/tests/unit/study/test_celeryadapter.py
+++ b/tests/unit/study/test_celeryadapter.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
diff --git a/tests/unit/study/test_detailed_status.py b/tests/unit/study/test_detailed_status.py
index 8c7f0f600..ae278c975 100644
--- a/tests/unit/study/test_detailed_status.py
+++ b/tests/unit/study/test_detailed_status.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -212,16 +212,31 @@ def test_json_dump_with_filters(self):
dump functionalities. The file needs to exist already for an append so it's
better to keep these tests together.
"""
- # Set filters for failed and cancelled tasks, and then reload the requested_statuses
- self.detailed_status_obj.args.task_status = ["FAILED", "CANCELLED"]
- self.detailed_status_obj.load_requested_statuses()
-
- # Set the dump file
- json_dump_file = f"{status_test_variables.PATH_TO_TEST_FILES}/detailed_dump_test.json"
- self.detailed_status_obj.args.dump = json_dump_file
+ # Need to create a new DetailedStatus object so that filters are loaded from the beginning
+ args = Namespace(
+ subparsers="detailed-status",
+ level="INFO",
+ detailed=True,
+ output_path=None,
+ task_server="celery",
+ dump=f"{status_test_variables.PATH_TO_TEST_FILES}/detailed_dump_test.json", # Set the dump file
+ no_prompts=True,
+ max_tasks=None,
+ return_code=None,
+ steps=["all"],
+ task_queues=None,
+ task_status=["FAILED", "CANCELLED"], # Set filters for failed and cancelled tasks
+ workers=None,
+ disable_pager=True,
+ disable_theme=False,
+ layout="default",
+ )
+ detailed_status_obj = DetailedStatus(
+ args=args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH
+ )
# Run the json dump test (we should only get failed and cancelled statuses)
- shared_tests.run_json_dump_test(self.detailed_status_obj, status_test_variables.REQUESTED_STATUSES_FAIL_AND_CANCEL)
+ shared_tests.run_json_dump_test(detailed_status_obj, status_test_variables.REQUESTED_STATUSES_FAIL_AND_CANCEL)
def test_csv_dump_with_filters(self):
"""
@@ -229,17 +244,32 @@ def test_csv_dump_with_filters(self):
dump functionalities. The file needs to exist already for an append so it's
better to keep these tests together.
"""
- # Set filters for failed and cancelled tasks, and then reload the requested_statuses
- self.detailed_status_obj.args.task_status = ["FAILED", "CANCELLED"]
- self.detailed_status_obj.load_requested_statuses()
-
- # Set the dump file
- csv_dump_file = f"{status_test_variables.PATH_TO_TEST_FILES}/detailed_dump_test.csv"
- self.detailed_status_obj.args.dump = csv_dump_file
+ # Need to create a new DetailedStatus object so that filters are loaded from the beginning
+ args = Namespace(
+ subparsers="detailed-status",
+ level="INFO",
+ detailed=True,
+ output_path=None,
+ task_server="celery",
+ dump=f"{status_test_variables.PATH_TO_TEST_FILES}/detailed_dump_test.csv", # Set the dump file
+ no_prompts=True,
+ max_tasks=None,
+ return_code=None,
+ steps=["all"],
+ task_queues=None,
+ task_status=["FAILED", "CANCELLED"], # Set filters for failed and cancelled tasks
+ workers=None,
+ disable_pager=True,
+ disable_theme=False,
+ layout="default",
+ )
+ detailed_status_obj = DetailedStatus(
+ args=args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH
+ )
# Run the csv dump test (we should only get failed and cancelled statuses)
expected_output = shared_tests.build_row_list(status_test_variables.FORMATTED_STATUSES_FAIL_AND_CANCEL)
- shared_tests.run_csv_dump_test(self.detailed_status_obj, expected_output)
+ shared_tests.run_csv_dump_test(detailed_status_obj, expected_output)
class TestPromptFunctionality(TestBaseDetailedStatus):
diff --git a/tests/unit/study/test_status.py b/tests/unit/study/test_status.py
index 695af17f3..9d602848f 100644
--- a/tests/unit/study/test_status.py
+++ b/tests/unit/study/test_status.py
@@ -6,7 +6,7 @@
#
# LLNL-CODE-797170
# All rights reserved.
-# This file is part of Merlin, Version: 1.12.1.
+# This file is part of Merlin, Version: 1.12.2b1.
#
# For details, see https://github.com/LLNL/merlin.
#
@@ -30,174 +30,609 @@
"""
Tests for the Status class in the status.py module
"""
-import unittest
+import json
+import logging
+import os
from argparse import Namespace
-from copy import deepcopy
from datetime import datetime
+from json.decoder import JSONDecodeError
-import yaml
+import pytest
from deepdiff import DeepDiff
+from filelock import Timeout
from merlin.spec.expansion import get_spec_with_expansion
-from merlin.study.status import Status
+from merlin.study.status import Status, read_status, status_conflict_handler, write_status
+from merlin.study.status_constants import NON_WORKSPACE_KEYS
from tests.unit.study.status_test_files import shared_tests, status_test_variables
-class TestMerlinStatus(unittest.TestCase):
- """Test the logic for methods in the Status class."""
+class TestStatusReading:
+ """Test the logic for reading in status files"""
+
+ cancel_step_dir = f"{status_test_variables.VALID_WORKSPACE_PATH}/cancel_step"
+ status_file = f"{cancel_step_dir}/MERLIN_STATUS.json"
+ lock_file = f"{cancel_step_dir}/status.lock"
+
+ def test_basic_read(self):
+ """
+ Test the basic reading functionality of `read_status`. There should
+ be no errors thrown and the correct status dict should be returned.
+ """
+ actual_statuses = read_status(self.status_file, self.lock_file)
+ read_status_diff = DeepDiff(
+ actual_statuses, status_test_variables.REQUESTED_STATUSES_JUST_CANCELLED_STEP, ignore_order=True
+ )
+ assert read_status_diff == {}
+
+ def test_timeout_raise_errors_disabled(self, mocker: "Fixture", caplog: "Fixture"): # noqa: F821
+ """
+ Test the timeout functionality of the `read_status` function with
+ `raise_errors` set to False. This should log a warning message and
+ return an empty dict.
+ This test will create a mock of the FileLock object in order to
+ force a timeout to be raised.
+
+ :param mocker: A built-in fixture from the pytest-mock library to create a Mock object
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+
+ # Set the mock to raise a timeout
+ mock_filelock = mocker.patch("merlin.study.status.FileLock")
+ mock_lock = mocker.MagicMock()
+ mock_lock.acquire.side_effect = Timeout(self.lock_file)
+ mock_filelock.return_value = mock_lock
+
+ # Check that the return is as we expect
+ actual_status = read_status(self.status_file, self.lock_file)
+ assert actual_status == {}
+
+ # Check that a warning is logged
+ expected_log = f"Timed out when trying to read status from '{self.status_file}'"
+ assert expected_log in caplog.text, "Missing expected log message"
+
+ def test_timeout_raise_errors_enabled(self, mocker: "Fixture", caplog: "Fixture"): # noqa: F821
+ """
+ Test the timeout functionality of the `read_status` function with
+ `raise_errors` set to True. This should log a warning message and
+ raise a Timeout exception.
+ This test will create a mock of the FileLock object in order to
+ force a timeout to be raised.
+
+ :param mocker: A built-in fixture from the pytest-mock library to create a Mock object
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+
+ # Set the mock to raise a timeout
+ mock_filelock = mocker.patch("merlin.study.status.FileLock")
+ mock_lock = mocker.MagicMock()
+ mock_lock.acquire.side_effect = Timeout(self.lock_file)
+ mock_filelock.return_value = mock_lock
+
+ # Check that a Timeout exception is raised
+ with pytest.raises(Timeout):
+ read_status(self.status_file, self.lock_file, raise_errors=True)
+
+ # Check that a warning is logged
+ expected_log = f"Timed out when trying to read status from '{self.status_file}'"
+ assert expected_log in caplog.text, "Missing expected log message"
+
+ def test_file_not_found_no_fnf_no_errors(self, caplog: "Fixture"): # noqa: F821
+ """
+ Test the file not found functionality with the `display_fnf_message`
+ and `raise_errors` options both set to False. This should just return
+ an empty dict and not log anything.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ dummy_file = "i_dont_exist.json"
+ actual_status = read_status(dummy_file, self.lock_file, display_fnf_message=False, raise_errors=False)
+ assert actual_status == {}
+ assert caplog.text == ""
+
+ def test_file_not_found_with_fnf_no_errors(self, caplog: "Fixture"): # noqa: F821
+ """
+ Test the file not found functionality with the `display_fnf_message`
+ set to True and the `raise_errors` option set to False. This should
+ return an empty dict and log a warning.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ dummy_file = "i_dont_exist.json"
+ actual_status = read_status(dummy_file, self.lock_file, display_fnf_message=True, raise_errors=False)
+ assert actual_status == {}
+ assert f"Could not find '{dummy_file}'" in caplog.text
+
+ def test_file_not_found_no_fnf_with_errors(self, caplog: "Fixture"): # noqa: F821
+ """
+ Test the file not found functionality with the `display_fnf_message`
+ set to False and the `raise_errors` option set to True. This should
+ raise a FileNotFound error and not log anything.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ dummy_file = "i_dont_exist.json"
+ with pytest.raises(FileNotFoundError):
+ read_status(dummy_file, self.lock_file, display_fnf_message=False, raise_errors=True)
+ assert caplog.text == ""
+
+ def test_file_not_found_with_fnf_and_errors(self, caplog: "Fixture"): # noqa: F821
+ """
+ Test the file not found functionality with the `display_fnf_message`
+ and `raise_errors` options both set to True. This should raise a FileNotFound
+ error and log a warning.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ dummy_file = "i_dont_exist.json"
+ with pytest.raises(FileNotFoundError):
+ read_status(dummy_file, self.lock_file, display_fnf_message=True, raise_errors=True)
+ assert f"Could not find '{dummy_file}'" in caplog.text
+
+ def test_json_decode_raise_errors_disabled(self, caplog: "Fixture", status_empty_file: str): # noqa: F821
+ """
+ Test the json decode error functionality with `raise_errors` disabled.
+ This should log a warning and return an empty dict.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ :param status_empty_file: A pytest fixture to give us an empty status file
+ """
+ actual_status = read_status(status_empty_file, self.lock_file, raise_errors=False)
+ assert actual_status == {}
+ assert f"JSONDecodeError raised when trying to read status from '{status_empty_file}'" in caplog.text
+
+ def test_json_decode_raise_errors_enabled(self, caplog: "Fixture", status_empty_file: str): # noqa: F821
+ """
+ Test the json decode error functionality with `raise_errors` enabled.
+ This should log a warning and raise a JSONDecodeError.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ :param status_empty_file: A pytest fixture to give us an empty status file
+ """
+ with pytest.raises(JSONDecodeError):
+ read_status(status_empty_file, self.lock_file, raise_errors=True)
+ assert f"JSONDecodeError raised when trying to read status from '{status_empty_file}'" in caplog.text
+
+ @pytest.mark.parametrize("exception", [TypeError, ValueError, NotImplementedError, IOError, UnicodeError, OSError])
+ def test_broad_exception_handler_raise_errors_disabled(
+ self, mocker: "Fixture", caplog: "Fixture", exception: Exception # noqa: F821
+ ):
+ """
+ Test the broad exception handler with `raise_errors` disabled. This should
+ log a warning and return an empty dict.
+
+ :param mocker: A built-in fixture from the pytest-mock library to create a Mock object
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ :param exception: An exception to force `read_status` to raise.
+ Values for this are obtained from parametrized list above.
+ """
+
+ # Set the mock to raise an exception
+ mock_filelock = mocker.patch("merlin.study.status.FileLock")
+ mock_lock = mocker.MagicMock()
+ mock_lock.acquire.side_effect = exception()
+ mock_filelock.return_value = mock_lock
+
+ actual_status = read_status(self.status_file, self.lock_file, raise_errors=False)
+ assert actual_status == {}
+ assert f"An exception was raised while trying to read status from '{self.status_file}'!" in caplog.text
+
+ @pytest.mark.parametrize("exception", [TypeError, ValueError, NotImplementedError, IOError, UnicodeError, OSError])
+ def test_broad_exception_handler_raise_errors_enabled(
+ self, mocker: "Fixture", caplog: "Fixture", exception: Exception # noqa: F821
+ ):
+ """
+ Test the broad exception handler with `raise_errors` enabled. This should
+ log a warning and raise whichever exception is passed in (see list of
+ parametrized exceptions in the decorator above).
+
+ :param mocker: A built-in fixture from the pytest-mock library to create a Mock object
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ :param exception: An exception to force `read_status` to raise.
+ Values for this are obtained from parametrized list above.
+ """
+
+ # Set the mock to raise an exception
+ mock_filelock = mocker.patch("merlin.study.status.FileLock")
+ mock_lock = mocker.MagicMock()
+ mock_lock.acquire.side_effect = exception()
+ mock_filelock.return_value = mock_lock
+
+ with pytest.raises(exception):
+ read_status(self.status_file, self.lock_file, raise_errors=True)
+ assert f"An exception was raised while trying to read status from '{self.status_file}'!" in caplog.text
+
+
+class TestStatusWriting:
+ """Test the logic for writing to status files"""
+
+ status_to_write = {"status": "TESTING"}
+
+ def test_basic_write(self, status_testing_dir: str):
+ """
+ Test the basic functionality of the `write_status` function. This
+ should write status to a file.
+
+ :param status_testing_dir: A pytest fixture defined in `tests/fixtures/status.py`
+ that defines a path to the output directory we'll write to
+ """
+
+ # Test variables
+ status_filepath = f"{status_testing_dir}/basic_write.json"
+ lock_file = f"{status_testing_dir}/basic_write.lock"
- @classmethod
- def setUpClass(cls):
+ # Run the test
+ write_status(self.status_to_write, status_filepath, lock_file)
+
+ # Check that the path exists and that it contains the dummy status content
+ assert os.path.exists(status_filepath)
+ with open(status_filepath, "r") as sfp:
+ dummy_status = json.load(sfp)
+ assert dummy_status == self.status_to_write
+
+ @pytest.mark.parametrize("exception", [TypeError, ValueError, NotImplementedError, IOError, UnicodeError, OSError])
+ def test_exception_raised(
+ self, mocker: "Fixture", caplog: "Fixture", status_testing_dir: str, exception: Exception # noqa: F821
+ ):
+ """
+ Test the exception handler using several different exceptions defined in the
+ parametrized list in the decorator above. This should log a warning and not
+ create the status file that we provide.
+
+ :param mocker: A built-in fixture from the pytest-mock library to create a Mock object
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ :param status_testing_dir: A pytest fixture defined in `tests/fixtures/status.py`
+ that defines a path to the output directory we'll write to
+ :param exception: An exception to force `read_status` to raise.
+ Values for this are obtained from parametrized list above.
+ """
+
+ # Set the mock to raise an exception
+ mock_filelock = mocker.patch("merlin.study.status.FileLock")
+ mock_lock = mocker.MagicMock()
+ mock_lock.acquire.side_effect = exception()
+ mock_filelock.return_value = mock_lock
+
+ # Test variables
+ status_filepath = f"{status_testing_dir}/exception_{exception.__name__}.json"
+ lock_file = f"{status_testing_dir}/exception_{exception.__name__}.lock"
+
+ write_status(self.status_to_write, status_filepath, lock_file)
+ assert f"An exception was raised while trying to write status to '{status_filepath}'!" in caplog.text
+ assert not os.path.exists(status_filepath)
+
+
+class TestStatusConflictHandler:
+ """Test the functionality of the `status_conflict_handler` function."""
+
+ def test_parameter_conflict(self, caplog: "Fixture"): # noqa: F821
+ """
+ Test that conflicting parameters are handled properly. This is a special
+ case of the use-dict_b-and-log-debug rule since parameter tokens vary
+ and have to be added to the `merge_rules` dict on the fly.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ caplog.set_level(logging.DEBUG)
+
+ # Create two dicts with conflicting parameter values
+ key = "TOKEN"
+ dict_a = {"parameters": {"cmd": {key: "value"}, "restart": None}}
+ dict_b = {"parameters": {"cmd": {key: "new_value"}, "restart": None}}
+ path = ["parameters", "cmd"]
+
+ # Run the test
+ merged_val = status_conflict_handler(
+ dict_a_val=dict_a["parameters"]["cmd"][key], dict_b_val=dict_b["parameters"]["cmd"][key], key=key, path=path
+ )
+
+ # Check that everything ran properly
+ expected_log = (
+ f"Conflict at key '{key}' while merging status files. Using the updated value. "
+ "This could lead to incorrect status information, you may want to re-run in debug mode and "
+ "check the files in the output directory for this task."
+ )
+ assert merged_val == "new_value"
+ assert expected_log in caplog.text
+
+ def test_non_existent_key(self, caplog: "Fixture"): # noqa: F821
+ """
+ Test providing `status_conflict_handler` a key that doesn't exist in
+ the `merge_rule` dict. This should log a warning and return None.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ key = "i_dont_exist"
+ merged_val = status_conflict_handler(key=key)
+ assert merged_val is None
+ assert f"The key '{key}' does not have a merge rule defined. Setting this merge to None." in caplog.text
+
+ def test_rule_string_concatenate(self):
"""
- We need to modify the path to the samples file in the expanded spec for these tests.
- This will only happen once when these tests are initialized.
+ Test the string-concatenate merge rule. This should combine
+ the strings provided in `dict_a_val` and `dict_b_val` into one
+ comma-delimited string.
"""
- # Read in the contents of the expanded spec
- with open(status_test_variables.EXPANDED_SPEC_PATH, "r") as expanded_file:
- cls.initial_expanded_contents = yaml.load(expanded_file, yaml.Loader)
- # Create a copy of the contents so we can reset the file when these tests are done
- modified_contents = deepcopy(cls.initial_expanded_contents)
+ # Create two dicts with conflicting task-queue values
+ key = "task_queue"
+ val1 = "existing_task_queue"
+ val2 = "new_task_queue"
+ dict_a = {key: val1}
+ dict_b = {key: val2}
+
+ # Run the test and make sure the values are being concatenated
+ merged_val = status_conflict_handler(
+ dict_a_val=dict_a[key],
+ dict_b_val=dict_b[key],
+ key=key,
+ )
+ assert merged_val == f"{val1}, {val2}"
+
+ def test_rule_use_initial_and_log_debug(self, caplog: "Fixture"): # noqa: F821
+ """
+ Test the use-dict_b-and-log-debug merge rule. This should
+ return the value passed in to `dict_b_val` and log a debug
+ message.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ caplog.set_level(logging.DEBUG)
+
+ # Create two dicts with conflicting status values
+ key = "status"
+ dict_a = {key: "SUCCESS"}
+ dict_b = {key: "FAILED"}
+
+ # Run the test
+ merged_val = status_conflict_handler(
+ dict_a_val=dict_a[key],
+ dict_b_val=dict_b[key],
+ key=key,
+ )
+
+ # Check that everything ran properly
+ expected_log = (
+ f"Conflict at key '{key}' while merging status files. Using the updated value. "
+ "This could lead to incorrect status information, you may want to re-run in debug mode and "
+ "check the files in the output directory for this task."
+ )
+ assert merged_val == "FAILED"
+ assert expected_log in caplog.text
- # Modify the samples file path
- modified_contents["merlin"]["samples"]["file"] = status_test_variables.SAMPLES_PATH
+ def test_rule_use_longest_time_no_dict_a_time(self):
+ """
+ Test the use-longest-time merge rule with no time set for `dict_a_val`.
+ This should default to using the time in `dict_b_val`.
+ """
+ key = "elapsed_time"
+ expected_time = "12h:34m:56s"
+ dict_a = {key: "--:--:--"}
+ dict_b = {key: expected_time}
- # Write the new contents to the expanded spec
- with open(status_test_variables.EXPANDED_SPEC_PATH, "w") as expanded_file:
- yaml.dump(modified_contents, expanded_file)
+ merged_val = status_conflict_handler(
+ dict_a_val=dict_a[key],
+ dict_b_val=dict_b[key],
+ key=key,
+ )
+ assert merged_val == expected_time
- @classmethod
- def tearDownClass(cls):
+ def test_rule_use_longest_time_no_dict_b_time(self):
"""
- When these tests are done we'll reset the contents of the expanded spec path
- to their initial states.
+ Test the use-longest-time merge rule with no time set for `dict_b_val`.
+ This should default to using the time in `dict_a_val`.
"""
- with open(status_test_variables.EXPANDED_SPEC_PATH, "w") as expanded_file:
- yaml.dump(cls.initial_expanded_contents, expanded_file)
+ key = "run_time"
+ expected_time = "12h:34m:56s"
+ dict_a = {key: expected_time}
+ dict_b = {key: "--:--:--"}
+
+ merged_val = status_conflict_handler(
+ dict_a_val=dict_a[key],
+ dict_b_val=dict_b[key],
+ key=key,
+ )
+ assert merged_val == expected_time
- def setUp(self):
+ def test_rule_use_longest_time(self):
"""
- We'll create an argparse namespace here that can be modified on a
- test-by-test basis.
+ Test the use-longest-time merge rule with times set for both `dict_a_val`
+ and `dict_b_val`. This should use whichever time is longer.
"""
- # We'll set all of the args needed to create the DetailedStatus object here and then
- # just modify them on a test-by-test basis
- self.args = Namespace(
- subparsers="status",
- level="INFO",
- detailed=False,
- output_path=None,
- task_server="celery",
- cb_help=False,
- dump=None,
- no_prompts=True, # We'll set this to True here since it's easier to test this way
+
+ # Set up test variables
+ key = "run_time"
+ short_time = "01h:04m:33s"
+ long_time = "12h:34m:56s"
+
+ # Run test with dict b having the longer time
+ dict_a = {key: short_time}
+ dict_b = {key: long_time}
+ merged_val = status_conflict_handler(
+ dict_a_val=dict_a[key],
+ dict_b_val=dict_b[key],
+ key=key,
+ )
+ assert merged_val == "0d:" + long_time # Time manipulation in status_conflict_handler will prepend '0d:'
+
+ # Run test with dict a having the longer time
+ dict_a_2 = {key: long_time}
+ dict_b_2 = {key: short_time}
+ merged_val_2 = status_conflict_handler(
+ dict_a_val=dict_a_2[key],
+ dict_b_val=dict_b_2[key],
+ key=key,
+ )
+ assert merged_val_2 == "0d:" + long_time
+
+ @pytest.mark.parametrize(
+ "dict_a_val, dict_b_val, expected",
+ [
+ (0, 0, 0),
+ (0, 1, 1),
+ (1, 0, 1),
+ (-1, 0, 0),
+ (0, -1, 0),
+ (23, 20, 23),
+ (17, 21, 21),
+ ],
+ )
+ def test_rule_use_max(self, dict_a_val: int, dict_b_val: int, expected: int):
+ """
+ Test the use-max merge rule. This should take the maximum of 2 values.
+
+ :param dict_a_val: The value to pass in for dict_a_val
+ :param dict_b_val: The value to pass in for dict_b_val
+ :param expected: The expected value from this test
+ """
+ key = "restarts"
+ merged_val = status_conflict_handler(
+ dict_a_val=dict_a_val,
+ dict_b_val=dict_b_val,
+ key=key,
)
+ assert merged_val == expected
- def test_spec_setup_nonexistent_file(self):
+
+class TestMerlinStatus:
+ """Test the logic for methods in the Status class."""
+
+ def test_spec_setup_nonexistent_file(self, status_args: Namespace):
"""
Test the creation of a Status object using a nonexistent spec file.
This should not let us create the object and instead throw an error.
+
+ :param status_args: A namespace of args needed for the status object
"""
- with self.assertRaises(ValueError):
+ with pytest.raises(ValueError):
invalid_spec_path = f"{status_test_variables.PATH_TO_TEST_FILES}/nonexistent.yaml"
- self.args.specification = invalid_spec_path
- self.args.spec_provided = get_spec_with_expansion(self.args.specification)
- _ = Status(args=self.args, spec_display=True, file_or_ws=invalid_spec_path)
+ status_args.specification = invalid_spec_path
+ status_args.spec_provided = get_spec_with_expansion(status_args.specification)
+ _ = Status(args=status_args, spec_display=True, file_or_ws=invalid_spec_path)
- def test_spec_setup_no_prompts(self):
+ def test_spec_setup_no_prompts(self, status_spec_path: str, status_args: Namespace, status_output_workspace: str):
"""
Test the creation of a Status object using a valid spec file with no
prompts allowed. By default for this test class, no_prompts is True.
This also tests that the attributes created upon initialization are
correct. The methods covered here are _load_from_spec and _obtain_study,
as well as any methods covered in assert_correct_attribute_creation
+
+ :param status_spec_path: The path to the spec file in our temporary output directory
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
- self.args.specification = status_test_variables.SPEC_PATH
- self.args.spec_provided = get_spec_with_expansion(self.args.specification)
- status_obj = Status(args=self.args, spec_display=True, file_or_ws=status_test_variables.SPEC_PATH)
+ status_args.specification = status_spec_path
+ status_args.spec_provided = get_spec_with_expansion(status_args.specification)
+ status_obj = Status(args=status_args, spec_display=True, file_or_ws=status_spec_path)
assert isinstance(status_obj, Status)
shared_tests.assert_correct_attribute_creation(status_obj)
- def test_prompt_for_study_with_valid_input(self):
+ def test_prompt_for_study_with_valid_input(
+ self, status_spec_path: str, status_args: Namespace, status_output_workspace: str
+ ):
"""
This is testing the prompt that's displayed when multiple study output
directories are found. This tests the _obtain_study method using valid inputs.
+
+ :param status_spec_path: The path to the spec file in our temporary output directory
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
# We need to load in the MerlinSpec object and save it to the args we'll give to Status
- self.args.specification = status_test_variables.SPEC_PATH
- self.args.spec_provided = get_spec_with_expansion(self.args.specification)
+ status_args.specification = status_spec_path
+ status_args.spec_provided = get_spec_with_expansion(status_args.specification)
# We're going to load in a status object without prompts first and then use that to call the method
# that prompts the user for input
- status_obj = Status(args=self.args, spec_display=True, file_or_ws=status_test_variables.SPEC_PATH)
+ status_obj = Status(args=status_args, spec_display=True, file_or_ws=status_spec_path)
shared_tests.run_study_selector_prompt_valid_input(status_obj)
- def test_prompt_for_study_with_invalid_input(self):
+ def test_prompt_for_study_with_invalid_input(
+ self, status_spec_path: str, status_args: Namespace, status_output_workspace: str
+ ):
"""
This is testing the prompt that's displayed when multiple study output
directories are found. This tests the _obtain_study method using invalid inputs.
+
+ :param status_spec_path: The path to the spec file in our temporary output directory
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
# We need to load in the MerlinSpec object and save it to the args we'll give to Status
- self.args.specification = status_test_variables.SPEC_PATH
- self.args.spec_provided = get_spec_with_expansion(self.args.specification)
+ status_args.specification = status_spec_path
+ status_args.spec_provided = get_spec_with_expansion(status_args.specification)
# We're going to load in a status object without prompts first and then use that to call the method
# that prompts the user for input
- status_obj = Status(args=self.args, spec_display=True, file_or_ws=status_test_variables.SPEC_PATH)
+ status_obj = Status(args=status_args, spec_display=True, file_or_ws=status_spec_path)
shared_tests.run_study_selector_prompt_invalid_input(status_obj)
- def test_workspace_setup_nonexistent_workspace(self):
+ def test_workspace_setup_nonexistent_workspace(self, status_args: Namespace):
"""
Test the creation of a Status object using a nonexistent workspace directory.
This should not let us create the object and instead throw an error.
+
+ :param status_args: A namespace of args needed for the status object
"""
# Testing nonexistent workspace (in reality main.py should deal with this for us but we'll check it just in case)
- with self.assertRaises(ValueError):
+ with pytest.raises(ValueError):
invalid_workspace = f"{status_test_variables.PATH_TO_TEST_FILES}/nonexistent_20230101-000000/"
- _ = Status(args=self.args, spec_display=False, file_or_ws=invalid_workspace)
+ _ = Status(args=status_args, spec_display=False, file_or_ws=invalid_workspace)
- def test_workspace_setup_not_a_merlin_directory(self):
+ def test_workspace_setup_not_a_merlin_directory(self, status_args: Namespace):
"""
Test the creation of a Status object using an existing directory that is NOT
an output directory from a merlin study (i.e. the directory does not have a
merlin_info/ subdirectory). This should not let us create the object and instead
throw an error.
+
+ :param status_args: A namespace of args needed for the status object
"""
- with self.assertRaises(ValueError):
- _ = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.DUMMY_WORKSPACE_PATH)
+ with pytest.raises(ValueError):
+ _ = Status(args=status_args, spec_display=False, file_or_ws=status_test_variables.DUMMY_WORKSPACE_PATH)
- def test_workspace_setup_valid_workspace(self):
+ def test_workspace_setup_valid_workspace(self, status_args: Namespace, status_output_workspace: str):
"""
Test the creation of a Status object using a valid workspace directory.
This also tests that the attributes created upon initialization are
correct. The _load_from_workspace method is covered here, as well as any
methods covered in assert_correct_attribute_creation.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
- status_obj = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH)
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_output_workspace)
assert isinstance(status_obj, Status)
shared_tests.assert_correct_attribute_creation(status_obj)
- def test_json_formatter(self):
+ def test_json_formatter(self, status_args: Namespace, status_output_workspace: str):
"""
Test the json formatter for the dump method. This covers the format_json_dump method.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
# Create a timestamp and the status object that we'll run tests on
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- status_obj = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH)
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_output_workspace)
# Test json formatter
json_format_diff = DeepDiff(status_obj.format_json_dump(date), {date: status_test_variables.ALL_REQUESTED_STATUSES})
- self.assertEqual(json_format_diff, {})
+ assert json_format_diff == {}
- def test_csv_formatter(self):
+ def test_csv_formatter(self, status_args: Namespace, status_output_workspace: str):
"""
Test the csv formatter for the dump method. This covers the format_csv_dump method.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
# Create a timestamp and the status object that we'll run tests on
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- status_obj = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH)
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_output_workspace)
# Build the correct format and store each row in a list (so we can ignore the order)
correct_csv_format = {"time_of_status": [date] * len(status_test_variables.ALL_FORMATTED_STATUSES["step_name"])}
@@ -209,50 +644,61 @@ def test_csv_formatter(self):
# Compare differences (should be none)
csv_format_diff = DeepDiff(actual_csv_format, correct_csv_format, ignore_order=True)
- self.assertEqual(csv_format_diff, {})
+ assert csv_format_diff == {}
- def test_json_dump(self):
+ def test_json_dump(self, status_args: Namespace, status_output_workspace: str, status_testing_dir: str):
"""
Test the json dump functionality. This tests both the write and append
dump functionalities. The file needs to exist already for an append so it's
better to keep these tests together. This covers the dump method.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
+ :param status_testing_dir: The temporary output directory for status tests
"""
# Create the status object that we'll run tests on
- status_obj = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH)
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_output_workspace)
# Set the dump file
- json_dump_file = f"{status_test_variables.PATH_TO_TEST_FILES}/dump_test.json"
+ json_dump_file = f"{status_testing_dir}/dump_test.json"
status_obj.args.dump = json_dump_file
# Run the json dump test
shared_tests.run_json_dump_test(status_obj, status_test_variables.ALL_REQUESTED_STATUSES)
- def test_csv_dump(self):
+ def test_csv_dump(self, status_args: Namespace, status_output_workspace: str, status_testing_dir: str):
"""
Test the csv dump functionality. This tests both the write and append
dump functionalities. The file needs to exist already for an append so it's
better to keep these tests together. This covers the format_status_for_csv
and dump methods.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
+ :param status_testing_dir: The temporary output directory for status tests
"""
# Create the status object that we'll run tests on
- status_obj = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH)
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_output_workspace)
# Set the dump file
- csv_dump_file = f"{status_test_variables.PATH_TO_TEST_FILES}/dump_test.csv"
+ csv_dump_file = f"{status_testing_dir}/dump_test.csv"
status_obj.args.dump = csv_dump_file
# Run the csv dump test
expected_output = shared_tests.build_row_list(status_test_variables.ALL_FORMATTED_STATUSES)
shared_tests.run_csv_dump_test(status_obj, expected_output)
- def test_display(self):
+ def test_display(self, status_args: Namespace, status_output_workspace: str):
"""
Test the status display functionality without actually displaying anything.
Running the display in test_mode will just provide us with the state_info
dict created for each step that is typically used for display. We'll ensure
this state_info dict is created properly here. This covers the display method.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
# Create the status object that we'll run tests on
- status_obj = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH)
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_output_workspace)
# Get the status info that display would use if it were printing output
all_status_info = status_obj.display(test_mode=True)
@@ -267,12 +713,15 @@ def test_display(self):
# Make sure all the state info dicts for each step match what they should be
state_info_diff = DeepDiff(state_info, status_test_variables.DISPLAY_INFO[step_name], ignore_order=True)
- self.assertEqual(state_info_diff, {})
+ assert state_info_diff == {}
- def test_get_runtime_avg_std_dev(self):
+ def test_get_runtime_avg_std_dev(self, status_args: Namespace, status_output_workspace: str):
"""
Test the functionality that calculates the run time average and standard
deviation for each step. This test covers the get_runtime_avg_std_dev method.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_output_workspace: A fixture that sets up the output workspace we'll need for this test
"""
dummy_step_status = {
"dummy_step_PARAM.1": {
@@ -313,7 +762,7 @@ def test_get_runtime_avg_std_dev(self):
},
}
- status_obj = Status(args=self.args, spec_display=False, file_or_ws=status_test_variables.VALID_WORKSPACE_PATH)
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_output_workspace)
status_obj.get_runtime_avg_std_dev(dummy_step_status, "dummy_step")
# Set expected values
@@ -321,9 +770,26 @@ def test_get_runtime_avg_std_dev(self):
expected_std_dev = "±16m:40s" # Std dev is 1000 seconds = 16m:40s
# Make sure the values were calculated as expected
- self.assertEqual(status_obj.run_time_info["dummy_step"]["avg_run_time"], expected_avg)
- self.assertEqual(status_obj.run_time_info["dummy_step"]["run_time_std_dev"], expected_std_dev)
+ assert status_obj.run_time_info["dummy_step"]["avg_run_time"] == expected_avg
+ assert status_obj.run_time_info["dummy_step"]["run_time_std_dev"] == expected_std_dev
+
+ def test_nested_workspace_ignored(self, status_args: Namespace, status_nested_workspace: str):
+ """
+ Test that nested workspaces are not counted in the status output.
+
+ :param status_args: A namespace of args needed for the status object
+ :param status_nested_workspace: The path to a workspace that has a nested workspace for testing
+ """
+ # Check that the initial loading process was correct
+ status_obj = Status(args=status_args, spec_display=False, file_or_ws=status_nested_workspace)
+ assert status_obj.num_requested_statuses == status_test_variables.NUM_ALL_REQUESTED_STATUSES
-if __name__ == "__main__":
- unittest.main()
+ # Reset the requested status dict and re-run the test on just the directory that contains the
+ # nested workspace (in this case the 'just_samples' step)
+ status_obj.requested_statuses = {}
+ step_statuses = status_obj.get_step_statuses(f"{status_nested_workspace}/just_samples", "just_samples")
+ num_just_samples_statuses = 0
+ for overall_step_info in step_statuses.values():
+ num_just_samples_statuses += len(overall_step_info.keys() - NON_WORKSPACE_KEYS) # Don't count non-workspace keys
+ assert num_just_samples_statuses == status_test_variables.TASKS_PER_STEP["just_samples"]
diff --git a/tests/unit/utils/test_dict_deep_merge.py b/tests/unit/utils/test_dict_deep_merge.py
new file mode 100644
index 000000000..133897f36
--- /dev/null
+++ b/tests/unit/utils/test_dict_deep_merge.py
@@ -0,0 +1,276 @@
+"""
+Tests for the `dict_deep_merge` function defined in the `utils.py` module.
+"""
+
+from typing import Any, Dict, List
+
+import pytest
+
+from merlin.utils import dict_deep_merge
+
+
+def run_invalid_check(dict_a: Any, dict_b: Any, expected_log: str, caplog: "Fixture"): # noqa: F821
+ """
+ Helper function to run invalid input tests on the `dict_deep_merge` function.
+
+ :param dict_a: The value of dict_a that we're testing against
+ :param dict_b: The value of dict_b that we're testing against
+ :param expected_log: The log that we're expecting `dict_deep_merge` to write
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+
+ # Store initial value of dict_a
+ if isinstance(dict_a, list):
+ dict_a_initial = dict_a.copy()
+ else:
+ dict_a_initial = dict_a
+
+ # Check that dict_deep_merge returns None and that dict_a wasn't modified
+ assert dict_deep_merge(dict_a, dict_b) is None
+ assert dict_a_initial == dict_a
+
+ # Check that dict_deep_merge logs a warning
+ print(f"caplog.text: {caplog.text}")
+ assert expected_log in caplog.text, "Missing expected log message"
+
+
+@pytest.mark.parametrize(
+ "dict_a, dict_b",
+ [
+ (None, None),
+ (None, ["no lists allowed!"]),
+ (["no lists allowed!"], None),
+ (["no lists allowed!"], ["no lists allowed!"]),
+ ("no strings allowed!", None),
+ (None, "no strings allowed!"),
+ ("no strings allowed!", "no strings allowed!"),
+ (10, None),
+ (None, 10),
+ (10, 10),
+ (10.5, None),
+ (None, 10.5),
+ (10.5, 10.5),
+ (("no", "tuples"), None),
+ (None, ("no", "tuples")),
+ (("no", "tuples"), ("no", "tuples")),
+ (True, None),
+ (None, True),
+ (True, True),
+ ],
+)
+def test_dict_deep_merge_both_dicts_invalid(dict_a: Any, dict_b: Any, caplog: "Fixture"): # noqa: F821
+ """
+ Test the `dict_deep_merge` function with both `dict_a` and `dict_b`
+ parameters being an invalid type. This should log a message and do
+ nothing.
+
+ :param dict_a: The value of dict_a that we're testing against
+ :param dict_b: The value of dict_b that we're testing against
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+
+ # The expected log that's output by dict_deep_merge
+ expected_log = f"Problem with dict_deep_merge: dict_a '{dict_a}' is not a dict, dict_b '{dict_b}' is not a dict. Ignoring this merge call."
+
+ # Run the actual test
+ run_invalid_check(dict_a, dict_b, expected_log, caplog)
+
+
+@pytest.mark.parametrize(
+ "dict_a, dict_b",
+ [
+ (None, {"test_key": "test_val"}),
+ (["no lists allowed!"], {"test_key": "test_val"}),
+ ("no strings allowed!", {"test_key": "test_val"}),
+ (10, {"test_key": "test_val"}),
+ (10.5, {"test_key": "test_val"}),
+ (("no", "tuples"), {"test_key": "test_val"}),
+ (True, {"test_key": "test_val"}),
+ ],
+)
+def test_dict_deep_merge_dict_a_invalid(dict_a: Any, dict_b: Dict[str, str], caplog: "Fixture"): # noqa: F821
+ """
+ Test the `dict_deep_merge` function with the `dict_a` parameter
+ being an invalid type. This should log a message and do nothing.
+
+ :param dict_a: The value of dict_a that we're testing against
+ :param dict_b: The value of dict_b that we're testing against
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+
+ # The expected log that's output by dict_deep_merge
+ expected_log = f"Problem with dict_deep_merge: dict_a '{dict_a}' is not a dict. Ignoring this merge call."
+
+ # Run the actual test
+ run_invalid_check(dict_a, dict_b, expected_log, caplog)
+
+
+@pytest.mark.parametrize(
+ "dict_a, dict_b",
+ [
+ ({"test_key": "test_val"}, None),
+ ({"test_key": "test_val"}, ["no lists allowed!"]),
+ ({"test_key": "test_val"}, "no strings allowed!"),
+ ({"test_key": "test_val"}, 10),
+ ({"test_key": "test_val"}, 10.5),
+ ({"test_key": "test_val"}, ("no", "tuples")),
+ ({"test_key": "test_val"}, True),
+ ],
+)
+def test_dict_deep_merge_dict_b_invalid(dict_a: Dict[str, str], dict_b: Any, caplog: "Fixture"): # noqa: F821
+ """
+ Test the `dict_deep_merge` function with the `dict_b` parameter
+ being an invalid type. This should log a message and do nothing.
+
+ :param dict_a: The value of dict_a that we're testing against
+ :param dict_b: The value of dict_b that we're testing against
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+
+ # The expected log that's output by dict_deep_merge
+ expected_log = f"Problem with dict_deep_merge: dict_b '{dict_b}' is not a dict. Ignoring this merge call."
+
+ # Run the actual test
+ run_invalid_check(dict_a, dict_b, expected_log, caplog)
+
+
+@pytest.mark.parametrize(
+ "dict_a, dict_b, expected",
+ [
+ ({"test_key": {}}, {"test_key": {}}, {}), # Testing merge of two empty dicts
+ ({"test_key": {}}, {"test_key": {"new_key": "new_val"}}, {"new_key": "new_val"}), # Testing dict_a empty dict merge
+ (
+ {"test_key": {"existing_key": "existing_val"}},
+ {"test_key": {}},
+ {"existing_key": "existing_val"},
+ ), # Testing dict_b empty dict merge
+ (
+ {"test_key": {"existing_key": "existing_val"}},
+ {"test_key": {"new_key": "new_val"}},
+ {"existing_key": "existing_val", "new_key": "new_val"},
+ ), # Testing merge of dicts with content
+ ],
+)
+def test_dict_deep_merge_dict_merge(
+ dict_a: Dict[str, Dict[Any, Any]], dict_b: Dict[str, Dict[Any, Any]], expected: Dict[Any, Any]
+):
+ """
+ Test the `dict_deep_merge` function with dicts that need to be merged.
+ NOTE we're keeping the test values of this function simple since the other tests
+ related to `dict_deep_merge` should be hitting the other possible scenarios.
+
+ :param dict_a: The value of dict_a that we're testing against
+ :param dict_b: The value of dict_b that we're testing against
+ :param expected: The dict that we're expecting to now be in dict_a at 'test_key'
+ """
+ dict_deep_merge(dict_a, dict_b)
+ assert dict_a["test_key"] == expected
+
+
+@pytest.mark.parametrize(
+ "dict_a, dict_b, expected",
+ [
+ ({"test_key": []}, {"test_key": []}, []), # Testing merge of two empty lists
+ ({"test_key": []}, {"test_key": ["new_val"]}, ["new_val"]), # Testing dict_a empty list merge
+ ({"test_key": ["existing_val"]}, {"test_key": []}, ["existing_val"]), # Testing dict_b empty list merge
+ (
+ {"test_key": ["existing_val"]},
+ {"test_key": ["new_val"]},
+ ["existing_val", "new_val"],
+ ), # Testing merge of list of strings
+ ({"test_key": [None]}, {"test_key": [None]}, [None, None]), # Testing merge of list of None
+ ({"test_key": [0]}, {"test_key": [1]}, [0, 1]), # Testing merge of list of integers
+ ({"test_key": [True]}, {"test_key": [False]}, [True, False]), # Testing merge of list of bools
+ ({"test_key": [0.0]}, {"test_key": [1.0]}, [0.0, 1.0]), # Testing merge of list of floats
+ (
+ {"test_key": [(True, False)]},
+ {"test_key": [(False, True)]},
+ [(True, False), (False, True)],
+ ), # Testing merge of list of tuples
+ (
+ {"test_key": [{"existing_key": "existing_val"}]},
+ {"test_key": [{"new_key": "new_val"}]},
+ [{"existing_key": "existing_val"}, {"new_key": "new_val"}],
+ ), # Testing merge of list of dicts
+ (
+ {"test_key": ["existing_val", 0]},
+ {"test_key": [True, 1.0, None]},
+ ["existing_val", 0, True, 1.0, None],
+ ), # Testing merge of list of multiple types
+ ],
+)
+def test_dict_deep_merge_list_merge(dict_a: Dict[str, List[Any]], dict_b: Dict[str, List[Any]], expected: List[Any]):
+ """
+ Test the `dict_deep_merge` function with lists that need to be merged.
+
+ :param dict_a: The value of dict_a that we're testing against
+ :param dict_b: The value of dict_b that we're testing against
+ :param expected: The list that we're expecting to now be in dict_a at 'test_key'
+ """
+ dict_deep_merge(dict_a, dict_b)
+ assert dict_a["test_key"] == expected
+
+
+@pytest.mark.parametrize(
+ "dict_a, dict_b, expected",
+ [
+ ({"test_key": None}, {"test_key": None}, None), # Testing merge of None
+ ({"test_key": "test_val"}, {"test_key": "test_val"}, "test_val"), # Testing merge of string
+ ({"test_key": 1}, {"test_key": 1}, 1), # Testing merge of int
+ ({"test_key": 1.0}, {"test_key": 1.0}, 1.0), # Testing merge of float
+ ({"test_key": False}, {"test_key": False}, False), # Testing merge of bool
+ ],
+)
+def test_dict_deep_merge_same_leaf(dict_a: Dict[str, Any], dict_b: Dict[str, Any], expected: Any):
+ """
+ Test the `dict_deep_merge` function with equivalent values in dict_a and dict_b.
+ Nothing should happen here so dict_a["test_key"] should be the exact same.
+
+ :param dict_a: The value of dict_a that we're testing against
+ :param dict_b: The value of dict_b that we're testing against
+ :param expected: The value that we're expecting to now be in dict_a at 'test_key'
+ """
+ dict_deep_merge(dict_a, dict_b)
+ assert dict_a["test_key"] == expected
+
+
+def test_dict_deep_merge_conflict_no_conflict_handler(caplog: "Fixture"): # noqa: F821
+ """
+ Test the `dict_deep_merge` function with a conflicting value in dict_b
+ and no conflict handler. Since there's no conflict handler this should
+ log a warning and ignore any merge for the key that has the conflict.
+
+ :param caplog: A built-in fixture from the pytest library to capture logs
+ """
+ dict_a = {"test_key": "existing_value"}
+ dict_b = {"test_key": "new_value"}
+
+ # Call deep merge and make sure "test_key" in dict_a wasn't updated
+ dict_deep_merge(dict_a, dict_b)
+ assert dict_a["test_key"] == "existing_value"
+
+ # Check that dict_deep_merge logs a warning
+ assert "Conflict at test_key. Ignoring the update to key 'test_key'." in caplog.text, "Missing expected log message"
+
+
+def test_dict_deep_merge_conflict_with_conflict_handler():
+ """
+ Test the `dict_deep_merge` function with a conflicting value in dict_b
+ and a conflict handler. Our conflict handler will just concatenate the
+ conflicting strings.
+ """
+ dict_a = {"test_key": "existing_value"}
+ dict_b = {"test_key": "new_value"}
+
+ def conflict_handler(*args, **kwargs):
+ """
+ The conflict handler that we'll be passing in to `dict_deep_merge`.
+ This will concatenate the conflicting strings.
+ """
+ dict_a_val = kwargs.get("dict_a_val", None)
+ dict_b_val = kwargs.get("dict_b_val", None)
+ return ", ".join([dict_a_val, dict_b_val])
+
+ # Call deep merge and make sure "test_key" in dict_a was updated with the concatenated value
+ dict_deep_merge(dict_a, dict_b, conflict_handler=conflict_handler)
+ assert dict_a["test_key"] == "existing_value, new_value"
diff --git a/tests/unit/utils/test_get_package_version.py b/tests/unit/utils/test_get_package_version.py
new file mode 100644
index 000000000..fad4623cc
--- /dev/null
+++ b/tests/unit/utils/test_get_package_version.py
@@ -0,0 +1,56 @@
+import sys
+from unittest.mock import patch
+
+import pytest
+from tabulate import tabulate
+
+from merlin.utils import get_package_versions
+
+
+fake_package_list = [
+ ("python", sys.version.split()[0], sys.executable),
+ ("merlin", "1.2.3", "/path/to/merlin"),
+ ("celery", "4.5.1", "/path/to/celery"),
+ ("kombu", "4.6.11", "/path/to/kombu"),
+ ("redis", "3.5.3", "/path/to/redis"),
+ ("amqp", "2.6.1", "/path/to/amqp"),
+]
+
+
+@pytest.fixture
+def mock_get_distribution():
+ """Mock call to get python distribution"""
+ with patch("pkg_resources.get_distribution") as mock_get_distribution:
+ mock_get_distribution.side_effect = [mock_distribution(*package) for package in fake_package_list[1:]]
+ yield mock_get_distribution
+
+
+class mock_distribution:
+ """A mock python distribution"""
+
+ def __init__(self, package, version, location):
+ self.key = package
+ self.version = version
+ self.location = location
+
+
+def test_get_package_versions(mock_get_distribution):
+ """Test ability to get versions and format as correct table"""
+ package_list = ["merlin", "celery", "kombu", "redis", "amqp"]
+ fake_table = tabulate(fake_package_list, headers=["Package", "Version", "Location"], tablefmt="simple")
+ expected_result = f"Python Packages\n\n{fake_table}\n"
+ result = get_package_versions(package_list)
+ assert result == expected_result
+
+
+def test_bad_package():
+ """Test that it only gets the things we have in our real environment."""
+ bogus_packages = ["garbage_package_1", "junk_package_2"]
+ result = get_package_versions(bogus_packages)
+ expected_data = [fake_package_list[0]] # python
+ for package in bogus_packages:
+ expected_data.append([package, "Not installed", "N/A"])
+
+ expected_table = tabulate(expected_data, headers=["Package", "Version", "Location"], tablefmt="simple")
+ expected_result = f"Python Packages\n\n{expected_table}\n"
+ assert result == expected_result