From 8815a2acb295051636eecbc32d29be96d79a686d Mon Sep 17 00:00:00 2001 From: Casper van der Wel Date: Wed, 1 Nov 2023 09:19:22 +0100 Subject: [PATCH] Add correlation_id to context to follow distributed tasks (#27) --- CHANGES.md | 8 ++- clean_python/base/domain/context.py | 12 ++++ clean_python/dramatiq/dramatiq_task_logger.py | 8 +++ clean_python/fastapi/fastapi_access_logger.py | 9 +-- clean_python/fastapi/service.py | 10 ++- clean_python/fluentbit/fluentbit_gateway.py | 37 ++++++++-- tests/fastapi/test_fastapi_access_logger.py | 20 +++++- tests/fastapi/test_service_context.py | 70 +++++++++++++++++++ .../{test_service.py => test_service_init.py} | 0 tests/test_dramatiq_task_logger.py | 37 +++++++--- 10 files changed, 187 insertions(+), 24 deletions(-) create mode 100644 tests/fastapi/test_service_context.py rename tests/fastapi/{test_service.py => test_service_init.py} (100%) diff --git a/CHANGES.md b/CHANGES.md index 03287ad..5e2696a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,7 +4,13 @@ 0.6.10 (unreleased) ------------------- -- Nothing changed yet. +- Add correlation_id to logging and accept X-Correlation-Id header in + fastapi service. + +- Add `SyncFluentbitGateway`. + +- Log the nanosecond-precision "time" instead of the second-precision logtime + in `[Sync]FluentbitGateway`. 0.6.9 (2023-10-11) diff --git a/clean_python/base/domain/context.py b/clean_python/base/domain/context.py index 034d274..0498eb0 100644 --- a/clean_python/base/domain/context.py +++ b/clean_python/base/domain/context.py @@ -4,6 +4,7 @@ from contextvars import ContextVar from typing import FrozenSet from typing import Optional +from uuid import UUID from pydantic import AnyUrl from pydantic import FileUrl @@ -45,6 +46,9 @@ def __init__(self): self._tenant_value: ContextVar[Optional[Tenant]] = ContextVar( "tenant_value", default=None ) + self._correlation_id_value: ContextVar[Optional[UUID]] = ContextVar( + "correlation_id", default=None + ) @property def path(self) -> AnyUrl: @@ -70,5 +74,13 @@ def tenant(self) -> Optional[Tenant]: def tenant(self, value: Optional[Tenant]) -> None: self._tenant_value.set(value) + @property + def correlation_id(self) -> Optional[UUID]: + return self._correlation_id_value.get() + + @correlation_id.setter + def correlation_id(self, value: Optional[UUID]) -> None: + self._correlation_id_value.set(value) + ctx = Context() diff --git a/clean_python/dramatiq/dramatiq_task_logger.py b/clean_python/dramatiq/dramatiq_task_logger.py index e834301..8688196 100644 --- a/clean_python/dramatiq/dramatiq_task_logger.py +++ b/clean_python/dramatiq/dramatiq_task_logger.py @@ -13,6 +13,7 @@ from dramatiq.message import Message from dramatiq.middleware import SkipMessage +from clean_python import ctx from clean_python import Gateway from clean_python.fluentbit import FluentbitGateway @@ -72,6 +73,11 @@ async def stop(self, message: Message, result=None, exception=None): except AttributeError: duration = 0 + try: + start_time = self.local.start_time + except AttributeError: + start_time = None + log_dict = { "tag_suffix": "task_log", "task_id": message.message_id, @@ -83,5 +89,7 @@ async def stop(self, message: Message, result=None, exception=None): "argsrepr": self.encoder.encode(message.args), "kwargsrepr": self.encoder.encode(message.kwargs), "result": result, + "time": start_time, + "correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None, } return await self.gateway.add(log_dict) diff --git a/clean_python/fastapi/fastapi_access_logger.py b/clean_python/fastapi/fastapi_access_logger.py index 3e2fa02..8c09c17 100644 --- a/clean_python/fastapi/fastapi_access_logger.py +++ b/clean_python/fastapi/fastapi_access_logger.py @@ -2,7 +2,6 @@ import os import time -from datetime import datetime from typing import Awaitable from typing import Callable from typing import Optional @@ -12,6 +11,7 @@ from starlette.requests import Request from starlette.responses import Response +from clean_python import ctx from clean_python import Gateway from clean_python.fluentbit import FluentbitGateway @@ -44,10 +44,6 @@ async def __call__( return response -def fmt_timestamp(timestamp: float) -> str: - return datetime.utcfromtimestamp(timestamp).isoformat() + "Z" - - async def log_access( gateway: Gateway, request: Request, @@ -81,7 +77,8 @@ async def log_access( "status": response.status_code, "content_type": response.headers.get("content-type"), "content_length": content_length, - "time": fmt_timestamp(time_received), + "time": time_received, "request_time": request_time, + "correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None, } await gateway.add(item) diff --git a/clean_python/fastapi/service.py b/clean_python/fastapi/service.py index bd4c60c..0cc6ffe 100644 --- a/clean_python/fastapi/service.py +++ b/clean_python/fastapi/service.py @@ -6,9 +6,12 @@ from typing import List from typing import Optional from typing import Set +from uuid import UUID +from uuid import uuid4 from fastapi import Depends from fastapi import FastAPI +from fastapi import Header from fastapi import Request from fastapi.exceptions import RequestValidationError from starlette.types import ASGIApp @@ -62,10 +65,15 @@ def get_auth_kwargs(auth_client: Optional[OAuth2SPAClientSettings]) -> Dict[str, } -async def set_context(request: Request, token: Token = Depends(get_token)) -> None: +async def set_context( + request: Request, + token: Token = Depends(get_token), + x_correlation_id: UUID = Header(default_factory=uuid4), +) -> None: ctx.path = request.url ctx.user = token.user ctx.tenant = token.tenant + ctx.correlation_id = x_correlation_id async def health_check(): diff --git a/clean_python/fluentbit/fluentbit_gateway.py b/clean_python/fluentbit/fluentbit_gateway.py index c217912..7aa7b1f 100644 --- a/clean_python/fluentbit/fluentbit_gateway.py +++ b/clean_python/fluentbit/fluentbit_gateway.py @@ -1,19 +1,46 @@ # (c) Nelen & Schuurmans +import time +from typing import Tuple + from asgiref.sync import sync_to_async from fluent.sender import FluentSender from clean_python import Gateway from clean_python import Json +from clean_python import SyncGateway + +__all__ = ["FluentbitGateway", "SyncFluentbitGateway"] + + +def unpack_item(item: Json) -> Tuple[str, float, Json]: + data = item.copy() + label = data.pop("tag_suffix", "") + timestamp = data.pop("time", None) + if timestamp is None: + timestamp = time.time() + return label, timestamp, data -__all__ = ["FluentbitGateway"] + +class SyncFluentbitGateway(SyncGateway): + def __init__(self, tag: str, host: str, port: int): + self._sender = FluentSender( + tag, host=host, port=port, nanosecond_precision=True + ) + + def add(self, item: Json): + label, timestamp, data = unpack_item(item) + self._sender.emit_with_time(label, timestamp, data) + return {**data, "time": timestamp, "tag_suffix": label} class FluentbitGateway(Gateway): def __init__(self, tag: str, host: str, port: int): - self._sender = FluentSender(tag, host=host, port=port) + self._sync_gateway = SyncFluentbitGateway(tag, host, port) @sync_to_async - def add(self, item: Json) -> Json: - self._sender.emit(item.pop("tag_suffix", ""), item) - return item + def _add(self, item: Json) -> Json: + return self._sync_gateway.add(item) + + async def add(self, item: Json) -> Json: + return await self._add(item) diff --git a/tests/fastapi/test_fastapi_access_logger.py b/tests/fastapi/test_fastapi_access_logger.py index 7e6e1f1..5412d71 100644 --- a/tests/fastapi/test_fastapi_access_logger.py +++ b/tests/fastapi/test_fastapi_access_logger.py @@ -1,4 +1,5 @@ from unittest import mock +from uuid import uuid4 import pytest from fastapi.routing import APIRoute @@ -6,6 +7,7 @@ from starlette.responses import JSONResponse from starlette.responses import StreamingResponse +from clean_python import ctx from clean_python import InMemoryGateway from clean_python.fastapi import FastAPIAccessLogger @@ -67,8 +69,18 @@ async def func(request): return func +@pytest.fixture +def correlation_id(): + uid = uuid4() + ctx.correlation_id = uid + yield uid + ctx.correlation_id = None + + @mock.patch("time.time", return_value=0.0) -async def test_logging(time, fastapi_access_logger, req, response, call_next): +async def test_logging( + time, fastapi_access_logger, req, response, call_next, correlation_id +): await fastapi_access_logger(req, call_next) assert len(fastapi_access_logger.gateway.data) == 0 await response.background() @@ -87,8 +99,9 @@ async def test_logging(time, fastapi_access_logger, req, response, call_next): "status": 200, "content_type": "application/json", "content_length": 13, - "time": "1970-01-01T00:00:00Z", + "time": 0.0, "request_time": 0.0, + "correlation_id": str(correlation_id), } @@ -149,6 +162,7 @@ async def test_logging_minimal( "status": 200, "content_type": "text/html; charset=utf-8", "content_length": None, - "time": "1970-01-01T00:00:00Z", + "time": 0.0, "request_time": 0.0, + "correlation_id": None, } diff --git a/tests/fastapi/test_service_context.py b/tests/fastapi/test_service_context.py new file mode 100644 index 0000000..08b0e48 --- /dev/null +++ b/tests/fastapi/test_service_context.py @@ -0,0 +1,70 @@ +from http import HTTPStatus +from uuid import UUID +from uuid import uuid4 + +import pytest +from fastapi.testclient import TestClient + +from clean_python import ctx +from clean_python import InMemoryGateway +from clean_python.fastapi import get +from clean_python.fastapi import Resource +from clean_python.fastapi import Service +from clean_python.fastapi import v + + +class FooResource(Resource, version=v(1), name="testing"): + @get("/context") + def context(self): + return { + "path": str(ctx.path), + "user": ctx.user, + "tenant": ctx.tenant, + "correlation_id": str(ctx.correlation_id), + } + + +@pytest.fixture +def app(): + return Service(FooResource()).create_app( + title="test", + description="testing", + hostname="testserver", + access_logger_gateway=InMemoryGateway([]), + ) + + +@pytest.fixture +def client(app): + return TestClient(app) + + +def test_default_context(app, client: TestClient): + response = client.get(app.url_path_for("v1/context")) + + assert response.status_code == HTTPStatus.OK + + body = response.json() + + assert body["path"] == "http://testserver/v1/context" + assert body["user"] == {"id": "DEV", "name": "dev"} + assert body["tenant"] is None + UUID(body["correlation_id"]) # randomly generated uuid + + assert ctx.correlation_id is None + + +def test_x_correlation_id_header(app, client: TestClient): + uid = str(uuid4()) + response = client.get( + app.url_path_for("v1/context"), + headers={"X-Correlation-Id": uid}, + ) + + assert response.status_code == HTTPStatus.OK + + body = response.json() + + assert body["correlation_id"] == uid + + assert ctx.correlation_id is None diff --git a/tests/fastapi/test_service.py b/tests/fastapi/test_service_init.py similarity index 100% rename from tests/fastapi/test_service.py rename to tests/fastapi/test_service_init.py diff --git a/tests/test_dramatiq_task_logger.py b/tests/test_dramatiq_task_logger.py index 3dd5367..fb54384 100644 --- a/tests/test_dramatiq_task_logger.py +++ b/tests/test_dramatiq_task_logger.py @@ -1,10 +1,12 @@ import os from unittest import mock +from uuid import uuid4 import pytest from dramatiq.errors import Retry from dramatiq.message import Message +from clean_python import ctx from clean_python import InMemoryGateway from clean_python.dramatiq import DramatiqTaskLogger @@ -22,6 +24,20 @@ def task_logger(in_memory_gateway): ) +@pytest.fixture +def correlation_id(): + uid = uuid4() + ctx.correlation_id = uid + yield uid + ctx.correlation_id = None + + +@pytest.fixture +def patched_time(): + with mock.patch("time.time", side_effect=(0, 123.456)): + yield + + @pytest.fixture def message(): return Message( @@ -36,32 +52,36 @@ def message(): @pytest.fixture -def expected(): +def expected(correlation_id): return { "id": 1, "tag_suffix": "task_log", "task_id": "abc123", "name": "my_task", "state": "SUCCESS", - "duration": 0, + "duration": 123.456, "retries": 0, "origin": f"host-{os.getpid()}", "argsrepr": b"[1,2]", "kwargsrepr": b'{"foo":"bar"}', "result": None, + "time": 0.0, + "correlation_id": str(correlation_id), } -@mock.patch("time.time", return_value=123) -async def test_log_success(time, task_logger, in_memory_gateway, message, expected): +async def test_log_success( + patched_time, task_logger, in_memory_gateway, message, expected +): await task_logger.start() await task_logger.stop(message) assert in_memory_gateway.data[1] == expected -@mock.patch("time.time", new=mock.Mock(return_value=123)) -async def test_log_fail(task_logger, in_memory_gateway, message, expected): +async def test_log_fail( + patched_time, task_logger, in_memory_gateway, message, expected +): await task_logger.start() await task_logger.stop(message, exception=ValueError("test")) @@ -72,8 +92,9 @@ async def test_log_fail(task_logger, in_memory_gateway, message, expected): } -@mock.patch("time.time", return_value=123) -async def test_log_retry(time, task_logger, in_memory_gateway, message, expected): +async def test_log_retry( + patched_time, task_logger, in_memory_gateway, message, expected +): await task_logger.start() await task_logger.stop(message, exception=Retry("test"))