Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Celery Manager #489

Open
wants to merge 32 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
78ef619
Include celerymanager and update celeryadapter to check the status of…
ryannova Jul 23, 2024
8561a18
Fixed issue where the update status was outside of if statement for c…
ryannova Jul 23, 2024
1120dd7
Include worker status stop and add template for merlin restart
ryannova Aug 1, 2024
f41938f
Added comment to the CeleryManager init
ryannova Aug 2, 2024
690115e
Increment db_num instead of being fixed
ryannova Aug 2, 2024
de4ffd0
Added other subprocess parameters and created a linking system for re…
ryannova Aug 2, 2024
67e9268
Implemented stopping of celery workers and restarting workers properly
ryannova Aug 6, 2024
406e4c2
Update stopped to stalled for when the worker doesn't respond to restart
ryannova Aug 6, 2024
78e4525
Working merlin manager run but start and stop not working properly
ryannova Aug 7, 2024
eca74ac
Made fix for subprocess to start new shell and fixed manager start an…
ryannova Aug 7, 2024
ec8aa78
Added comments and update changelog
ryannova Aug 7, 2024
3f04d24
Include style fixes
ryannova Aug 7, 2024
5538f4b
Fix style for black
ryannova Aug 7, 2024
b6bcd33
Revert launch_job script that was edited when doing automated lint
ryannova Aug 7, 2024
9b97f8b
Move importing of CONFIG to be within redis_connection due to error o…
ryannova Aug 7, 2024
c9dfd31
Added space to fix style
ryannova Aug 7, 2024
a9bd865
Revert launch_jobs.py:
ryannova Aug 7, 2024
ddc7614
Update import of all merlin.config to be in the function
ryannova Aug 7, 2024
353a66b
suggested changes plus beginning work on monitor/manager collab
bgunnar5 Aug 17, 2024
1a4d416
move managers to their own folder and fix ssl problems
bgunnar5 Aug 22, 2024
875f137
final PR touch ups
bgunnar5 Sep 3, 2024
9020aa0
Merge pull request #2 from bgunnar5/monitor_manager_collab
ryannova Sep 3, 2024
58da9bc
Fix lint style changes
ryannova Sep 3, 2024
e75dcc2
Fixed issue with context manager
ryannova Sep 4, 2024
11f9e7c
Reset file that was incorrect changed
ryannova Sep 4, 2024
7204e46
Check for ssl cert before applying to Redis connection
ryannova Sep 4, 2024
53d8f32
Comment out Active tests for celerymanager
ryannova Sep 4, 2024
a5ccb2d
Fix lint issue with unused import after commenting out Active celery …
ryannova Sep 9, 2024
2b0e8a6
Fixed style for import
ryannova Sep 9, 2024
e49f378
Fixed kwargs being modified when making a copy for saving to redis wo…
ryannova Sep 12, 2024
352e7df
Added password check and omit if a password doesn't exist
ryannova Sep 13, 2024
63b7b51
Merged changes from develop
ryannova Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to Merlin will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [unreleased]
### Added
- Merlin manager capability to monitor celery workers.

## [1.12.2b1]
### Added
- Conflict handler option to the `dict_deep_merge` function in `utils.py`
Expand Down
98 changes: 98 additions & 0 deletions merlin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from merlin.server.server_commands import config_server, init_server, restart_server, start_server, status_server, stop_server
from merlin.spec.expansion import RESERVED, get_spec_with_expansion
from merlin.spec.specification import MerlinSpec
from merlin.study.celerymanageradapter import run_manager, start_manager, stop_manager
from merlin.study.status import DetailedStatus, Status
from merlin.study.status_constants import VALID_RETURN_CODES, VALID_STATUS_FILTERS
from merlin.study.status_renderers import status_renderer_factory
Expand Down Expand Up @@ -400,6 +401,23 @@ def process_example(args: Namespace) -> None:
setup_example(args.workflow, args.path)


def process_manager(args: Namespace):
bgunnar5 marked this conversation as resolved.
Show resolved Hide resolved
if args.command == "run":
run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout)
elif args.command == "start":
if start_manager(
query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout
):
LOG.info("Manager started successfully.")
bgunnar5 marked this conversation as resolved.
Show resolved Hide resolved
elif args.command == "stop":
if stop_manager():
LOG.info("Manager stopped successfully.")
else:
LOG.error("Unable to stop manager.")
else:
print("Run manager with a command. Try 'merlin manager -h' for more details")


def process_monitor(args):
"""
CLI command to monitor merlin workers and queues to keep
Expand Down Expand Up @@ -897,6 +915,86 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
help="regex match for specific workers to stop",
)

# merlin manager
manager: ArgumentParser = subparsers.add_parser(
"manager",
help="Watchdog application to manage workers",
description="A daemon process that helps to restart and communicate with workers while running.",
formatter_class=ArgumentDefaultsHelpFormatter,
)
manager.set_defaults(func=process_manager)

manager_commands: ArgumentParser = manager.add_subparsers(dest="command")
manager_run = manager_commands.add_parser(
"run",
help="Run the daemon process",
description="Run manager",
formatter_class=ArgumentDefaultsHelpFormatter,
)
manager_run.add_argument(
"-qf",
"--query_frequency",
action="store",
type=int,
default=60,
help="The frequency at which workers will be queried for response.",
)
manager_run.add_argument(
"-qt",
"--query_timeout",
action="store",
type=float,
default=0.5,
help="The timeout for the query response that are sent to workers.",
)
manager_run.add_argument(
"-wt",
"--worker_timeout",
action="store",
type=int,
default=180,
help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.",
)
manager_run.set_defaults(func=process_manager)
manager_start = manager_commands.add_parser(
"start",
help="Start the daemon process",
description="Start manager",
formatter_class=ArgumentDefaultsHelpFormatter,
)
manager_start.add_argument(
"-qf",
"--query_frequency",
action="store",
type=int,
default=60,
help="The frequency at which workers will be queried for response.",
)
manager_start.add_argument(
"-qt",
"--query_timeout",
action="store",
type=float,
default=0.5,
help="The timeout for the query response that are sent to workers.",
)
manager_start.add_argument(
"-wt",
"--worker_timeout",
action="store",
type=int,
default=180,
help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.",
)
bgunnar5 marked this conversation as resolved.
Show resolved Hide resolved
manager_start.set_defaults(func=process_manager)
manager_stop = manager_commands.add_parser(
"stop",
help="Stop the daemon process",
description="Stop manager",
formatter_class=ArgumentDefaultsHelpFormatter,
)
manager_stop.set_defaults(func=process_manager)

# merlin monitor
monitor: ArgumentParser = subparsers.add_parser(
"monitor",
Expand Down
34 changes: 33 additions & 1 deletion merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
from merlin.common.dumper import dump_handler
from merlin.config import Config
from merlin.study.batch import batch_check_parallel, batch_worker_launch
from merlin.study.celerymanager import CeleryManager
from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers
from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running


Expand Down Expand Up @@ -761,8 +763,37 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs):
:side effect: Launches a celery worker via a subprocess
"""
try:
_ = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732
process = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732
# Get the worker name from worker_cmd and add to be monitored by celery manager
worker_cmd_list = worker_cmd.split()
worker_name = worker_cmd_list[worker_cmd_list.index("-n") + 1].replace("%h", kwargs["env"]["HOSTNAME"])
worker_name = "celery@" + worker_name
worker_list.append(worker_cmd)

# Adding the worker args to redis db
redis_connection = CeleryManager.get_worker_args_redis_connection()
args = kwargs
# Save worker command with the arguements
args["worker_cmd"] = worker_cmd
# Store the nested dictionaries into a separate key with a link.
# Note: This only support single nested dicts(for simplicity) and
# further nesting can be accomplished by making this recursive.
for key in kwargs:
if type(kwargs[key]) is dict:
key_name = worker_name + "_" + key
redis_connection.hmset(name=key_name, mapping=kwargs[key])
args[key] = "link:" + key_name
if type(kwargs[key]) is bool:
if kwargs[key]:
args[key] = "True"
else:
args[key] = "False"
redis_connection.hmset(name=worker_name, mapping=args)
redis_connection.quit()

# Adding the worker to redis db to be monitored
add_monitor_workers(workers=((worker_name, process.pid),))
LOG.info(f"Added {worker_name} to be monitored")
except Exception as e: # pylint: disable=C0103
LOG.error(f"Cannot start celery workers, {e}")
raise
Expand Down Expand Up @@ -866,6 +897,7 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None):
if workers_to_stop:
LOG.info(f"Sending stop to these workers: {workers_to_stop}")
app.control.broadcast("shutdown", destination=workers_to_stop)
remove_monitor_workers(workers=workers_to_stop)
else:
LOG.warning("No workers found to stop")

Expand Down
Loading
Loading