From 4c1198a36129af54844d1a6c8eddfd2138dca4ea Mon Sep 17 00:00:00 2001 From: Keith Ralphs Date: Thu, 31 Oct 2024 13:30:14 +0000 Subject: [PATCH] Test failure diagnosis --- src/blueapi/service/runner.py | 4 +- src/blueapi/worker/task_worker.py | 25 ++- tests/conftest.py | 20 +++ tests/unit_tests/client/test_client.py | 167 ++++++++++++++++++++ tests/unit_tests/service/test_runner.py | 9 ++ tests/unit_tests/utils/test_tracing.py | 65 ++++++++ tests/unit_tests/worker/test_task_worker.py | 36 ++++- 7 files changed, 315 insertions(+), 11 deletions(-) create mode 100644 tests/unit_tests/utils/test_tracing.py diff --git a/src/blueapi/service/runner.py b/src/blueapi/service/runner.py index 8e2cd2958..658fe8a5e 100644 --- a/src/blueapi/service/runner.py +++ b/src/blueapi/service/runner.py @@ -5,7 +5,7 @@ from importlib import import_module from multiprocessing import Pool, set_start_method from multiprocessing.pool import Pool as PoolClass -from typing import Any, Concatenate, ParamSpec, TypeVar +from typing import Any, ParamSpec, TypeVar from observability_utils.tracing import ( add_span_attributes, @@ -104,7 +104,7 @@ def stop(self): @start_as_current_span(TRACER, "function", "args", "kwargs") def run( self, - function: Callable[Concatenate[dict[str, Any], P], T], + function: Callable[P, T], *args: P.args, **kwargs: P.kwargs, ) -> T: diff --git a/src/blueapi/worker/task_worker.py b/src/blueapi/worker/task_worker.py index 10cee7017..2c8ba35d7 100644 --- a/src/blueapi/worker/task_worker.py +++ b/src/blueapi/worker/task_worker.py @@ -222,6 +222,7 @@ def mark_task_as_started(event: WorkerEvent, _: str | None) -> None: LOGGER.info(f"Submitting: {trackable_task}") try: + self._current_task_otel_context = get_trace_context() sub = self.worker_events.subscribe(mark_task_as_started) """ Cache the current trace context as the one for this task id """ self._task_channel.put_nowait(trackable_task) @@ -314,14 +315,22 @@ def _cycle(self) -> None: LOGGER.info("Awaiting task") next_task: 
TrackableTask | KillSignal = self._task_channel.get() if isinstance(next_task, TrackableTask): - LOGGER.info(f"Got new task: {next_task}") - self._current = next_task # Informing mypy that the task is not None - - self._current_task_otel_context = get_trace_context() - add_span_attributes({"next_task.task_id": next_task.task_id}) - - self._current.is_pending = False - self._current.task.do_task(self._ctx) + if self._current_task_otel_context is not None: + with TRACER.start_as_current_span( + "_cycle", + context=self._current_task_otel_context, + kind=SpanKind.SERVER, + ): + LOGGER.info(f"Got new task: {next_task}") + self._current = ( + next_task # Informing mypy that the task is not None + ) + + self._current_task_otel_context = get_trace_context() + add_span_attributes({"next_task.task_id": next_task.task_id}) + + self._current.is_pending = False + self._current.task.do_task(self._ctx) elif isinstance(next_task, KillSignal): # If we receive a kill signal we begin to shut the worker down. 
# Note that the kill signal is explicitly not a type of task as we don't diff --git a/tests/conftest.py b/tests/conftest.py index 838d4b219..7ff58847e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,16 @@ import asyncio +from typing import cast # Based on https://docs.pytest.org/en/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option # noqa: E501 import pytest from bluesky import RunEngine from bluesky.run_engine import TransitionError +from observability_utils.tracing import setup_tracing +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import get_tracer_provider + +from tests.unit_tests.utils.test_tracing import JsonObjectSpanExporter @pytest.fixture(scope="function") @@ -24,3 +31,16 @@ def clean_event_loop(): request.addfinalizer(clean_event_loop) return RE + + +@pytest.fixture +def provider() -> TracerProvider: + setup_tracing("test", False) + return cast(TracerProvider, get_tracer_provider()) + + +@pytest.fixture +def exporter(provider: TracerProvider) -> JsonObjectSpanExporter: + exporter = JsonObjectSpanExporter() + provider.add_span_processor(BatchSpanProcessor(exporter)) + return exporter diff --git a/tests/unit_tests/client/test_client.py b/tests/unit_tests/client/test_client.py index f2ea986fd..1cff09ace 100644 --- a/tests/unit_tests/client/test_client.py +++ b/tests/unit_tests/client/test_client.py @@ -3,6 +3,7 @@ import pytest from bluesky_stomp.messaging import MessageContext +from tests.unit_tests.utils.test_tracing import JsonObjectSpanExporter, span_exporter from blueapi.client.client import BlueapiClient from blueapi.client.event_bus import AnyEvent, BlueskyStreamingError, EventBusClient @@ -98,10 +99,20 @@ def test_get_plans(client: BlueapiClient): assert client.get_plans() == PLANS +def test_get_plans_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with span_exporter(exporter, 
"get_plans"): + assert client.get_plans() == PLANS + + def test_get_plan(client: BlueapiClient): assert client.get_plan("foo") == PLAN +def test_get_plan_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with span_exporter(exporter, "get_plan", "name"): + assert client.get_plan("foo") == PLAN + + def test_get_nonexistant_plan( client: BlueapiClient, mock_rest: Mock, @@ -115,10 +126,20 @@ def test_get_devices(client: BlueapiClient): assert client.get_devices() == DEVICES +def test_get_devices_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with span_exporter(exporter, "get_devices"): + assert client.get_devices() == DEVICES + + def test_get_device(client: BlueapiClient): assert client.get_device("foo") == DEVICE +def test_get_device_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with span_exporter(exporter, "get_device", "name"): + assert client.get_device("foo") == DEVICE + + def test_get_nonexistant_device( client: BlueapiClient, mock_rest: Mock, @@ -132,10 +153,20 @@ def test_get_state(client: BlueapiClient): assert client.get_state() == WorkerState.IDLE +def test_get_state_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with span_exporter(exporter, "get_state"): + assert client.get_state() == WorkerState.IDLE + + def test_get_task(client: BlueapiClient): assert client.get_task("foo") == TASK +def test_get_task_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with span_exporter(exporter, "get_task", "task_id"): + assert client.get_task("foo") == TASK + + def test_get_nonexistent_task( client: BlueapiClient, mock_rest: Mock, @@ -157,6 +188,14 @@ def test_get_all_tasks( assert client.get_all_tasks() == TASKS +def test_get_all_tasks_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, +): + with span_exporter(exporter, "get_all_tasks"): + assert client.get_all_tasks() == TASKS + + def test_create_task( client: BlueapiClient, mock_rest: Mock, @@ -165,6 
+204,16 @@ def test_create_task( mock_rest.create_task.assert_called_once_with(Task(name="foo")) +def test_create_task_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with span_exporter(exporter, "create_task", "task"): + client.create_task(task=Task(name="foo")) + mock_rest.create_task.assert_called_once_with(Task(name="foo")) + + def test_create_task_does_not_start_task( client: BlueapiClient, mock_rest: Mock, @@ -181,10 +230,27 @@ def test_clear_task( mock_rest.clear_task.assert_called_once_with("foo") +def test_clear_task_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with span_exporter(exporter, "clear_task"): + client.clear_task(task_id="foo") + mock_rest.clear_task.assert_called_once_with("foo") + + def test_get_active_task(client: BlueapiClient): assert client.get_active_task() == ACTIVE_TASK +def test_get_active_task_span_ok( + exporter: JsonObjectSpanExporter, client: BlueapiClient +): + with span_exporter(exporter, "get_active_task"): + assert client.get_active_task() == ACTIVE_TASK + + def test_start_task( client: BlueapiClient, mock_rest: Mock, @@ -193,6 +259,16 @@ def test_start_task( mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="bar")) +def test_start_task_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with span_exporter(exporter, "start_task", "task"): + client.start_task(task=WorkerTask(task_id="bar")) + mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="bar")) + + def test_start_nonexistant_task( client: BlueapiClient, mock_rest: Mock, @@ -213,6 +289,19 @@ def test_create_and_start_task_calls_both_creating_and_starting_endpoints( mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="baz")) +def test_create_and_start_task_calls_both_creating_and_starting_endpoints_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + 
mock_rest: Mock, +): + mock_rest.create_task.return_value = TaskResponse(task_id="baz") + mock_rest.update_worker_task.return_value = TaskResponse(task_id="baz") + with span_exporter(exporter, "create_and_start_task", "task"): + client.create_and_start_task(Task(name="baz")) + mock_rest.create_task.assert_called_once_with(Task(name="baz")) + mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="baz")) + + def test_create_and_start_task_fails_if_task_creation_fails( client: BlueapiClient, mock_rest: Mock, @@ -246,6 +335,13 @@ def test_get_environment(client: BlueapiClient): assert client.get_environment() == ENV +def test_get_environment_span_ok( + exporter: JsonObjectSpanExporter, client: BlueapiClient +): + with span_exporter(exporter, "get_environment"): + assert client.get_environment() == ENV + + def test_reload_environment( client: BlueapiClient, mock_rest: Mock, @@ -255,6 +351,17 @@ def test_reload_environment( mock_rest.delete_environment.assert_called_once() +def test_reload_environment_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with span_exporter(exporter, "reload_environment"): + client.reload_environment() + mock_rest.get_environment.assert_called_once() + mock_rest.delete_environment.assert_called_once() + + def test_reload_environment_failure( client: BlueapiClient, mock_rest: Mock, @@ -277,6 +384,19 @@ def test_abort( ) +def test_abort_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with span_exporter(exporter, "abort", "reason"): + client.abort(reason="foo") + mock_rest.cancel_current_task.assert_called_once_with( + WorkerState.ABORTING, + reason="foo", + ) + + def test_stop( client: BlueapiClient, mock_rest: Mock, @@ -285,6 +405,16 @@ def test_stop( mock_rest.cancel_current_task.assert_called_once_with(WorkerState.STOPPING) +def test_stop_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + 
with span_exporter(exporter, "stop"): + client.stop() + mock_rest.cancel_current_task.assert_called_once_with(WorkerState.STOPPING) + + def test_pause( client: BlueapiClient, mock_rest: Mock, @@ -296,6 +426,19 @@ def test_pause( ) +def test_pause_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with span_exporter(exporter, "pause"): + client.pause(defer=True) + mock_rest.set_state.assert_called_once_with( + WorkerState.PAUSED, + defer=True, + ) + + def test_resume( client: BlueapiClient, mock_rest: Mock, @@ -307,6 +450,19 @@ def test_resume( ) +def test_resume_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with span_exporter(exporter, "resume"): + client.resume() + mock_rest.set_state.assert_called_once_with( + WorkerState.RUNNING, + defer=False, + ) + + def test_cannot_run_task_without_message_bus(client: BlueapiClient): with pytest.raises( RuntimeError, @@ -315,6 +471,17 @@ def test_cannot_run_task_without_message_bus(client: BlueapiClient): client.run_task(Task(name="foo")) +def test_cannot_run_task_without_message_bus_span_ok( + exporter: JsonObjectSpanExporter, client: BlueapiClient +): + with pytest.raises( + RuntimeError, + match="Cannot run plans without Stomp configuration to track progress", + ): + with span_exporter(exporter, "run_task"): + client.run_task(Task(name="foo")) + + def test_run_task_sets_up_control( client_with_events: BlueapiClient, mock_rest: Mock, diff --git a/tests/unit_tests/service/test_runner.py b/tests/unit_tests/service/test_runner.py index af65e9b0a..2a308003c 100644 --- a/tests/unit_tests/service/test_runner.py +++ b/tests/unit_tests/service/test_runner.py @@ -5,6 +5,7 @@ import pytest from ophyd import Callable from pydantic import BaseModel, ValidationError +from tests.unit_tests.utils.test_tracing import JsonObjectSpanExporter, span_exporter from blueapi.service import interface from blueapi.service.model import EnvironmentResponse @@ 
-55,6 +56,14 @@ def test_raises_if_used_before_started(runner: WorkerDispatcher): runner.run(interface.get_plans) +def test_raises_if_used_before_started_span_ok( + exporter: JsonObjectSpanExporter, runner: WorkerDispatcher +): + with pytest.raises(InvalidRunnerStateError): + with span_exporter(exporter, "run", "function", "args", "kwargs"): + runner.run(interface.get_plans) + + def test_error_on_runner_setup(local_runner: WorkerDispatcher): expected_state = EnvironmentResponse( initialized=False, diff --git a/tests/unit_tests/utils/test_tracing.py b/tests/unit_tests/utils/test_tracing.py new file mode 100644 index 000000000..c94b3cde4 --- /dev/null +++ b/tests/unit_tests/utils/test_tracing.py @@ -0,0 +1,65 @@ +from collections.abc import Callable, Sequence +from concurrent.futures import Future +from contextlib import contextmanager +from typing import IO + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import ( + SpanExporter, + SpanExportResult, +) + + +class JsonObjectSpanExporter(SpanExporter): + """A custom span exporter to allow spans created by open telemetry tracing code to + be examined and verified during normal testing + """ + + def __init__( + self, + service_name: str | None = "Test", + out: IO | None = None, + formatter: Callable[[ReadableSpan], str] | None = None, + ): + self.service_name = service_name + self.top_span = Future() + + # def prime(self): + # if self.top_span is not None: + # try: + # self.top_span.result(timeout=0.0) + # except: + # pass + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + if self.top_span is not None and not self.top_span.done(): + self.top_span.set_result(spans[-1]) + return SpanExportResult.SUCCESS + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + +# setup_tracing("test", False) +# PROVIDER = cast(TracerProvider, get_tracer_provider()) +# EXPORTER = JsonObjectSpanExporter() +# 
PROVIDER.add_span_processor(BatchSpanProcessor(EXPORTER)) + + +@contextmanager +def span_exporter(exporter: JsonObjectSpanExporter, func_name: str, *span_args: str): + """Use as a with block around the function under test decorated with + start_as_current_span to check span creation and content. + + params: + func_name: The name of the function being tested + span_args: The arguments specified in its start_as_current_span decorator + """ + # EXPORTER.prime() + yield + if exporter.top_span is not None: + span = exporter.top_span.result(10) + exporter.top_span = None + assert span.name == func_name + for param in span_args: + assert param in span.attributes.keys() diff --git a/tests/unit_tests/worker/test_task_worker.py b/tests/unit_tests/worker/test_task_worker.py index 601a38d27..afa9b2902 100644 --- a/tests/unit_tests/worker/test_task_worker.py +++ b/tests/unit_tests/worker/test_task_worker.py @@ -7,6 +7,7 @@ from unittest.mock import ANY, MagicMock, patch import pytest +from tests.unit_tests.utils.test_tracing import JsonObjectSpanExporter, span_exporter from blueapi.config import EnvironmentConfig, Source, SourceKind from blueapi.core import BlueskyContext, EventStream, MsgGenerator @@ -87,6 +88,15 @@ def test_stop_doesnt_hang(inert_worker: TaskWorker) -> None: inert_worker.stop() +def test_stop_doesnt_hang_span_ok( + exporter: JsonObjectSpanExporter, inert_worker: TaskWorker +) -> None: + with span_exporter(exporter, "start"): + inert_worker.start() + with span_exporter(exporter, "stop"): + inert_worker.stop() + + def test_stop_is_idempontent_if_worker_not_started(inert_worker: TaskWorker) -> None: inert_worker.stop() @@ -111,7 +121,9 @@ def test_multi_start(inert_worker: TaskWorker) -> None: inert_worker.stop() -def test_submit_task(worker: TaskWorker) -> None: +def test_submit_task( + worker: TaskWorker, +) -> None: assert worker.get_tasks() == [] task_id = worker.submit_task(_SIMPLE_TASK) assert worker.get_tasks() == [ @@ -121,6 +133,20 @@ def 
test_submit_task(worker: TaskWorker) -> None: ] +def test_submit_task_span_ok( + exporter: JsonObjectSpanExporter, + worker: TaskWorker, +) -> None: + assert worker.get_tasks() == [] + with span_exporter(exporter, "submit_task", "task.name", "task.params"): + task_id = worker.submit_task(_SIMPLE_TASK) + assert worker.get_tasks() == [ + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) + ] + + def test_submit_multiple_tasks(worker: TaskWorker) -> None: assert worker.get_tasks() == [] task_id_1 = worker.submit_task(_SIMPLE_TASK) @@ -194,6 +220,14 @@ def test_clear_nonexistent_task(worker: TaskWorker) -> None: worker.clear_task("foo") +def test_clear_nonexistent_task_span_ok( + exporter: JsonObjectSpanExporter, worker: TaskWorker +) -> None: + with pytest.raises(KeyError): + with span_exporter(exporter, "clear_task", "task_id"): + worker.clear_task("foo") + + def test_does_not_allow_simultaneous_running_tasks( worker: TaskWorker, fake_device: FakeDevice,