Fix pylint errors
This is a combination of 4 commits.

fix stop-workers, refactor integration tests, and fix pylint errors

fix pylint errors in merlin/ directory

fixed minor schema validation bugs

fix stop-workers flags and refactor integration tests

fix stop-workers flags and refactor integration tests

added a mapping getter to MerlinSpec object

update gitignore to ignore test outputs

add ability to see all tests in table format

update integration tests to use a different format and add stop-workers tests

fix flags for stop-workers

update CHANGELOG

add Brian to contributors

add a cleanup step to the stop-workers tests

fix schema validation bugs

pylint fixes to top level of merlin/ dir

pylint fixes for merlin/common/ dir

pylint fixes for merlin/config/ dir

pylint fixes for merlin/examples/ dir

pylint fixes for merlin/exceptions/ dir

pylint fixes for merlin/server/ dir

pylint fixes for merlin/spec/ dir

pylint fixes for merlin/study/ dir

pylint fixes for tests/integration/ dir

pylint configuration changes

fix-style changes

remove a pylint config option that didn't work

add negationchecks to two stop-workers tests

update changelog to show pylint fix

merge changes from develop

fix some merge/pylint issues

update github workflow to run all distributed tests

modify wording in the docs related to stop-workers

new pylint fixes after merging develop

fixing issues with rebase

fix merge issues that somehow got through

remove duplicate entries from changelog

fix more pylint errors in common/ and config/

fix additional pylint errors in examples/ exceptions/ and merlin/ top-level

fix additional pylint errors in server/ and spec/

refactor batch.py for pylint fixes and general cleanup

update variable name to make things cleaner

move a celery specific function to celery.py

add explanation of why we disable similar code warning

fix additional pylint errors in study/ directory

additional pylint fixes for utils.py and conditions.py

final pylint updates

refactor construct_worker_launch_command

update CHANGELOG

run fix-style

fix import issues

fix conflict between black and isort

move import back inside function to fix issues

move another import back inside function

add some logging messages to debug why tests fail on github

add extra log statements for debugging

remove batch refactor
bgunnar5 committed Mar 28, 2023
1 parent e67f0e0 commit b17721e
Showing 43 changed files with 631 additions and 434 deletions.
30 changes: 30 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- The learn.py script in the openfoam_wf* examples will now create the missing Energy v Lidspeed plot
- Fixed the flags associated with the `stop-workers` command (--spec, --queues, --workers)
- Fixed the --step flag for the `run-workers` command
- Fixed most of the pylint errors that were showing up when you ran `make check-style`
- Some errors have been disabled rather than fixed. These include:
- Any pylint errors in merlin_template.py since it's deprecated now
- A "duplicate code" instance between a function in `expansion.py` and a method in `study.py`
- The function is explicitly not creating a MerlinStudy object so the code *must* be duplicate here
- Invalid-name (C0103): These errors typically relate to the names of short variables (e.g. naming files something like f or errors e)
- Unused-argument (W0613): These have been disabled for celery-related functions since celery *does* use these arguments behind the scenes
- Broad-exception (W0718): Pylint wants a more specific exception but sometimes it's ok to have a broad exception
- Import-outside-toplevel (C0415): Sometimes it's necessary for us to import inside a function. Where this is the case, these errors are disabled
- Too-many-statements (R0915): This is disabled for the `setup_argparse` function in `main.py` since it's necessary to be big. It's disabled in `tasks.py` and `celeryadapter.py` too until we can get around to refactoring some code there
- No-else-return (R1705): These are disabled in `router.py` until we refactor the file
- Consider-using-with (R1732): Pylint wants us to consider using with for calls to subprocess.run or subprocess.Popen but it's not necessary
- Too-many-arguments (R0913): These are disabled for functions that I believe *need* to have several arguments
- Note: these could be fixed by using *args and **kwargs but it makes the code harder to follow so I'm opting to not do that
- Too-many-local-variables (R0914): These are disabled for functions that have a lot of variables
- It may be a good idea at some point to go through these and try to find ways to shorten the number of variables used or split the functions up
- Too-many-branches (R0912): These are disabled for certain functions that require a good amount of branching
- Might be able to fix this in the future if we split functions up more
- Too-few-public-methods (R0903): These are disabled for classes we may add to in the future or "wrapper" classes
- Attribute-defined-outside-init (W0201): These errors are only disabled in `specification.py` as they occur in class methods so init() won't be called
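
The disables listed above rely on pylint's inline control comments. As a minimal sketch of the two scopes that appear most often in this commit (module-wide and single-line), assuming nothing about Merlin itself: the module, `read_first_line`, and `on_task_done` below are hypothetical names used only for illustration.

```python
"""Illustrative module; every name here is hypothetical, not taken from Merlin."""

# Module-scope disable: applies from this comment to the end of the file.
# pylint: disable=invalid-name


def read_first_line(path):
    """Return the first line of a file, using a short variable name."""
    with open(path, encoding="utf-8") as f:  # 'f' would normally trigger C0103
        return f.readline()


# Line-scope disable: applies only to the statement that carries the comment.
def on_task_done(result, task=None, **kw):  # pylint: disable=unused-argument
    """Callback whose signature is dictated by a framework, not by its body."""
    return result
```

Pylint accepts either the symbolic name (`unused-argument`) or the numeric code (`W0613`) in these comments; the commit uses both styles. A `# pylint: skip-file` comment, as added to `openfilelist.py` and `opennpylib.py`, excludes the whole file instead.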

### Added
- Now loads np.arrays of dtype='object', allowing mix-type sample npy
@@ -27,6 +47,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added the --disable-logs flag to the `run-workers` command
- Merlin will now assign `default_worker` to any step not associated with a worker
- Added `get_step_worker_map()` as a method in `specification.py`
- Added `tabulate_info()` function in `display.py` to help with table formatting

### Changed
- Changed celery_regex to celery_slurm_regex in test_definitions.py
@@ -42,6 +63,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Modified the `merlinspec.json` file:
- the minimum `gpus per task` is now 0 instead of 1
- variables defined in the `env` block of a spec file can now be arrays
- Refactored `batch.py`:
- Merged 4 functions (`check_for_slurm`, `check_for_lsf`, `check_for_flux`, and `check_for_pbs`) into 1 function named `check_for_scheduler`
- Modified `get_batch_type` to accommodate this change
- Added a function `parse_batch_block` to handle all the logic of reading in the batch block and storing it in one dict
- Added a function `get_flux_launch` to help decrease the amount of logic taking place in `batch_worker_launch`
- Modified `batch_worker_launch` to use the new `parse_batch_block` function
- Added a function `construct_scheduler_legend` to build a dict that keeps as much information as we need about each scheduler stored in one place
- Cleaned up the `construct_worker_launch_command` function to utilize the newly added functions and decrease the amount of repeated code
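
The first bullet of the `batch.py` entry describes folding four near-identical probes into one function. A minimal sketch of that idea follows, assuming each scheduler can be detected by finding one of its command-line tools on `PATH`. The probe table, the `which` parameter (added here only to make the function easy to test), and the function body are illustrative assumptions, not Merlin's actual implementation.

```python
import shutil
from typing import Callable, Dict, Optional

# Assumed probe executables; the checks Merlin really performs may differ.
SCHEDULER_PROBES: Dict[str, str] = {
    "slurm": "sbatch",
    "lsf": "bsub",
    "flux": "flux",
    "pbs": "qsub",
}


def check_for_scheduler(
    scheduler: str,
    which: Callable[[str], Optional[str]] = shutil.which,
) -> bool:
    """Return True if the probe executable for `scheduler` is found on PATH."""
    try:
        probe = SCHEDULER_PROBES[scheduler]
    except KeyError as exc:
        raise ValueError(f"Unknown scheduler: {scheduler!r}") from exc
    return which(probe) is not None
```

Keeping the per-scheduler differences in one table means supporting another scheduler is a one-line change rather than a fifth copy of the same function.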


## [1.9.1]
### Fixed
2 changes: 1 addition & 1 deletion CONTRIBUTORS
@@ -6,4 +6,4 @@ Joe Koning <[email protected]>
Jeremy White <[email protected]>
Aidan Keogh
Ryan Lee <[email protected]>
Brian Gunnarson <[email protected]>
Brian Gunnarson <[email protected]>
4 changes: 2 additions & 2 deletions docs/source/merlin_commands.rst
@@ -378,7 +378,7 @@ To send out a stop signal to some or all connected workers, use:
$ merlin stop-workers [--spec <input.yaml>] [--queues <queues>] [--workers <regex>] [--task_server celery]
The default behavior will send a stop to all connected workers,
The default behavior will send a stop to all connected workers across all workflows,
having them shutdown softly.

The ``--spec`` option targets only workers named in the ``merlin`` block of the spec file.
@@ -390,7 +390,7 @@ The ``--queues`` option allows you to pass in the names of specific queues to st
# Stop all workers on these queues, no matter their name
$ merlin stop-workers --queues queue1 queue2
The ``--workers`` option allows you to pass in a regular expression of names of queues to stop:
The ``--workers`` option allows you to pass in regular expressions of names of workers to stop:

.. code:: bash
2 changes: 1 addition & 1 deletion docs/source/modules/port_your_application.rst
@@ -44,7 +44,7 @@ The scripts defined in the workflow steps are also written to the output directo

.. where are the worker logs, and what might show up there that .out and .err won't see? -> these more developer focused output?
When a bug crops up in a running study with many parameters, there are a few other commands to make use of. Rather than trying to spam ``Ctrl-c`` to kill all the workers, you will want to instead use ``merlin stop-workers <workflow_name>.yaml`` to stop the workers. This should then be followed up with ``merlin purge <workflow_name>.yaml`` to clear out the task queue to prevent the same
When a bug crops up in a running study with many parameters, there are a few other commands to make use of. Rather than trying to spam ``Ctrl-c`` to kill all the workers, you will want to instead use ``merlin stop-workers --spec <workflow_name>.yaml`` to stop the workers for that workflow. This should then be followed up with ``merlin purge <workflow_name>.yaml`` to clear out the task queue to prevent the same
buggy tasks from continuing to run the next time ``run-workers`` is invoked.

.. last item from board: use merlin status to see if have workers ... is that 'dangling tasks' in the image?
14 changes: 12 additions & 2 deletions merlin/celery.py
@@ -44,12 +44,22 @@
from merlin.config import broker, celeryconfig, results_backend
from merlin.config.configfile import CONFIG
from merlin.config.utils import Priority, get_priority
from merlin.router import route_for_task
from merlin.utils import nested_namespace_to_dicts


LOG: logging.Logger = logging.getLogger(__name__)


# This function has to have specific args/return values for celery so ignore pylint
def route_for_task(name, args, kwargs, options, task=None, **kw):  # pylint: disable=W0613,R1710
    """
    Custom task router for queues
    """
    if ":" in name:
        queue, _ = name.split(":")
        return {"queue": queue}
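
To make the routing rule concrete: a task name of the form `<queue>:<task>` is routed to `<queue>`, and a name without a colon falls through and yields `None`, which, as far as Celery's router protocol goes, leaves the decision to the next router or the default queue. The sketch below copies the function from the hunk above so it runs on its own. The `merlin:` names are the ones registered in `tasks.py` in this commit, and `"plain_task"` is made up for illustration.

```python
def route_for_task(name, args, kwargs, options, task=None, **kw):
    """Custom task router for queues (copied from the hunk above)."""
    if ":" in name:
        queue, _ = name.split(":")
        return {"queue": queue}
    return None  # written out here; the original simply falls off the end


routed = route_for_task("merlin:chordfinisher", (), {}, {})
unrouted = route_for_task("plain_task", (), {}, {})
print(routed)    # {'queue': 'merlin'}
print(unrouted)  # None
```

Note that the two-element unpacking assumes exactly one colon: a name containing two or more would raise `ValueError` at `name.split(":")`.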


merlin.common.security.encrypt_backend_traffic.set_backend_funcs()


@@ -84,7 +94,7 @@
# set task priority defaults to prioritize workflow tasks over task-expansion tasks
task_priority_defaults: Dict[str, Union[int, Priority]] = {
"task_queue_max_priority": 10,
"task_default_priority": get_priority(Priority.mid),
"task_default_priority": get_priority(Priority.MID),
}
if CONFIG.broker.name.lower() == "redis":
app.conf.broker_transport_options = {
12 changes: 1 addition & 11 deletions merlin/common/abstracts/enums/__init__.py
@@ -32,17 +32,7 @@
from enum import IntEnum


__all__ = (
"ReturnCode",
"OK_VALUE",
"ERROR_VALUE",
"RESTART_VALUE",
"SOFT_FAIL_VALUE",
"HARD_FAIL_VALUE",
"DRY_OK_VALUE",
"RETRY_VALUE",
"STOP_WORKERS_VALUE",
)
__all__ = ("ReturnCode",)


class ReturnCode(IntEnum):
3 changes: 3 additions & 0 deletions merlin/common/openfilelist.py
@@ -57,6 +57,9 @@
"""

# This file is not currently used so we don't care what pylint has to say
# pylint: skip-file

import copy


3 changes: 3 additions & 0 deletions merlin/common/opennpylib.py
@@ -81,6 +81,9 @@
print a.dtype # dtype of array
"""
# This file is not currently used so we don't care what pylint has to say
# pylint: skip-file

from typing import List, Tuple

import numpy as np
6 changes: 3 additions & 3 deletions merlin/common/sample_index.py
@@ -69,7 +69,7 @@ class SampleIndex:
# Class variable to indicate depth (mostly used for pretty printing).
depth = -1

def __init__(self, minid, maxid, children, name, leafid=-1, num_bundles=0, address=""):
def __init__(self, minid, maxid, children, name, leafid=-1, num_bundles=0, address=""): # pylint: disable=R0913
"""The constructor."""

# The direct children of this node, generally also of type SampleIndex.
@@ -251,14 +251,14 @@ def get_path_to_sample(self, sample_id):
"""
path = self.name
for child_val in self.children.values():
if sample_id >= child_val.min and sample_id < child_val.max:
if child_val.min <= sample_id < child_val.max:
path = os.path.join(path, child_val.get_path_to_sample(sample_id))
return path
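
The rewritten condition in `get_path_to_sample` uses Python's comparison chaining, which performs the same half-open range test `[min, max)` as the original `and` expression. A small self-contained check, using a hypothetical helper name:

```python
def in_half_open_range(low: int, value: int, high: int) -> bool:
    """True when low <= value < high, written as a chained comparison."""
    return low <= value < high


# The chained form agrees with the two-clause form on every value tried,
# including both boundaries.
for value in range(-1, 12):
    assert in_half_open_range(0, value, 10) == (value >= 0 and value < 10)
```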

def write_single_sample_index_file(self, path):
"""Writes the index file associated with this node."""
if not self.is_directory:
return
return None

fname = os.path.join(path, self.name, "sample_index.txt")
with open(fname, "w") as _file:
5 changes: 5 additions & 0 deletions merlin/common/sample_index_factory.py
@@ -37,6 +37,11 @@
from merlin.utils import cd


# These pylint errors I've disabled are for "too many arguments"
# and "too many local variables". I think the functions are still clear
# pylint: disable=R0913,R0914


def create_hierarchy(
num_samples,
bundle_size,
12 changes: 7 additions & 5 deletions merlin/common/security/encrypt.py
@@ -28,9 +28,7 @@
# SOFTWARE.
###############################################################################

"""
TODO
"""
"""This module handles encryption logic"""

import logging
import os
@@ -40,6 +38,10 @@
from merlin.config.configfile import CONFIG


# This disables all the errors about short variable names (like using f to represent a file)
# pylint: disable=invalid-name


LOG = logging.getLogger(__name__)


@@ -52,8 +54,8 @@ def _get_key_path():

try:
key_filepath = os.path.abspath(os.path.expanduser(key_filepath))
except KeyError:
raise ValueError("Error! No password provided for RabbitMQ")
except KeyError as e:
raise ValueError("Error! No password provided for RabbitMQ") from e
return key_filepath
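
The `raise ... from e` form added in this hunk (the fix for pylint's raise-missing-from warning) records the original exception as the `__cause__` of the new one, so the traceback shows both. A minimal sketch of the pattern, with a hypothetical `get_password` helper standing in for the real lookup:

```python
def get_password(settings: dict) -> str:
    """Look up a password, converting a missing key into a ValueError."""
    try:
        return settings["password"]
    except KeyError as e:
        # Explicit chaining: the KeyError is kept as the cause of the ValueError.
        raise ValueError("Error! No password provided") from e


try:
    get_password({})
except ValueError as err:
    cause = err.__cause__
    print(type(cause).__name__)  # KeyError
```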


53 changes: 33 additions & 20 deletions merlin/common/tasks.py
@@ -35,8 +35,10 @@
import os
from typing import Any, Dict, Optional

# Need to disable an overwrite warning here since celery has an exception that we need that directly
# overwrites a python built-in exception
from celery import chain, chord, group, shared_task, signature
from celery.exceptions import MaxRetriesExceededError, OperationalError, TimeoutError
from celery.exceptions import MaxRetriesExceededError, OperationalError, TimeoutError # pylint: disable=W0622

from merlin.common.abstracts.enums import ReturnCode
from merlin.common.sample_index import uniform_directories
@@ -62,14 +64,21 @@

STOP_COUNTDOWN = 60

# TODO: most of the pylint errors that are disabled in this file are the ones listed below.
# We should refactor this file so that we use more functions to solve all of these errors
# R0912: too many branches
# R0913: too many arguments
# R0914: too many local variables
# R0915: too many statements


@shared_task( # noqa: C901
bind=True,
autoretry_for=retry_exceptions,
retry_backoff=True,
priority=get_priority(Priority.high),
priority=get_priority(Priority.HIGH),
)
def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noqa: C901
def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noqa: C901 pylint: disable=R0912,R0915
"""
Executes a Merlin Step
:param args: The arguments, one of which should be an instance of Step
@@ -123,7 +132,8 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
self.retry(countdown=step.retry_delay)
except MaxRetriesExceededError:
LOG.warning(
f"*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RESTART command, but has already reached its retry limit ({self.max_retries}). Continuing with workflow."
f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RESTART command,
but has already reached its retry limit ({self.max_retries}). Continuing with workflow."""
)
result = ReturnCode.SOFT_FAIL
elif result == ReturnCode.RETRY:
@@ -135,7 +145,8 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
self.retry(countdown=step.retry_delay)
except MaxRetriesExceededError:
LOG.warning(
f"*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RETRY command, but has already reached its retry limit ({self.max_retries}). Continuing with workflow."
f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RETRY command,
but has already reached its retry limit ({self.max_retries}). Continuing with workflow."""
)
result = ReturnCode.SOFT_FAIL
elif result == ReturnCode.SOFT_FAIL:
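
One side effect of the two reformatted `LOG.warning` calls in this file is worth knowing: a triple-quoted f-string keeps the line break, and any leading whitespace on the continuation line, as part of the message. If a single-line log message is wanted, adjacent string literals inside parentheses shorten the source line without changing the text. A sketch with made-up values for `step_name`, `step_dir`, and `max_retries`:

```python
step_name, step_dir, max_retries = "sim", "/tmp/sim", 3

# Form used in the commit: the newline becomes part of the logged message.
triple_quoted = f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RETRY command,
    but has already reached its retry limit ({max_retries}). Continuing with workflow."""

# Alternative: implicit concatenation of adjacent literals, one logical line.
concatenated = (
    f"*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RETRY command, "
    f"but has already reached its retry limit ({max_retries}). Continuing with workflow."
)

assert "\n" in triple_quoted
assert "\n" not in concatenated
```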
@@ -226,9 +237,9 @@ def prepare_chain_workspace(sample_index, chain_):
bind=True,
autoretry_for=retry_exceptions,
retry_backoff=True,
priority=get_priority(Priority.low),
priority=get_priority(Priority.LOW),
)
def add_merlin_expanded_chain_to_chord(
def add_merlin_expanded_chain_to_chord( # pylint: disable=R0913,R0914
self,
task_type,
chain_,
@@ -364,14 +375,14 @@ def add_chains_to_chord(self, all_chains):
# generates a new task signature, so we need to make
# sure we are modifying task signatures before adding them to the
# kwargs.
for g in reversed(range(len(all_chains))):
if g < len(all_chains) - 1:
for j in reversed(range(len(all_chains))):
if j < len(all_chains) - 1:
# fmt: off
new_kwargs = signature(all_chains[g][i]).kwargs.update(
{"next_in_chain": all_chains[g + 1][i]}
new_kwargs = signature(all_chains[j][i]).kwargs.update(
{"next_in_chain": all_chains[j + 1][i]}
)
# fmt: on
all_chains[g][i] = all_chains[g][i].replace(kwargs=new_kwargs)
all_chains[j][i] = all_chains[j][i].replace(kwargs=new_kwargs)
chain_steps.append(all_chains[0][i])

for sig in chain_steps:
@@ -387,9 +398,9 @@ def add_chains_to_chord(self, all_chains):
bind=True,
autoretry_for=retry_exceptions,
retry_backoff=True,
priority=get_priority(Priority.low),
priority=get_priority(Priority.LOW),
)
def expand_tasks_with_samples(
def expand_tasks_with_samples( # pylint: disable=R0913,R0914
self,
dag,
chain_,
@@ -398,7 +409,6 @@ def expand_tasks_with_samples(
task_type,
adapter_config,
level_max_dirs,
**kwargs,
):
"""
Generate a group of celery chains of tasks from a chain of task names, using merlin
@@ -492,16 +502,17 @@ def expand_tasks_with_samples(
LOG.debug("simple chain task queued")


# Pylint complains that "self" is unused but it's needed behind the scenes with celery
@shared_task(
bind=True,
autoretry_for=retry_exceptions,
retry_backoff=True,
acks_late=False,
reject_on_worker_lost=False,
name="merlin:shutdown_workers",
priority=get_priority(Priority.high),
priority=get_priority(Priority.HIGH),
)
def shutdown_workers(self, shutdown_queues):
def shutdown_workers(self, shutdown_queues): # pylint: disable=W0613
"""
This task issues a call to shutdown workers.
@@ -518,13 +529,15 @@ def shutdown_workers(self, shutdown_queues):
return stop_workers("celery", None, shutdown_queues, None)


# Pylint complains that these args are unused but celery passes args
# here behind the scenes and won't work if these aren't here
@shared_task(
autoretry_for=retry_exceptions,
retry_backoff=True,
name="merlin:chordfinisher",
priority=get_priority(Priority.low),
priority=get_priority(Priority.LOW),
)
def chordfinisher(*args, **kwargs):
def chordfinisher(*args, **kwargs): # pylint: disable=W0613
""".
It turns out that chain(group,group) in celery does not execute one group
after another, but executes the groups as if they were independent from
@@ -539,7 +552,7 @@ def chordfinisher(*args, **kwargs):
autoretry_for=retry_exceptions,
retry_backoff=True,
name="merlin:queue_merlin_study",
priority=get_priority(Priority.low),
priority=get_priority(Priority.LOW),
)
def queue_merlin_study(study, adapter):
"""
5 changes: 4 additions & 1 deletion merlin/config/__init__.py
@@ -38,7 +38,10 @@
from merlin.utils import nested_dict_to_namespaces


class Config:
# Pylint complains that there's too few methods here but this class might
# be useful if we ever need to do extra stuff with the configuration so we'll
# ignore it for now
class Config: # pylint: disable=R0903
"""
The Config class, meant to store all Merlin config settings in one place.
Regardless of the config data loading method, this class is meant to