Skip to content

Commit

Permalink
Merge pull request #902 from Aiven-Open/nosahama/prometheus-metrics
Browse files Browse the repository at this point in the history
feature, observability: Added Prometheus Metrics & Instrumentation
  • Loading branch information
eliax1996 authored Jun 19, 2024
2 parents eca0090 + addaaa2 commit 370ee46
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 4 deletions.
7 changes: 7 additions & 0 deletions container/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,10 @@ services:
KARAPACE_REGISTRY_PORT: 8081
KARAPACE_ADMIN_METADATA_MAX_AGE: 0
KARAPACE_LOG_LEVEL: WARNING

prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- 9090:9090
18 changes: 18 additions & 0 deletions container/prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
global:
scrape_interval: 10s # How frequently to scrape targets by default.
scrape_timeout: 5s # How long until a scrape request times out.
evaluation_interval: 60s # How frequently to evaluate rules.

# A scrape configuration
scrape_configs:
- job_name: karapace-registry
metrics_path: /metrics
static_configs:
- targets:
- karapace-registry:8081

- job_name: karapace-rest
metrics_path: /metrics
static_configs:
- targets:
- karapace-rest:8082
5 changes: 3 additions & 2 deletions karapace/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def get(
headers: Optional[Headers] = None,
auth: Optional[BasicAuth] = None,
params: Optional[Mapping[str, str]] = None,
json_response: bool = True,
) -> Result:
path = self.path_for(path)
if not headers:
Expand All @@ -105,8 +106,8 @@ async def get(
params=params,
) as res:
# required for forcing the response body conversion to json despite missing valid Accept headers
json_result = await res.json(content_type=None)
return Result(res.status, json_result, headers=res.headers)
result = await res.json(content_type=None) if json_response else await res.text()
return Result(res.status, result, headers=res.headers)

async def delete(
self,
Expand Down
Empty file.
104 changes: 104 additions & 0 deletions karapace/instrumentation/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
karapace - prometheus instrumentation
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
# mypy: disable-error-code="call-overload"

from __future__ import annotations

from aiohttp.web import middleware, Request, Response
from karapace.rapu import RestApp
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, Histogram
from typing import Awaitable, Callable, Final

import logging
import time

LOG = logging.getLogger(__name__)


class PrometheusInstrumentation:
METRICS_ENDPOINT_PATH: Final[str] = "/metrics"
START_TIME_REQUEST_KEY: Final[str] = "start_time"

registry: Final[CollectorRegistry] = CollectorRegistry()

karapace_http_requests_total: Final[Counter] = Counter(
registry=registry,
name="karapace_http_requests_total",
documentation="Total Request Count for HTTP/TCP Protocol",
labelnames=("method", "path", "status"),
)

karapace_http_requests_duration_seconds: Final[Histogram] = Histogram(
registry=registry,
name="karapace_http_requests_duration_seconds",
documentation="Request Duration for HTTP/TCP Protocol",
labelnames=("method", "path"),
)

karapace_http_requests_in_progress: Final[Gauge] = Gauge(
registry=registry,
name="karapace_http_requests_in_progress",
documentation="In-progress requests for HTTP/TCP Protocol",
labelnames=("method", "path"),
)

@classmethod
def setup_metrics(cls, *, app: RestApp) -> None:
LOG.info("Setting up prometheus metrics")
app.route(
cls.METRICS_ENDPOINT_PATH,
callback=cls.serve_metrics,
method="GET",
schema_request=False,
with_request=False,
json_body=False,
auth=None,
)
app.app.middlewares.insert(0, cls.http_request_metrics_middleware) # type: ignore[arg-type]

# disable-error-code="call-overload" is used at the top of this file to allow mypy checks.
# the issue is in the type difference (Counter, Gauge, etc) of the arguments which we are
# passing to `__setitem__()`, but we need to keep these objects in the `app.app` dict.
app.app[cls.karapace_http_requests_total] = cls.karapace_http_requests_total
app.app[cls.karapace_http_requests_duration_seconds] = cls.karapace_http_requests_duration_seconds
app.app[cls.karapace_http_requests_in_progress] = cls.karapace_http_requests_in_progress

@classmethod
async def serve_metrics(cls) -> bytes:
return generate_latest(cls.registry)

@classmethod
@middleware
async def http_request_metrics_middleware(
cls,
request: Request,
handler: Callable[[Request], Awaitable[Response]],
) -> Response:
request[cls.START_TIME_REQUEST_KEY] = time.time()

# Extract request labels
path = request.path
method = request.method

# Increment requests in progress before handler
request.app[cls.karapace_http_requests_in_progress].labels(method, path).inc()

# Call request handler
response: Response = await handler(request)

# Instrument request duration
request.app[cls.karapace_http_requests_duration_seconds].labels(method, path).observe(
time.time() - request[cls.START_TIME_REQUEST_KEY]
)

# Instrument total requests
request.app[cls.karapace_http_requests_total].labels(method, path, response.status).inc()

# Decrement requests in progress after handler
request.app[cls.karapace_http_requests_in_progress].labels(method, path).dec()

return response
5 changes: 3 additions & 2 deletions karapace/karapace_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from contextlib import closing
from karapace import version as karapace_version
from karapace.config import read_config
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from karapace.kafka_rest_apis import KafkaRest
from karapace.rapu import RestApp
from karapace.schema_registry_apis import KarapaceSchemaRegistryController
Expand Down Expand Up @@ -62,8 +63,8 @@ def main() -> int:
logging.log(logging.DEBUG, "Config %r", config_without_secrets)

try:
# `close` will be called by the callback `close_by_app` set by `KarapaceBase`
app.run()
PrometheusInstrumentation.setup_metrics(app=app)
app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase`
except Exception as ex: # pylint: disable-broad-except
app.stats.unexpected_exception(ex=ex, where="karapace")
raise
Expand Down
2 changes: 2 additions & 0 deletions requirements/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pkgutil-resolve-name==1.3.10
# jsonschema
pluggy==1.5.0
# via pytest
prometheus-client==0.20.0
# via -r requirements.txt
protobuf==3.20.3
# via -r requirements.txt
psutil==5.9.8
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ ujson<6
watchfiles<1
xxhash~=3.3
zstandard
prometheus-client==0.20.0

# Patched dependencies
#
Expand Down
2 changes: 2 additions & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ packaging==24.0
# via aiokafka
pkgutil-resolve-name==1.3.10
# via jsonschema
prometheus-client==0.20.0
# via -r requirements.in
protobuf==3.20.3
# via -r requirements.in
pygments==2.18.0
Expand Down
Empty file.
30 changes: 30 additions & 0 deletions tests/integration/instrumentation/test_prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
karapace - prometheus instrumentation tests
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

from http import HTTPStatus
from karapace.client import Client, Result
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from prometheus_client.parser import text_string_to_metric_families


async def test_metrics_endpoint(registry_async_client: Client) -> None:
result: Result = await registry_async_client.get(
PrometheusInstrumentation.METRICS_ENDPOINT_PATH,
json_response=False,
)
assert result.status_code == HTTPStatus.OK.value


async def test_metrics_endpoint_parsed_response(registry_async_client: Client) -> None:
result: Result = await registry_async_client.get(
PrometheusInstrumentation.METRICS_ENDPOINT_PATH,
json_response=False,
)
metrics = [family.name for family in text_string_to_metric_families(result.json_result)]
assert "karapace_http_requests" in metrics
assert "karapace_http_requests_duration_seconds" in metrics
assert "karapace_http_requests_in_progress" in metrics
120 changes: 120 additions & 0 deletions tests/unit/instrumentation/test_prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
karapace - prometheus instrumentation tests
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

from _pytest.logging import LogCaptureFixture
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from karapace.rapu import RestApp
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from unittest.mock import AsyncMock, call, MagicMock, patch

import aiohttp.web
import logging
import pytest


class TestPrometheusInstrumentation:
@pytest.fixture
def prometheus(self) -> PrometheusInstrumentation:
return PrometheusInstrumentation()

def test_constants(self, prometheus: PrometheusInstrumentation) -> None:
assert prometheus.START_TIME_REQUEST_KEY == "start_time"
assert isinstance(prometheus.registry, CollectorRegistry)

def test_metric_types(self, prometheus: PrometheusInstrumentation) -> None:
assert isinstance(prometheus.karapace_http_requests_total, Counter)
assert isinstance(prometheus.karapace_http_requests_duration_seconds, Histogram)
assert isinstance(prometheus.karapace_http_requests_in_progress, Gauge)

def test_metric_values(self, prometheus: PrometheusInstrumentation) -> None:
# `_total` suffix is stripped off the metric name for `Counters`, but needed for clarity.
assert repr(prometheus.karapace_http_requests_total) == "prometheus_client.metrics.Counter(karapace_http_requests)"
assert (
repr(prometheus.karapace_http_requests_duration_seconds)
== "prometheus_client.metrics.Histogram(karapace_http_requests_duration_seconds)"
)
assert (
repr(prometheus.karapace_http_requests_in_progress)
== "prometheus_client.metrics.Gauge(karapace_http_requests_in_progress)"
)

def test_setup_metrics(self, caplog: LogCaptureFixture, prometheus: PrometheusInstrumentation) -> None:
app = AsyncMock(spec=RestApp, app=AsyncMock(spec=aiohttp.web.Application))

with caplog.at_level(logging.INFO, logger="karapace.instrumentation.prometheus"):
prometheus.setup_metrics(app=app)

app.route.assert_called_once_with(
prometheus.METRICS_ENDPOINT_PATH,
callback=prometheus.serve_metrics,
method="GET",
schema_request=False,
with_request=False,
json_body=False,
auth=None,
)
app.app.middlewares.insert.assert_called_once_with(0, prometheus.http_request_metrics_middleware)
app.app.__setitem__.assert_has_calls(
[
call(prometheus.karapace_http_requests_total, prometheus.karapace_http_requests_total),
call(
prometheus.karapace_http_requests_duration_seconds,
prometheus.karapace_http_requests_duration_seconds,
),
call(prometheus.karapace_http_requests_in_progress, prometheus.karapace_http_requests_in_progress),
]
)
for log in caplog.records:
assert log.name == "karapace.instrumentation.prometheus"
assert log.levelname == "INFO"
assert log.message == "Setting up prometheus metrics"

@patch("karapace.instrumentation.prometheus.generate_latest")
async def test_serve_metrics(self, generate_latest: MagicMock, prometheus: PrometheusInstrumentation) -> None:
await prometheus.serve_metrics()
generate_latest.assert_called_once_with(prometheus.registry)

@patch("karapace.instrumentation.prometheus.time")
async def test_http_request_metrics_middleware(
self,
mock_time: MagicMock,
prometheus: PrometheusInstrumentation,
) -> None:
mock_time.time.return_value = 10
request = AsyncMock(
spec=aiohttp.web.Request, app=AsyncMock(spec=aiohttp.web.Application), path="/path", method="GET"
)
handler = AsyncMock(spec=aiohttp.web.RequestHandler, return_value=MagicMock(status=200))

await prometheus.http_request_metrics_middleware(request=request, handler=handler)

assert handler.assert_awaited_once # extra assert is to ignore pylint [pointless-statement]
request.__setitem__.assert_called_once_with(prometheus.START_TIME_REQUEST_KEY, 10)
request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls(
[
call("GET", "/path"),
call().inc(),
]
)
request.app[prometheus.karapace_http_requests_duration_seconds].labels.assert_has_calls(
[
call("GET", "/path"),
call().observe(request.__getitem__.return_value.__rsub__.return_value),
]
)
request.app[prometheus.karapace_http_requests_total].labels.assert_has_calls(
[
call("GET", "/path", 200),
call().inc(),
]
)
request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls(
[
call("GET", "/path"),
call().dec(),
]
)

0 comments on commit 370ee46

Please sign in to comment.