Skip to content

Commit

Permalink
Test failure diagnosis
Browse files Browse the repository at this point in the history
  • Loading branch information
keithralphs committed Oct 31, 2024
1 parent 820c84d commit 4c1198a
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/blueapi/service/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from importlib import import_module
from multiprocessing import Pool, set_start_method
from multiprocessing.pool import Pool as PoolClass
from typing import Any, Concatenate, ParamSpec, TypeVar
from typing import Any, ParamSpec, TypeVar

from observability_utils.tracing import (
add_span_attributes,
Expand Down Expand Up @@ -104,7 +104,7 @@ def stop(self):
@start_as_current_span(TRACER, "function", "args", "kwargs")
def run(
self,
function: Callable[Concatenate[dict[str, Any], P], T],
function: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> T:
Expand Down
25 changes: 17 additions & 8 deletions src/blueapi/worker/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def mark_task_as_started(event: WorkerEvent, _: str | None) -> None:

LOGGER.info(f"Submitting: {trackable_task}")
try:
self._current_task_otel_context = get_trace_context()
sub = self.worker_events.subscribe(mark_task_as_started)
""" Cache the current trace context as the one for this task id """
self._task_channel.put_nowait(trackable_task)
Expand Down Expand Up @@ -314,14 +315,22 @@ def _cycle(self) -> None:
LOGGER.info("Awaiting task")
next_task: TrackableTask | KillSignal = self._task_channel.get()
if isinstance(next_task, TrackableTask):
LOGGER.info(f"Got new task: {next_task}")
self._current = next_task # Informing mypy that the task is not None

self._current_task_otel_context = get_trace_context()
add_span_attributes({"next_task.task_id": next_task.task_id})

self._current.is_pending = False
self._current.task.do_task(self._ctx)
if self._current_task_otel_context is not None:
with TRACER.start_as_current_span(
"_cycle",
context=self._current_task_otel_context,
kind=SpanKind.SERVER,
):
LOGGER.info(f"Got new task: {next_task}")
self._current = (
next_task # Informing mypy that the task is not None
)

self._current_task_otel_context = get_trace_context()
add_span_attributes({"next_task.task_id": next_task.task_id})

self._current.is_pending = False
self._current.task.do_task(self._ctx)
elif isinstance(next_task, KillSignal):
# If we receive a kill signal we begin to shut the worker down.
# Note that the kill signal is explicitly not a type of task as we don't
Expand Down
20 changes: 20 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import asyncio
from typing import cast

# Based on https://docs.pytest.org/en/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option # noqa: E501
import pytest
from bluesky import RunEngine
from bluesky.run_engine import TransitionError
from observability_utils.tracing import setup_tracing
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import get_tracer_provider

from tests.unit_tests.utils.test_tracing import JsonObjectSpanExporter


@pytest.fixture(scope="function")
Expand All @@ -24,3 +31,16 @@ def clean_event_loop():

request.addfinalizer(clean_event_loop)
return RE


@pytest.fixture
def provider() -> TracerProvider:
setup_tracing("test", False)
return cast(TracerProvider, get_tracer_provider())


@pytest.fixture
def exporter(provider: TracerProvider) -> JsonObjectSpanExporter:
exporter = JsonObjectSpanExporter()
provider.add_span_processor(BatchSpanProcessor(exporter))
return exporter
167 changes: 167 additions & 0 deletions tests/unit_tests/client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
from bluesky_stomp.messaging import MessageContext
from tests.unit_tests.utils.test_tracing import JsonObjectSpanExporter, span_exporter

from blueapi.client.client import BlueapiClient
from blueapi.client.event_bus import AnyEvent, BlueskyStreamingError, EventBusClient
Expand Down Expand Up @@ -98,10 +99,20 @@ def test_get_plans(client: BlueapiClient):
assert client.get_plans() == PLANS


def test_get_plans_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient):
with span_exporter(exporter, "get_plans"):
assert client.get_plans() == PLANS


def test_get_plan(client: BlueapiClient):
assert client.get_plan("foo") == PLAN


def test_get_plan_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient):
with span_exporter(exporter, "get_plan", "name"):
assert client.get_plan("foo") == PLAN


def test_get_nonexistant_plan(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -115,10 +126,20 @@ def test_get_devices(client: BlueapiClient):
assert client.get_devices() == DEVICES


def test_get_devices_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient):
with span_exporter(exporter, "get_devices"):
assert client.get_devices() == DEVICES


def test_get_device(client: BlueapiClient):
assert client.get_device("foo") == DEVICE


def test_get_device_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient):
with span_exporter(exporter, "get_device", "name"):
assert client.get_device("foo") == DEVICE


def test_get_nonexistant_device(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -132,10 +153,20 @@ def test_get_state(client: BlueapiClient):
assert client.get_state() == WorkerState.IDLE


def test_get_state_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient):
with span_exporter(exporter, "get_state"):
assert client.get_state() == WorkerState.IDLE


def test_get_task(client: BlueapiClient):
assert client.get_task("foo") == TASK


def test_get_task_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient):
with span_exporter(exporter, "get_task", "task_id"):
assert client.get_task("foo") == TASK


def test_get_nonexistent_task(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -157,6 +188,14 @@ def test_get_all_tasks(
assert client.get_all_tasks() == TASKS


def test_get_all_tasks_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
):
with span_exporter(exporter, "get_all_tasks"):
assert client.get_all_tasks() == TASKS


def test_create_task(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -165,6 +204,16 @@ def test_create_task(
mock_rest.create_task.assert_called_once_with(Task(name="foo"))


def test_create_task_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "create_task", "task"):
client.create_task(task=Task(name="foo"))
mock_rest.create_task.assert_called_once_with(Task(name="foo"))


def test_create_task_does_not_start_task(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -181,10 +230,27 @@ def test_clear_task(
mock_rest.clear_task.assert_called_once_with("foo")


def test_clear_task_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "clear_task"):
client.clear_task(task_id="foo")
mock_rest.clear_task.assert_called_once_with("foo")


def test_get_active_task(client: BlueapiClient):
assert client.get_active_task() == ACTIVE_TASK


def test_get_active_task_span_ok(
exporter: JsonObjectSpanExporter, client: BlueapiClient
):
with span_exporter(exporter, "get_active_task"):
assert client.get_active_task() == ACTIVE_TASK


def test_start_task(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -193,6 +259,16 @@ def test_start_task(
mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="bar"))


def test_start_task_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "start_task", "task"):
client.start_task(task=WorkerTask(task_id="bar"))
mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="bar"))


def test_start_nonexistant_task(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -213,6 +289,19 @@ def test_create_and_start_task_calls_both_creating_and_starting_endpoints(
mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="baz"))


def test_create_and_start_task_calls_both_creating_and_starting_endpoints_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
mock_rest.create_task.return_value = TaskResponse(task_id="baz")
mock_rest.update_worker_task.return_value = TaskResponse(task_id="baz")
with span_exporter(exporter, "create_and_start_task", "task"):
client.create_and_start_task(Task(name="baz"))
mock_rest.create_task.assert_called_once_with(Task(name="baz"))
mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="baz"))


def test_create_and_start_task_fails_if_task_creation_fails(
client: BlueapiClient,
mock_rest: Mock,
Expand Down Expand Up @@ -246,6 +335,13 @@ def test_get_environment(client: BlueapiClient):
assert client.get_environment() == ENV


def test_get_environment_span_ok(
exporter: JsonObjectSpanExporter, client: BlueapiClient
):
with span_exporter(exporter, "get_environment"):
assert client.get_environment() == ENV


def test_reload_environment(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -255,6 +351,17 @@ def test_reload_environment(
mock_rest.delete_environment.assert_called_once()


def test_reload_environment_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "reload_environment"):
client.reload_environment()
mock_rest.get_environment.assert_called_once()
mock_rest.delete_environment.assert_called_once()


def test_reload_environment_failure(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -277,6 +384,19 @@ def test_abort(
)


def test_abort_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "abort", "reason"):
client.abort(reason="foo")
mock_rest.cancel_current_task.assert_called_once_with(
WorkerState.ABORTING,
reason="foo",
)


def test_stop(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -285,6 +405,16 @@ def test_stop(
mock_rest.cancel_current_task.assert_called_once_with(WorkerState.STOPPING)


def test_stop_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "stop"):
client.stop()
mock_rest.cancel_current_task.assert_called_once_with(WorkerState.STOPPING)


def test_pause(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -296,6 +426,19 @@ def test_pause(
)


def test_pause_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "pause"):
client.pause(defer=True)
mock_rest.set_state.assert_called_once_with(
WorkerState.PAUSED,
defer=True,
)


def test_resume(
client: BlueapiClient,
mock_rest: Mock,
Expand All @@ -307,6 +450,19 @@ def test_resume(
)


def test_resume_span_ok(
exporter: JsonObjectSpanExporter,
client: BlueapiClient,
mock_rest: Mock,
):
with span_exporter(exporter, "resume"):
client.resume()
mock_rest.set_state.assert_called_once_with(
WorkerState.RUNNING,
defer=False,
)


def test_cannot_run_task_without_message_bus(client: BlueapiClient):
with pytest.raises(
RuntimeError,
Expand All @@ -315,6 +471,17 @@ def test_cannot_run_task_without_message_bus(client: BlueapiClient):
client.run_task(Task(name="foo"))


def test_cannot_run_task_without_message_bus_span_ok(
exporter: JsonObjectSpanExporter, client: BlueapiClient
):
with pytest.raises(
RuntimeError,
match="Cannot run plans without Stomp configuration to track progress",
):
with span_exporter(exporter, "grun_task"):
client.run_task(Task(name="foo"))


def test_run_task_sets_up_control(
client_with_events: BlueapiClient,
mock_rest: Mock,
Expand Down
Loading

0 comments on commit 4c1198a

Please sign in to comment.