bugfix/stop-workers-flags (#405)
fix bugs for stop-workers and schema validation
bgunnar5 authored Mar 22, 2023
2 parents 060826f + cea8576 commit e67f0e0
Showing 8 changed files with 342 additions and 153 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Pip wheel wasn't including .sh files for merlin examples
- The learn.py script in the openfoam_wf* examples will now create the missing Energy v Lidspeed plot
- Fixed the flags associated with the `stop-workers` command (--spec, --queues, --workers); a short sketch of the --workers parsing follows this list
- Fixed the --step flag for the `run-workers` command
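
A minimal, self-contained sketch (not Merlin's actual parser setup) of the argparse behavior behind the `--workers` part of this fix: with `action="store"` and `nargs="+"`, as added in the `merlin/main.py` diff below, every value given after `--workers` is collected into a list of worker regexes.

```python
# Illustrative only: a stand-alone parser using the same --workers definition
# that the merlin/main.py diff below adds to the stop-workers subcommand.
from argparse import ArgumentParser

parser = ArgumentParser(prog="stop-workers-sketch")
parser.add_argument(
    "--workers",
    type=str,
    action="store",
    nargs="+",
    default=None,
    help="regex match for specific workers to stop",
)

args = parser.parse_args(["--workers", "step_1_worker", ".*simulation.*"])
print(args.workers)  # ['step_1_worker', '.*simulation.*']
```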

### Added
- Now loads np.arrays of dtype='object', allowing mixed-type sample .npy files
@@ -22,13 +24,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added the --distributed and --display-table flags to run_tests.py
- --distributed: only run distributed tests
- --display-tests: displays a table of all existing tests and the id associated with each test
- Added the --disable-logs flag to the `run-workers` command
- Merlin will now assign `default_worker` to any step not associated with a worker
- Added `get_step_worker_map()` as a method in `specification.py`
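
The last two items describe a step-to-worker mapping with a `default_worker` fallback. Below is a toy sketch of that idea only; the function name, signature, and data shapes are assumptions, not the actual `get_step_worker_map()` code in `specification.py`.

```python
# Hypothetical sketch: invert a worker -> steps mapping into step -> worker,
# falling back to "default_worker" for any step that no worker claims.
def step_worker_map(worker_steps, all_steps):
    """Map each step name to the worker that runs it ('default_worker' if none)."""
    step_map = {step: "default_worker" for step in all_steps}
    for worker, steps in worker_steps.items():
        for step in steps:
            step_map[step] = worker
    return step_map


workers = {"sim_worker": ["run_sim"], "post_worker": ["collect_results"]}
steps = ["run_sim", "collect_results", "verify"]
print(step_worker_map(workers, steps))
# {'run_sim': 'sim_worker', 'collect_results': 'post_worker', 'verify': 'default_worker'}
```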

### Changed
- Changed celery_regex to celery_slurm_regex in test_definitions.py
- Reformatted how integration tests are defined and, in part, how they run
- Test values are now dictionaries rather than tuples
- Stopped using `subprocess.Popen()` and `subprocess.communicate()` to run tests; we now use `subprocess.run()` for simplicity and to stay current with the subprocess API (`run()` calls `Popen()` and `communicate()` under the hood, so we no longer have to manage them ourselves); see the sketch after this list
- Rewrote the README in the integration tests folder to explain the new integration test format
- Reformatted `start_celery_workers()` in the `celeryadapter.py` file (a toy illustration of the build/launch split follows this list). This involved:
- Modifying `verify_args()` to return the arguments it verifies/updates
- Changing `launch_celery_worker()` to launch the subprocess (no longer builds the celery command)
- Creating `get_celery_cmd()` to do what `launch_celery_worker()` used to do and build the celery command to run
- Creating `_get_steps_to_start()`, `_create_kwargs()`, and `_get_workers_to_start()` as helper functions to simplify logic in `start_celery_workers()`
- Modified the `merlinspec.json` file:
- the minimum `gpus per task` is now 0 instead of 1
- variables defined in the `env` block of a spec file can now be arrays
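
A hedged sketch of the test-runner change described above; the command is a placeholder, not one of the real integration tests. A single `subprocess.run()` call replaces the `Popen()`/`communicate()` pair, since `run()` drives both internally.

```python
# Placeholder test command run the "new" way with subprocess.run().
import subprocess

result = subprocess.run(
    ["python", "--version"],  # stand-in for an integration test command
    capture_output=True,      # collect stdout/stderr, as communicate() used to
    text=True,                # decode bytes to str
    timeout=60,               # fail instead of hanging forever
)
print(result.returncode, result.stdout.strip() or result.stderr.strip())
```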

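The `start_celery_workers()` reorganization is also easier to see in code. The snippet below is a toy illustration of the build-versus-launch split only; the function signatures and the exact celery command are assumptions, not Merlin's real `celeryadapter.py` code. It also hints at why a `-l` flag in a worker's args overrides `--disable-logs`: a log level is appended only when the worker args don't already set one.

```python
# Toy illustration (assumed signatures, not Merlin's actual code) of separating
# "build the celery command" from "launch the worker process".
import shlex
import subprocess


def get_celery_cmd(queues, worker_args="", disable_logs=False):
    """Build, but do not run, a celery worker command string."""
    cmd = f"celery worker -Q {','.join(queues)} {worker_args}".strip()
    # Only add a log level if the worker args don't already set one and
    # logging wasn't turned off via --disable-logs.
    if not disable_logs and "-l" not in worker_args:
        cmd += " -l INFO"
    return cmd


def launch_celery_worker(worker_cmd, echo_only=False):
    """Launch the already-built command, or just echo it."""
    if echo_only:
        print(worker_cmd)
        return None
    return subprocess.Popen(shlex.split(worker_cmd))


if __name__ == "__main__":
    command = get_celery_cmd(["step_1_queue", "step_2_queue"], worker_args="--concurrency 4")
    launch_celery_worker(command, echo_only=True)
```
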
## [1.9.1]
### Fixed
1 change: 1 addition & 0 deletions CONTRIBUTORS
@@ -6,3 +6,4 @@ Joe Koning <[email protected]>
Jeremy White <[email protected]>
Aidan Keogh
Ryan Lee <[email protected]>
Brian Gunnarson <[email protected]>
20 changes: 18 additions & 2 deletions merlin/main.py
@@ -68,7 +68,7 @@ class HelpParser(ArgumentParser):
print the help message when an error happens."""

def error(self, message):
sys.stderr.write("error: %s\n" % message)
sys.stderr.write(f"error: {message}\n")
self.print_help()
sys.exit(2)

@@ -222,7 +222,7 @@ def launch_workers(args):
spec, filepath = get_merlin_spec_with_override(args)
if not args.worker_echo_only:
LOG.info(f"Launching workers from '{filepath}'")
status = router.launch_workers(spec, args.worker_steps, args.worker_args, args.worker_echo_only)
status = router.launch_workers(spec, args.worker_steps, args.worker_args, args.disable_logs, args.worker_echo_only)
if args.worker_echo_only:
print(status)
else:
@@ -280,13 +280,17 @@ def stop_workers(args):
"""
print(banner_small)
worker_names = []

# Load in the spec if one was provided via the CLI
if args.spec:
spec_path = verify_filepath(args.spec)
spec = MerlinSpec.load_specification(spec_path)
worker_names = spec.get_worker_names()
for worker_name in worker_names:
if "$" in worker_name:
LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")

# Send stop command to router
router.stop_workers(args.task_server, worker_names, args.queues, args.workers)


@@ -344,6 +348,10 @@ def process_monitor(args):


def process_server(args: Namespace):
"""
Route to the correct function based on the command
given via the CLI
"""
if args.commands == "init":
init_server()
elif args.commands == "start":
@@ -755,6 +763,12 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
help="Specify desired Merlin variable values to override those found in the specification. Space-delimited. "
"Example: '--vars LEARN=path/to/new_learn.py EPOCHS=3'",
)
run_workers.add_argument(
"--disable-logs",
action="store_true",
help="Turn off the logs for the celery workers. Note: having the -l flag "
"in your workers' args section will overwrite this flag for that worker.",
)

# merlin query-workers
query: ArgumentParser = subparsers.add_parser("query-workers", help="List connected task server workers.")
@@ -787,6 +801,8 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
stop.add_argument(
"--workers",
type=str,
action="store",
nargs="+",
default=None,
help="regex match for specific workers to stop",
)
30 changes: 17 additions & 13 deletions merlin/router.py
@@ -53,7 +53,7 @@


try:
import importlib.resources as resources
from importlib import resources
except ImportError:
import importlib_resources as resources

@@ -74,7 +74,7 @@ def run_task_server(study, run_mode=None):
LOG.error("Celery is not specified as the task server!")


def launch_workers(spec, steps, worker_args="", just_return_command=False):
def launch_workers(spec, steps, worker_args="", disable_logs=False, just_return_command=False):
"""
Launches workers for the specified study.
@@ -83,12 +83,13 @@ def launch_workers(spec, steps, worker_args="", just_return_command=False):
:param `worker_args`: Optional arguments for the workers
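:param `disable_logs`: Whether to turn off the logs for the celery workers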
:param `just_return_command`: Don't execute, just return the command
"""
if spec.merlin["resources"]["task_server"] == "celery":
if spec.merlin["resources"]["task_server"] == "celery": # pylint: disable=R1705
# Start workers
cproc = start_celery_workers(spec, steps, worker_args, just_return_command)
cproc = start_celery_workers(spec, steps, worker_args, disable_logs, just_return_command)
return cproc
else:
LOG.error("Celery is not specified as the task server!")
return "No workers started"


def purge_tasks(task_server, spec, force, steps):
@@ -103,12 +104,13 @@ def purge_tasks(task_server, spec, force, steps):
"""
LOG.info(f"Purging queues for steps = {steps}")

if task_server == "celery":
if task_server == "celery": # pylint: disable=R1705
queues = spec.make_queue_string(steps)
# Purge tasks
return purge_celery_tasks(queues, force)
else:
LOG.error("Celery is not specified as the task server!")
return -1


def query_status(task_server, spec, steps, verbose=True):
@@ -122,12 +124,13 @@ def query_status(task_server, spec, steps, verbose=True):
if verbose:
LOG.info(f"Querying queues for steps = {steps}")

if task_server == "celery":
if task_server == "celery": # pylint: disable=R1705
queues = spec.get_queue_list(steps)
# Query the queues
return query_celery_queues(queues)
else:
LOG.error("Celery is not specified as the task server!")
return []


def dump_status(query_return, csv_file):
@@ -141,7 +144,7 @@ def dump_status(query_return, csv_file):
fmode = "a"
else:
fmode = "w"
with open(csv_file, mode=fmode) as f:
with open(csv_file, mode=fmode) as f: # pylint: disable=W1514,C0103
if f.mode == "w": # add the header
f.write("# time")
for name, job, consumer in query_return:
@@ -162,7 +165,7 @@ def query_workers(task_server):
LOG.info("Searching for workers...")

if task_server == "celery":
return query_celery_workers()
query_celery_workers()
else:
LOG.error("Celery is not specified as the task server!")

@@ -174,10 +177,11 @@ def get_workers(task_server):
:return: A list of all connected workers
:rtype: list
"""
if task_server == "celery":
if task_server == "celery": # pylint: disable=R1705
return get_workers_from_app()
else:
LOG.error("Celery is not specified as the task server!")
return []


def stop_workers(task_server, spec_worker_names, queues, workers_regex):
Expand All @@ -191,14 +195,14 @@ def stop_workers(task_server, spec_worker_names, queues, workers_regex):
"""
LOG.info("Stopping workers...")

if task_server == "celery":
if task_server == "celery": # pylint: disable=R1705
# Stop workers
return stop_celery_workers(queues, spec_worker_names, workers_regex)
stop_celery_workers(queues, spec_worker_names, workers_regex)
else:
LOG.error("Celery is not specified as the task server!")


def route_for_task(name, args, kwargs, options, task=None, **kw):
def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=W0613,R1710
"""
Custom task router for queues
"""
@@ -249,7 +253,7 @@ def check_merlin_status(args, spec):

total_jobs = 0
total_consumers = 0
for name, jobs, consumers in queue_status:
for _, jobs, consumers in queue_status:
total_jobs += jobs
total_consumers += consumers

5 changes: 3 additions & 2 deletions merlin/spec/merlinspec.json
@@ -49,7 +49,7 @@
"type": {"type": "string", "minLength": 1}
}
},
"gpus per task": {"type": "integer", "minimum": 1},
"gpus per task": {"type": "integer", "minimum": 0},
"max_retries": {"type": "integer", "minimum": 1},
"task_queue": {"type": "string", "minLength": 1},
"nodes": {
@@ -146,7 +146,7 @@
"^.*": {
"anyOf": [
{"type": "string", "minLength": 1},
{"type": "number"}
{"type": "number"},
{"type": "array"}
]
}
}
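
For reference, a small sketch of what the fragment above now allows: values in the `env` variables block may be strings, numbers, or arrays. The schema below inlines just this `anyOf` pattern (the `patternProperties` wrapper and the variable names are assumptions) and checks a sample block with the `jsonschema` package.

```python
# Validate a sample env-style variables block against the anyOf pattern shown
# in the diff above (wrapper and variable names are assumptions).
from jsonschema import validate

variables_schema = {
    "type": "object",
    "patternProperties": {
        "^.*": {
            "anyOf": [
                {"type": "string", "minLength": 1},
                {"type": "number"},
                {"type": "array"},
            ]
        }
    },
}

validate(
    instance={"OUTPUT_PATH": "./studies", "N_SAMPLES": 10, "SEEDS": [1, 2, 3]},
    schema=variables_schema,
)
print("env block with an array value passes validation")
```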
