diff --git a/quiz/tasks.py b/quiz/tasks.py index 94dd083..49c6dea 100644 --- a/quiz/tasks.py +++ b/quiz/tasks.py @@ -14,6 +14,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with Anime Quiz. If not, see . +import contextlib import json import logging import os @@ -30,6 +31,18 @@ from quiz.animethemes import request_anime, AnimeThemesTryLater + +try: + from opentelemetry import trace + tracer = trace.get_tracer(__name__) + start_span = tracer.start_as_current_span +except ImportError: + class DummySpan: + def set_attribute(self, _attribute, _value): + pass + start_span = lambda _: contextlib.nullcontext(DummySpan()) + + logger = logging.getLogger(__name__) exit_event = threading.Event() @@ -80,54 +93,61 @@ def listen_for_tasks(self) -> None: logger.info(f"Listening for tasks %s", self.name) while not exit_event.is_set(): - response = None - while response is None and not exit_event.is_set(): - response = self._client.bzpopmin(self.name, timeout=1) + with start_span(f"task_{self.name}") as span: + span.set_attribute("db.system", "redis") + span.set_attribute("db.connection_string", settings.QUEUE_DB) + span.set_attribute("task.name", self.name) - if response is None: - break - _name, serialized_kwargs, self._current_priority = response + response = None + while response is None and not exit_event.is_set(): + response = self._client.bzpopmin(self.name, timeout=1) - kwargs = json.loads(serialized_kwargs) + if response is None: + break + _name, serialized_kwargs, self._current_priority = response + span.set_attribute("task.kwargs", serialized_kwargs) + span.set_attribute("task.priority", self._current_priority) - logger.info( - f"Executing task %s, kwargs %s, priority %s", - self.name, - kwargs, - self._current_priority - ) + kwargs = json.loads(serialized_kwargs) - try: - self.run(**kwargs) - except RetryTask: logger.info( - "Task %s, kwargs %s is being added back to the queue.", - self.name, - kwargs - ) - self.add_tasks([(kwargs, self._current_priority)]) - except Exception as e: # noqa - logger.exception( - "Failed to execute task %s, kwargs %s!", + f"Executing task %s, kwargs %s, priority %s", self.name, - kwargs + kwargs, + self._current_priority ) - if settings.BUGSNAG is not None: - bugsnag.notify( - e, - metadata={ - "task": self.name, - "task_kwargs": kwargs - } + + try: + self.run(**kwargs) + except RetryTask: + logger.info( + "Task %s, kwargs %s is being added back to the queue.", + self.name, + kwargs ) - else: - logger.info( - "Executed task %s, kwargs %s successfully!", - self.name, - kwargs - ) - finally: - self.after_return(**kwargs) + self.add_tasks([(kwargs, self._current_priority)]) + except Exception as e: # noqa + logger.exception( + "Failed to execute task %s, kwargs %s!", + self.name, + kwargs + ) + if settings.BUGSNAG is not None: + bugsnag.notify( + e, + metadata={ + "task": self.name, + "task_kwargs": kwargs + } + ) + else: + logger.info( + "Executed task %s, kwargs %s successfully!", + self.name, + kwargs + ) + finally: + self.after_return(**kwargs) logger.info("Finished task %s cleanly.", self.name)