Skip to content

Commit

Permalink
Add correlation_id to context to follow distributed tasks (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervdw authored Nov 1, 2023
1 parent 4cb8b6e commit 8815a2a
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 24 deletions.
8 changes: 7 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
0.6.10 (unreleased)
-------------------

- Nothing changed yet.
- Add correlation_id to logging and accept X-Correlation-Id header in
fastapi service.

- Add `SyncFluentbitGateway`.

- Log the nanosecond-precision "time" instead of the second-precision logtime
in `[Sync]FluentbitGateway`.


0.6.9 (2023-10-11)
Expand Down
12 changes: 12 additions & 0 deletions clean_python/base/domain/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from contextvars import ContextVar
from typing import FrozenSet
from typing import Optional
from uuid import UUID

from pydantic import AnyUrl
from pydantic import FileUrl
Expand Down Expand Up @@ -45,6 +46,9 @@ def __init__(self):
self._tenant_value: ContextVar[Optional[Tenant]] = ContextVar(
"tenant_value", default=None
)
self._correlation_id_value: ContextVar[Optional[UUID]] = ContextVar(
"correlation_id", default=None
)

@property
def path(self) -> AnyUrl:
Expand All @@ -70,5 +74,13 @@ def tenant(self) -> Optional[Tenant]:
def tenant(self, value: Optional[Tenant]) -> None:
self._tenant_value.set(value)

@property
def correlation_id(self) -> Optional[UUID]:
return self._correlation_id_value.get()

@correlation_id.setter
def correlation_id(self, value: Optional[UUID]) -> None:
self._correlation_id_value.set(value)


ctx = Context()
8 changes: 8 additions & 0 deletions clean_python/dramatiq/dramatiq_task_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dramatiq.message import Message
from dramatiq.middleware import SkipMessage

from clean_python import ctx
from clean_python import Gateway
from clean_python.fluentbit import FluentbitGateway

Expand Down Expand Up @@ -72,6 +73,11 @@ async def stop(self, message: Message, result=None, exception=None):
except AttributeError:
duration = 0

try:
start_time = self.local.start_time
except AttributeError:
start_time = None

log_dict = {
"tag_suffix": "task_log",
"task_id": message.message_id,
Expand All @@ -83,5 +89,7 @@ async def stop(self, message: Message, result=None, exception=None):
"argsrepr": self.encoder.encode(message.args),
"kwargsrepr": self.encoder.encode(message.kwargs),
"result": result,
"time": start_time,
"correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None,
}
return await self.gateway.add(log_dict)
9 changes: 3 additions & 6 deletions clean_python/fastapi/fastapi_access_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import os
import time
from datetime import datetime
from typing import Awaitable
from typing import Callable
from typing import Optional
Expand All @@ -12,6 +11,7 @@
from starlette.requests import Request
from starlette.responses import Response

from clean_python import ctx
from clean_python import Gateway
from clean_python.fluentbit import FluentbitGateway

Expand Down Expand Up @@ -44,10 +44,6 @@ async def __call__(
return response


def fmt_timestamp(timestamp: float) -> str:
return datetime.utcfromtimestamp(timestamp).isoformat() + "Z"


async def log_access(
gateway: Gateway,
request: Request,
Expand Down Expand Up @@ -81,7 +77,8 @@ async def log_access(
"status": response.status_code,
"content_type": response.headers.get("content-type"),
"content_length": content_length,
"time": fmt_timestamp(time_received),
"time": time_received,
"request_time": request_time,
"correlation_id": str(ctx.correlation_id) if ctx.correlation_id else None,
}
await gateway.add(item)
10 changes: 9 additions & 1 deletion clean_python/fastapi/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
from typing import List
from typing import Optional
from typing import Set
from uuid import UUID
from uuid import uuid4

from fastapi import Depends
from fastapi import FastAPI
from fastapi import Header
from fastapi import Request
from fastapi.exceptions import RequestValidationError
from starlette.types import ASGIApp
Expand Down Expand Up @@ -62,10 +65,15 @@ def get_auth_kwargs(auth_client: Optional[OAuth2SPAClientSettings]) -> Dict[str,
}


async def set_context(request: Request, token: Token = Depends(get_token)) -> None:
async def set_context(
request: Request,
token: Token = Depends(get_token),
x_correlation_id: UUID = Header(default_factory=uuid4),
) -> None:
ctx.path = request.url
ctx.user = token.user
ctx.tenant = token.tenant
ctx.correlation_id = x_correlation_id


async def health_check():
Expand Down
37 changes: 32 additions & 5 deletions clean_python/fluentbit/fluentbit_gateway.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,46 @@
# (c) Nelen & Schuurmans

import time
from typing import Tuple

from asgiref.sync import sync_to_async
from fluent.sender import FluentSender

from clean_python import Gateway
from clean_python import Json
from clean_python import SyncGateway

__all__ = ["FluentbitGateway", "SyncFluentbitGateway"]


def unpack_item(item: Json) -> Tuple[str, float, Json]:
data = item.copy()
label = data.pop("tag_suffix", "")
timestamp = data.pop("time", None)
if timestamp is None:
timestamp = time.time()
return label, timestamp, data

__all__ = ["FluentbitGateway"]

class SyncFluentbitGateway(SyncGateway):
def __init__(self, tag: str, host: str, port: int):
self._sender = FluentSender(
tag, host=host, port=port, nanosecond_precision=True
)

def add(self, item: Json):
label, timestamp, data = unpack_item(item)
self._sender.emit_with_time(label, timestamp, data)
return {**data, "time": timestamp, "tag_suffix": label}


class FluentbitGateway(Gateway):
def __init__(self, tag: str, host: str, port: int):
self._sender = FluentSender(tag, host=host, port=port)
self._sync_gateway = SyncFluentbitGateway(tag, host, port)

@sync_to_async
def add(self, item: Json) -> Json:
self._sender.emit(item.pop("tag_suffix", ""), item)
return item
def _add(self, item: Json) -> Json:
return self._sync_gateway.add(item)

async def add(self, item: Json) -> Json:
return await self._add(item)
20 changes: 17 additions & 3 deletions tests/fastapi/test_fastapi_access_logger.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from unittest import mock
from uuid import uuid4

import pytest
from fastapi.routing import APIRoute
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.responses import StreamingResponse

from clean_python import ctx
from clean_python import InMemoryGateway
from clean_python.fastapi import FastAPIAccessLogger

Expand Down Expand Up @@ -67,8 +69,18 @@ async def func(request):
return func


@pytest.fixture
def correlation_id():
uid = uuid4()
ctx.correlation_id = uid
yield uid
ctx.correlation_id = None


@mock.patch("time.time", return_value=0.0)
async def test_logging(time, fastapi_access_logger, req, response, call_next):
async def test_logging(
time, fastapi_access_logger, req, response, call_next, correlation_id
):
await fastapi_access_logger(req, call_next)
assert len(fastapi_access_logger.gateway.data) == 0
await response.background()
Expand All @@ -87,8 +99,9 @@ async def test_logging(time, fastapi_access_logger, req, response, call_next):
"status": 200,
"content_type": "application/json",
"content_length": 13,
"time": "1970-01-01T00:00:00Z",
"time": 0.0,
"request_time": 0.0,
"correlation_id": str(correlation_id),
}


Expand Down Expand Up @@ -149,6 +162,7 @@ async def test_logging_minimal(
"status": 200,
"content_type": "text/html; charset=utf-8",
"content_length": None,
"time": "1970-01-01T00:00:00Z",
"time": 0.0,
"request_time": 0.0,
"correlation_id": None,
}
70 changes: 70 additions & 0 deletions tests/fastapi/test_service_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from http import HTTPStatus
from uuid import UUID
from uuid import uuid4

import pytest
from fastapi.testclient import TestClient

from clean_python import ctx
from clean_python import InMemoryGateway
from clean_python.fastapi import get
from clean_python.fastapi import Resource
from clean_python.fastapi import Service
from clean_python.fastapi import v


class FooResource(Resource, version=v(1), name="testing"):
@get("/context")
def context(self):
return {
"path": str(ctx.path),
"user": ctx.user,
"tenant": ctx.tenant,
"correlation_id": str(ctx.correlation_id),
}


@pytest.fixture
def app():
return Service(FooResource()).create_app(
title="test",
description="testing",
hostname="testserver",
access_logger_gateway=InMemoryGateway([]),
)


@pytest.fixture
def client(app):
return TestClient(app)


def test_default_context(app, client: TestClient):
response = client.get(app.url_path_for("v1/context"))

assert response.status_code == HTTPStatus.OK

body = response.json()

assert body["path"] == "http://testserver/v1/context"
assert body["user"] == {"id": "DEV", "name": "dev"}
assert body["tenant"] is None
UUID(body["correlation_id"]) # randomly generated uuid

assert ctx.correlation_id is None


def test_x_correlation_id_header(app, client: TestClient):
uid = str(uuid4())
response = client.get(
app.url_path_for("v1/context"),
headers={"X-Correlation-Id": uid},
)

assert response.status_code == HTTPStatus.OK

body = response.json()

assert body["correlation_id"] == uid

assert ctx.correlation_id is None
File renamed without changes.
Loading

0 comments on commit 8815a2a

Please sign in to comment.