Skip to content

Commit

Permalink
feature/retry_priority (#468)
Browse files Browse the repository at this point in the history
* remove a merge conflict statement that was missed

* add a 'pip freeze' call in github workflow to view reqs versions

* add new retry priority as highest task priority

* update CHANGELOG

* add in MID priority

* change default priority to use priority map MID value
  • Loading branch information
bgunnar5 authored Feb 22, 2024
1 parent 801d0bf commit 9390448
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 23 deletions.
1 change: 1 addition & 0 deletions .github/workflows/push-pr_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ jobs:
python3 -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
pip3 install -r requirements/dev.txt
pip freeze
- name: Install singularity
run: |
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to Merlin will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- New Priority.RETRY value for the Celery task priorities. This will be the new highest priority.

## [1.12.0]
### Added
- A new command `merlin queue-info` that will print the status of your celery queues
Expand Down
4 changes: 2 additions & 2 deletions merlin/common/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
f"Step '{step_name}' in '{step_dir}' is being restarted ({self.request.retries + 1}/{self.max_retries})..."
)
step.mstep.mark_restart()
self.retry(countdown=step.retry_delay)
self.retry(countdown=step.retry_delay, priority=get_priority(Priority.RETRY))
except MaxRetriesExceededError:
LOG.warning(
f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RESTART command,
Expand All @@ -155,7 +155,7 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
f"Step '{step_name}' in '{step_dir}' is being retried ({self.request.retries + 1}/{self.max_retries})..."
)
step.mstep.mark_restart()
self.retry(countdown=step.retry_delay)
self.retry(countdown=step.retry_delay, priority=get_priority(Priority.RETRY))
except MaxRetriesExceededError:
LOG.warning(
f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RETRY command,
Expand Down
48 changes: 27 additions & 21 deletions merlin/config/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"""This module contains priority handling"""

import enum
from typing import List
from typing import Dict

from merlin.config.configfile import CONFIG

Expand All @@ -41,6 +41,7 @@ class Priority(enum.Enum):
HIGH = 1
MID = 2
LOW = 3
RETRY = 4


def is_rabbit_broker(broker: str) -> bool:
Expand All @@ -53,26 +54,31 @@ def is_redis_broker(broker: str) -> bool:
return broker in ["redis", "rediss", "redis+socket"]


def determine_priority_map(broker_name: str) -> Dict[Priority, int]:
"""
Returns the priority mapping for the given broker name.
:param broker_name: The name of the broker that we need the priority map for
:returns: The priority map associated with `broker_name`
"""
if is_rabbit_broker(broker_name):
return {Priority.LOW: 1, Priority.MID: 5, Priority.HIGH: 9, Priority.RETRY: 10}
if is_redis_broker(broker_name):
return {Priority.LOW: 10, Priority.MID: 5, Priority.HIGH: 2, Priority.RETRY: 1}

raise ValueError(f"Unsupported broker name: {broker_name}")


def get_priority(priority: Priority) -> int:
"""
Get the priority based on the broker. For a rabbit broker
a low priority is 1 and high is 10. For redis it's the opposite.
:returns: An int representing the priority level
Gets the priority level as an integer based on the broker.
For a rabbit broker a low priority is 1 and high is 10. For redis it's the opposite.
:param priority: The priority value that we want
:returns: The priority value as an integer
"""
broker: str = CONFIG.broker.name.lower()
priorities: List[Priority] = [Priority.HIGH, Priority.MID, Priority.LOW]
if not isinstance(priority, Priority):
raise TypeError(f"Unrecognized priority '{priority}'! Priority enum options: {[x.name for x in priorities]}")
if priority == Priority.MID:
return 5
if is_rabbit_broker(broker):
if priority == Priority.LOW:
return 1
if priority == Priority.HIGH:
return 10
if is_redis_broker(broker):
if priority == Priority.LOW:
return 10
if priority == Priority.HIGH:
return 1
raise ValueError(f"Function get_priority has reached unknown state! Maybe unsupported broker {broker}?")
if priority not in Priority:
raise ValueError(f"Invalid priority: {priority}")

priority_map = determine_priority_map(CONFIG.broker.name.lower())
return priority_map.get(priority, priority_map[Priority.MID]) # Default to MID priority for unknown priorities

0 comments on commit 9390448

Please sign in to comment.