Skip to content

Commit

Permalink
Refactor ItemGrabber into JobGrabber and InvocationGrabber
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Oct 11, 2023
1 parent 306550c commit e0fc179
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 18 deletions.
44 changes: 30 additions & 14 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
Dict,
List,
Tuple,
Type,
Union,
)

from sqlalchemy.exc import OperationalError
Expand Down Expand Up @@ -101,19 +103,18 @@ def shutdown(self):


class ItemGrabber:
grab_model: Union[Type[model.Job], Type[model.WorkflowInvocation]]

def __init__(
self,
app,
grab_type="Job",
handler_assignment_method=None,
max_grab=None,
self_handler_tags=None,
handler_tags=None,
):
self.app = app
self.sa_session = app.model.context
self.grab_this = getattr(model, grab_type)
self.grab_type = grab_type
self.handler_assignment_method = handler_assignment_method
self.self_handler_tags = self_handler_tags
self.max_grab = max_grab
Expand All @@ -123,27 +124,35 @@ def __init__(
self._supports_returning = self.app.application_stack.supports_returning()

def setup_query(self):
if self.grab_model is model.Job:
grab_condition = self.grab_model.table.c.state == self.grab_model.states.NEW
elif self.grab_model is model.WorkflowInvocation:
grab_condition = self.grab_model.table.c.state.in_(
(self.grab_model.states.NEW, self.grab_model.states.CANCELLING)
)
else:
raise NotImplementedError(f"Grabbing {self.grab_model} not implemented")
subq = (
select(self.grab_this.id)
select(self.grab_model.id)
.where(
and_(
self.grab_this.table.c.handler.in_(self.self_handler_tags),
self.grab_this.table.c.state == self.grab_this.states.NEW,
self.grab_model.table.c.handler.in_(self.self_handler_tags),
grab_condition,
)
)
.order_by(self.grab_this.table.c.id)
.order_by(self.grab_model.table.c.id)
)
if self.max_grab:
subq = subq.limit(self.max_grab)
if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED:
subq = subq.with_for_update(skip_locked=True)
self._grab_query = (
self.grab_this.table.update()
.where(self.grab_this.table.c.id.in_(subq))
self.grab_model.table.update()
.where(self.grab_model.table.c.id.in_(subq))
.values(handler=self.app.config.server_name)
)
if self._supports_returning:
self._grab_query = self._grab_query.returning(self.grab_this.table.c.id)
self._grab_query = self._grab_query.returning(self.grab_model.table.c.id)
if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION:
self._grab_conn_opts["isolation_level"] = "SERIALIZABLE"
log.info(
Expand Down Expand Up @@ -183,19 +192,27 @@ def grab_unhandled_items(self):
if self._supports_returning:
rows = proxy.fetchall()
if rows:
log.debug(f"Grabbed {self.grab_type}(s): {', '.join(str(row[0]) for row in rows)}")
log.debug(f"Grabbed {type(self.grab_model)}(s): {', '.join(str(row[0]) for row in rows)}")
else:
trans.rollback()
except OperationalError as e:
# If this is a serialization failure on PostgreSQL, then e.orig is a psycopg2 TransactionRollbackError
# and should have attribute `code`. Other engines should just report the message and move on.
if int(getattr(e.orig, "pgcode", -1)) != 40001:
log.debug(
"Grabbing %s failed (serialization failures are ok): %s", self.grab_type, unicodify(e)
"Grabbing %s failed (serialization failures are ok): %s", self.grab_model, unicodify(e)
)
trans.rollback()


class InvocationGrabber(ItemGrabber):
grab_model = model.WorkflowInvocation


class JobGrabber(ItemGrabber):
grab_model = model.Job


class StopSignalException(Exception):
"""Exception raised when queue returns a stop signal."""

Expand Down Expand Up @@ -240,9 +257,8 @@ def __init__(self, app: MinimalManagerApp, dispatcher):
self.app.job_config.handler_assignment_methods
)
if handler_assignment_method:
self.job_grabber = ItemGrabber(
self.job_grabber = JobGrabber(
app=app,
grab_type="Job",
handler_assignment_method=handler_assignment_method,
max_grab=self.app.job_config.handler_max_grab,
self_handler_tags=self.app.job_config.self_handler_tags,
Expand Down
7 changes: 3 additions & 4 deletions lib/galaxy/workflow/scheduling_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import galaxy.workflow.schedulers
from galaxy import model
from galaxy.exceptions import HandlerAssignmentError
from galaxy.jobs.handler import ItemGrabber
from galaxy.jobs.handler import InvocationGrabber
from galaxy.model.base import transaction
from galaxy.util import plugin_config
from galaxy.util.custom_logging import get_logger
Expand Down Expand Up @@ -285,13 +285,12 @@ def __init__(self, app, workflow_scheduling_manager):
self.invocation_grabber = None
self_handler_tags = set(self.app.job_config.self_handler_tags)
self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id)
handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method(
handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method(
self.workflow_scheduling_manager.handler_assignment_methods
)
if handler_assignment_method:
self.invocation_grabber = ItemGrabber(
self.invocation_grabber = InvocationGrabber(
app=app,
grab_type="WorkflowInvocation",
handler_assignment_method=handler_assignment_method,
max_grab=self.workflow_scheduling_manager.handler_max_grab,
self_handler_tags=self_handler_tags,
Expand Down

0 comments on commit e0fc179

Please sign in to comment.