Skip to content

Commit

Permalink
OpenTelemetry: create a single span for each background task
Browse files Browse the repository at this point in the history
  • Loading branch information
przemub committed Nov 28, 2024
1 parent cb85309 commit f93df99
Showing 1 changed file with 60 additions and 40 deletions.
100 changes: 60 additions & 40 deletions quiz/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Anime Quiz. If not, see <https://www.gnu.org/licenses/>.
import contextlib
import json
import logging
import os
Expand All @@ -30,6 +31,18 @@

from quiz.animethemes import request_anime, AnimeThemesTryLater


try:
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
start_span = tracer.start_as_current_span
except ImportError:
class DummySpan:
def set_attribute(self, _attribute, _value):
pass
start_span = lambda _: contextlib.nullcontext(DummySpan())


logger = logging.getLogger(__name__)
exit_event = threading.Event()

Expand Down Expand Up @@ -80,54 +93,61 @@ def listen_for_tasks(self) -> None:
logger.info(f"Listening for tasks %s", self.name)

while not exit_event.is_set():
response = None
while response is None and not exit_event.is_set():
response = self._client.bzpopmin(self.name, timeout=1)
with start_span(f"task_{self.name}") as span:
span.set_attribute("db.system", "redis")
span.set_attribute("db.connection_string", settings.QUEUE_DB)
span.set_attribute("task.name", self.name)

if response is None:
break
_name, serialized_kwargs, self._current_priority = response
response = None
while response is None and not exit_event.is_set():
response = self._client.bzpopmin(self.name, timeout=1)

kwargs = json.loads(serialized_kwargs)
if response is None:
break
_name, serialized_kwargs, self._current_priority = response
span.set_attribute("task.kwargs", serialized_kwargs)
span.set_attribute("task.priority", self._current_priority)

logger.info(
f"Executing task %s, kwargs %s, priority %s",
self.name,
kwargs,
self._current_priority
)
kwargs = json.loads(serialized_kwargs)

try:
self.run(**kwargs)
except RetryTask:
logger.info(
"Task %s, kwargs %s is being added back to the queue.",
self.name,
kwargs
)
self.add_tasks([(kwargs, self._current_priority)])
except Exception as e: # noqa
logger.exception(
"Failed to execute task %s, kwargs %s!",
f"Executing task %s, kwargs %s, priority %s",
self.name,
kwargs
kwargs,
self._current_priority
)
if settings.BUGSNAG is not None:
bugsnag.notify(
e,
metadata={
"task": self.name,
"task_kwargs": kwargs
}

try:
self.run(**kwargs)
except RetryTask:
logger.info(
"Task %s, kwargs %s is being added back to the queue.",
self.name,
kwargs
)
else:
logger.info(
"Executed task %s, kwargs %s successfully!",
self.name,
kwargs
)
finally:
self.after_return(**kwargs)
self.add_tasks([(kwargs, self._current_priority)])
except Exception as e: # noqa
logger.exception(
"Failed to execute task %s, kwargs %s!",
self.name,
kwargs
)
if settings.BUGSNAG is not None:
bugsnag.notify(
e,
metadata={
"task": self.name,
"task_kwargs": kwargs
}
)
else:
logger.info(
"Executed task %s, kwargs %s successfully!",
self.name,
kwargs
)
finally:
self.after_return(**kwargs)

logger.info("Finished task %s cleanly.", self.name)

Expand Down

0 comments on commit f93df99

Please sign in to comment.