forked from Sunbird-AIAssistant/sakhi-api-service
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtelemetry_middleware.py
62 lines (52 loc) · 2.16 KB
/
telemetry_middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import time
import json
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import Message
from fastapi import Request
from logger import logger
from utils import get_from_env_or_config
from telemetry_logger import TelemetryLogger
# https://github.com/tiangolo/fastapi/issues/394
# Stream response does not work => https://github.com/tiangolo/fastapi/issues/394#issuecomment-994665859
async def set_body(request: Request, body: bytes):
async def receive() -> Message:
return {"type": "http.request", "body": body}
request._receive = receive
async def get_body(request: Request) -> bytes:
body = await request.body()
await set_body(request, body)
return body
telemetryLogger = TelemetryLogger()
telemetry_log_enabled = get_from_env_or_config('telemetry', 'telemetry_log_enabled', None).lower() == "true"
class TelemetryMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app
):
super().__init__(app)
async def dispatch(self, request: Request, call_next):
start_time = time.time()
await set_body(request, await request.body())
body = await get_body(request)
if body.decode("utf-8"):
body = json.loads(body)
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
if "v1" in str(request.url):
event: dict = {
"status_code": response.status_code,
"duration": round(process_time * 1000),
"body": body,
"method": request.method,
"url": request.url
}
event.update(request.headers)
logger.info({"label": "api_call", "event": event})
if telemetry_log_enabled:
if response.status_code == 200:
event = telemetryLogger.prepare_log_event(eventInput=event, message="success")
else:
event = telemetryLogger.prepare_log_event(eventInput=event, elevel="ERROR", message="failed")
telemetryLogger.add_event(event)
return response