Skip to content

Commit

Permalink
(Bluesky #1682) add OpenTelemetry tracing to status
Browse files Browse the repository at this point in the history
  - Adds opentelemetry-api as a dependency
  - Registers traces on __init__ of Status objects,
    ending when they are marked done
  - Registers traces on Status.wait()
  • Loading branch information
d-perl committed Apr 18, 2024
1 parent 5c03c3f commit 6423707
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
44 changes: 29 additions & 15 deletions ophyd/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from warnings import warn

import numpy as np
from opentelemetry import trace

from .log import logger
from .utils import (
Expand All @@ -16,8 +17,10 @@
adapt_old_callback_signature,
)

tracer = trace.get_tracer(__name__)

class UseNewProperty(RuntimeError):

class UseNewProperty(RuntimeError):
...


Expand Down Expand Up @@ -80,6 +83,7 @@ class StatusBase:

def __init__(self, *, timeout=None, settle_time=0, done=None, success=None):
super().__init__()
self._tracing_span = tracer.start_span("ophyd status")
self._tname = None
self._lock = threading.RLock()
self._event = threading.Event() # state associated with done-ness
Expand Down Expand Up @@ -134,6 +138,9 @@ def __init__(self, *, timeout=None, settle_time=0, done=None, success=None):
)
self.set_exception(exc)

self._tracing_span.set_attribute("type", self.__class__.__name__)
self._tracing_span.set_attribute("object_repr", repr(self))

@property
def timeout(self):
"""
Expand Down Expand Up @@ -278,6 +285,7 @@ def _run_callbacks(self):
self,
)
self._callbacks.clear()
self._tracing_span.end()

def set_exception(self, exc):
"""
Expand Down Expand Up @@ -329,6 +337,7 @@ def set_exception(self, exc):
self._exception = exc
self._settled_event.set()

self._tracing_span.end()
if self._callback_thread is None:
self._run_callbacks()

Expand All @@ -349,6 +358,7 @@ def set_finished(self):
# Note that in either case, the callbacks themselves are run from the
# same thread. This just sets an Event, either from this thread (the
# one calling set_finished) or the thread created below.
self._tracing_span.end()
if self.settle_time > 0:
if self._callback_thread is None:

Expand Down Expand Up @@ -420,6 +430,7 @@ def exception(self, timeout=None):
raise WaitTimeoutError(f"Status {self!r} has not completed yet.")
return self._exception

@tracer.start_as_current_span("ophyd status wait")
def wait(self, timeout=None):
"""
Block until the action completes.
Expand All @@ -446,6 +457,7 @@ def wait(self, timeout=None):
indicates that the action itself raised ``TimeoutError``, distinct
from ``WaitTimeoutError`` above.
"""
trace.get_current_span().set_attribute("object_repr", repr(self))
if not self._event.wait(timeout=timeout):
raise WaitTimeoutError(f"Status {self!r} has not completed yet.")
if self._exception is not None:
Expand Down Expand Up @@ -546,9 +558,11 @@ class AndStatus(StatusBase):
"a Status that has composes two other Status objects using logical and"

def __init__(self, left, right, **kwargs):
super().__init__(**kwargs)
self.left = left
self.right = right
super().__init__(**kwargs)
self._tracing_span.set_attribute("left", repr(self.left))
self._tracing_span.set_attribute("right", repr(self.right))

def inner(status):
with self._lock:
Expand Down Expand Up @@ -581,10 +595,8 @@ def __repr__(self):
return "({self.left!r} & {self.right!r})".format(self=self)

def __str__(self):
return (
"{0}(done={1.done}, "
"success={1.success})"
"".format(self.__class__.__name__, self)
return "{0}(done={1.done}, " "success={1.success})" "".format(
self.__class__.__name__, self
)

def __contains__(self, status: StatusBase) -> bool:
Expand Down Expand Up @@ -628,13 +640,15 @@ def __init__(self, obj=None, timeout=None, settle_time=0, done=None, success=Non
super().__init__(
timeout=timeout, settle_time=settle_time, done=done, success=success
)
(
self._tracing_span.set_attribute("obj", obj)
if obj
else self._tracing_span.set_attribute("no_obj_given", True)
)

def __str__(self):
return (
"{0}(obj={1.obj}, "
"done={1.done}, "
"success={1.success})"
"".format(self.__class__.__name__, self)
return "{0}(obj={1.obj}, " "done={1.done}, " "success={1.success})" "".format(
self.__class__.__name__, self
)

__repr__ = __str__
Expand Down Expand Up @@ -664,17 +678,17 @@ def __init__(self, device, **kwargs):
self.device = device
self._watchers = []
super().__init__(**kwargs)
self._tracing_span.set_attribute("device", repr(self.device))

def _handle_failure(self):
super()._handle_failure()
self.log.debug("Trying to stop %s", repr(self.device))
self.device.stop()

def __str__(self):
return (
"{0}(device={1.device.name}, done={1.done}, "
"success={1.success})"
"".format(self.__class__.__name__, self)
device_name = self.device.name if self.device else "None"
return "{0}(device={2}, done={1.done}, " "success={1.success})" "".format(
self.__class__.__name__, self, device_name
)

def watch(self, func):
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ setup_requires =
install_requires =
networkx>=2.0
numpy
opentelemetry-api
packaging
pint

Expand Down

0 comments on commit 6423707

Please sign in to comment.