Add default prometheus metrics to clients (#652)
krav authored Apr 27, 2022
1 parent 297cc85 commit f6da834
Showing 8 changed files with 167 additions and 10 deletions.
31 changes: 29 additions & 2 deletions baseplate/clients/memcache/__init__.py
@@ -8,6 +8,7 @@
from typing import Tuple
from typing import Union

from prometheus_client import Gauge
from pymemcache.client.base import PooledClient

from baseplate import Span
@@ -112,7 +113,7 @@ def parse(self, key_path: str, raw_config: config.RawConfig) -> "MemcacheContext
serializer=self.serializer,
deserializer=self.deserializer,
)
return MemcacheContextFactory(pool)
return MemcacheContextFactory(pool, key_path)


class MemcacheContextFactory(ContextFactory):
@@ -129,9 +130,35 @@ class MemcacheContextFactory(ContextFactory):
"""

def __init__(self, pooled_client: PooledClient):
PROM_PREFIX = "bp_memcached_pool"
PROM_LABELS = ["pool"]

pool_size_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this pool",
PROM_LABELS,
)

used_connections_gauge = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of connections in this pool currently in use",
PROM_LABELS,
)

free_connections_gauge = Gauge(
f"{PROM_PREFIX}_free_connections",
"Number of free connections in this pool",
PROM_LABELS,
)

def __init__(self, pooled_client: PooledClient, name: str = "default"):
self.pooled_client = pooled_client

pool = self.pooled_client.client_pool
self.pool_size_gauge.labels(name).set_function(lambda: pool.max_size)
self.free_connections_gauge.labels(name).set_function(lambda: len(pool.free))
self.used_connections_gauge.labels(name).set_function(lambda: len(pool.used))

def report_memcache_runtime_metrics(self, batch: metrics.Client) -> None:
pool = self.pooled_client.client_pool
batch.gauge("pool.in_use").replace(len(pool.used))
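All of the new gauges use prometheus_client's set_function, which re-evaluates a callable at scrape time instead of requiring explicit updates from the client code. A minimal, self-contained sketch of that pattern follows; the DummyPool class, the demo metric name, and the private registry are illustrative stand-ins, not part of pymemcache or baseplate.

from prometheus_client import CollectorRegistry, Gauge, generate_latest

class DummyPool:
    """Stand-in for pymemcache's client_pool; only the attribute read below is modelled."""
    max_size = 25

# A private registry keeps this demo metric away from the class-level gauges above.
registry = CollectorRegistry()
demo_gauge = Gauge("demo_pool_max_size", "Maximum pool size", ["pool"], registry=registry)

pool = DummyPool()
demo_gauge.labels("default").set_function(lambda: pool.max_size)

# The lambda runs at collection time, so the series always reflects the live pool.
print(generate_latest(registry).decode())
# output includes: demo_pool_max_size{pool="default"} 25.0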
32 changes: 30 additions & 2 deletions baseplate/clients/redis.py
@@ -5,6 +5,8 @@

import redis

from prometheus_client import Gauge

# redis.client.StrictPipeline was renamed to redis.client.Pipeline in version 3.0
try:
from redis.client import StrictPipeline as Pipeline # type: ignore
@@ -75,7 +77,7 @@ def __init__(self, **kwargs: Any):

def parse(self, key_path: str, raw_config: config.RawConfig) -> "RedisContextFactory":
connection_pool = pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
return RedisContextFactory(connection_pool)
return RedisContextFactory(connection_pool, key_path)


class RedisContextFactory(ContextFactory):
@@ -92,9 +94,35 @@ class RedisContextFactory(ContextFactory):
"""

def __init__(self, connection_pool: redis.ConnectionPool):
PROM_PREFIX = "bp_redis_pool"
PROM_LABELS = ["pool"]

max_connections = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this redisbp pool",
PROM_LABELS,
)
idle_connections = Gauge(
f"{PROM_PREFIX}_idle_connections",
"Number of idle connections in this redisbp pool",
PROM_LABELS,
)
open_connections = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of open connections in this redisbp pool",
PROM_LABELS,
)

def __init__(self, connection_pool: redis.ConnectionPool, name: str = "redis"):
self.connection_pool = connection_pool

if isinstance(connection_pool, redis.BlockingConnectionPool):
self.max_connections.labels(name).set_function(lambda: connection_pool.max_connections)
self.idle_connections.labels(name).set_function(connection_pool.pool.qsize)
self.open_connections.labels(name).set_function(
lambda: len(connection_pool._connections) # type: ignore
)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
if not isinstance(self.connection_pool, redis.BlockingConnectionPool):
return
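Only redis.BlockingConnectionPool instances receive the new gauges; a plain ConnectionPool has no bounded queue to report on, so the isinstance() guard skips it. The sketch below illustrates the difference under the assumption of redis-py 3.x; the pool names are made up and no Redis server is contacted, since connection pools connect lazily.

import redis

from baseplate.clients.redis import RedisContextFactory

# Blocking pool: max size, idle, and open connection gauges are all registered.
blocking = redis.BlockingConnectionPool(max_connections=20)
RedisContextFactory(blocking, name="cache")
# At scrape time: bp_redis_pool_max_size{pool="cache"} 20.0

# Plain pool: the guard skips gauge registration entirely.
plain = redis.ConnectionPool(max_connections=20)
RedisContextFactory(plain, name="ephemeral")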
29 changes: 27 additions & 2 deletions baseplate/clients/redis_cluster.py
@@ -9,6 +9,7 @@

import rediscluster

from prometheus_client import Gauge
from rediscluster.pipeline import ClusterPipeline

from baseplate import Span
@@ -331,7 +332,7 @@ def __init__(self, **kwargs: Any):

def parse(self, key_path: str, raw_config: config.RawConfig) -> "ClusterRedisContextFactory":
connection_pool = cluster_pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
return ClusterRedisContextFactory(connection_pool)
return ClusterRedisContextFactory(connection_pool, key_path)


class ClusterRedisContextFactory(ContextFactory):
@@ -346,9 +347,33 @@ class ClusterRedisContextFactory(ContextFactory):
:param connection_pool: A connection pool.
"""

def __init__(self, connection_pool: rediscluster.ClusterConnectionPool):
PROM_PREFIX = "bp_redis_cluster_pool"
PROM_LABELS = ["pool"]

max_connections_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this redis cluster pool",
PROM_LABELS,
)
open_connections_gauge = Gauge(
f"{PROM_PREFIX}_open_connections",
"Number of open connections in this redis cluster pool",
PROM_LABELS,
)

def __init__(
self, connection_pool: rediscluster.ClusterConnectionPool, name: str = "redis_cluster"
):
self.connection_pool = connection_pool

if isinstance(connection_pool, rediscluster.ClusterBlockingConnectionPool):
self.max_connections_gauge.labels(name).set_function(
lambda: connection_pool.max_connections
)
self.open_connections_gauge.labels(name).set_function(
lambda: len(connection_pool._connections)
)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
if not isinstance(self.connection_pool, rediscluster.ClusterBlockingConnectionPool):
return
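The cluster factory repeats the pattern with a default label of "redis_cluster" (or the config key_path when built through parse()). The new unit tests read values back through collect(); a small helper in that style is sketched below, where the function name is illustrative:

from baseplate.clients.redis_cluster import ClusterRedisContextFactory

def cluster_pool_max_size(factory: ClusterRedisContextFactory, pool_label: str) -> float:
    # collect() yields one Metric per gauge family; each sample carries the
    # "pool" label that distinguishes individual cluster pools.
    metric = factory.max_connections_gauge.collect()[0]
    return next(s.value for s in metric.samples if s.labels["pool"] == pool_label)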
39 changes: 37 additions & 2 deletions baseplate/clients/sqlalchemy.py
@@ -7,6 +7,7 @@
from typing import Tuple
from typing import Union

from prometheus_client import Gauge
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy.engine import Connection
@@ -125,7 +126,7 @@ def parse(
engine = engine_from_config(
raw_config, secrets=self.secrets, prefix=f"{key_path}.", **self.kwargs
)
return SQLAlchemySessionContextFactory(engine)
return SQLAlchemySessionContextFactory(engine, key_path)


Parameters = Optional[Union[Dict[str, Any], Sequence[Any]]]
@@ -155,12 +156,46 @@ class SQLAlchemyEngineContextFactory(ContextFactory):
"""

def __init__(self, engine: Engine):
PROM_PREFIX = "bp_sqlalchemy_pool"
PROM_LABELS = ["pool"]

max_connections_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this pool",
PROM_LABELS,
)

checked_in_connections_gauge = Gauge(
f"{PROM_PREFIX}_idle_connections",
"Number of available, checked in, connections in this pool",
PROM_LABELS,
)

checked_out_connections_gauge = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of connections in use, or checked out, in this pool",
PROM_LABELS,
)

overflow_connections_gauge = Gauge(
f"{PROM_PREFIX}_overflow_connections",
"Number of connections over the desired size of this pool",
PROM_LABELS,
)

def __init__(self, engine: Engine, name: str = "sqlalchemy"):
self.engine = engine.execution_options()
event.listen(self.engine, "before_cursor_execute", self.on_before_execute, retval=True)
event.listen(self.engine, "after_cursor_execute", self.on_after_execute)
event.listen(self.engine, "handle_error", self.on_error)

pool = self.engine.pool
if isinstance(pool, QueuePool):
self.max_connections_gauge.labels(name).set_function(pool.size)
self.checked_in_connections_gauge.labels(name).set_function(pool.checkedin)
self.checked_out_connections_gauge.labels(name).set_function(pool.checkedout)
self.overflow_connections_gauge.labels(name).set_function(pool.overflow)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
pool = self.engine.pool
if not isinstance(pool, QueuePool):
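The SQLAlchemy gauges are only registered when the engine's pool is a QueuePool, which is the default for most network databases. A hedged sketch follows, forcing a QueuePool onto a throwaway SQLite file so that branch is exercised; the file name and the "demo_db" label are invented for illustration.

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

from baseplate.clients.sqlalchemy import SQLAlchemyEngineContextFactory

# Force a QueuePool so the isinstance() check above passes even for SQLite.
engine = create_engine("sqlite:///demo.db", poolclass=QueuePool, pool_size=5, max_overflow=2)
SQLAlchemyEngineContextFactory(engine, name="demo_db")
# At scrape time: bp_sqlalchemy_pool_max_size{pool="demo_db"} 5.0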
20 changes: 20 additions & 0 deletions baseplate/clients/thrift.py
@@ -8,6 +8,7 @@
from typing import Iterator
from typing import Optional

from prometheus_client import Gauge
from thrift.protocol.TProtocol import TProtocolException
from thrift.Thrift import TApplicationException
from thrift.Thrift import TException
@@ -70,6 +71,21 @@ class ThriftContextFactory(ContextFactory):
"""

PROM_PREFIX = "bp_thrift_pool"
PROM_LABELS = ["client_cls"]

max_connections_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections in this thrift pool before blocking",
PROM_LABELS,
)

active_connections_gauge = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of connections currently in use in this thrift pool",
PROM_LABELS,
)

def __init__(self, pool: ThriftConnectionPool, client_cls: Any):
self.pool = pool
self.client_cls = client_cls
@@ -83,6 +99,10 @@ def __init__(self, pool: ThriftConnectionPool, client_cls: Any):
},
)

pool_name = self.client_cls.__name__  # client_cls is already a class; type() would yield its metaclass
self.max_connections_gauge.labels(pool_name).set_function(lambda: self.pool.size)
self.active_connections_gauge.labels(pool_name).set_function(lambda: self.pool.checkedout)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
batch.gauge("pool.size").replace(self.pool.size)
batch.gauge("pool.in_use").replace(self.pool.checkedout)
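Unlike the other clients, the thrift factory labels its series with the generated client's class name rather than a config key_path. Every gauge added in this commit lives in prometheus_client's default registry, so a service that does not already serve a metrics endpoint can expose them with a single call; the port below is arbitrary.

from prometheus_client import start_http_server

# Serves the default registry, including every bp_*_pool_* gauge registered
# above, on http://localhost:9091/metrics until the process exits.
start_http_server(9091)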
1 change: 1 addition & 0 deletions setup.py
@@ -43,6 +43,7 @@
"requests>=2.21.0,<3.0",
"thrift-unofficial>=0.14.1,<1.0",
"gevent>=20.5.0",
"prometheus-client>=0.12.0",
],
extras_require=extras_require,
scripts=[
13 changes: 12 additions & 1 deletion tests/unit/clients/memcache_tests.py
@@ -11,7 +11,7 @@
del pymemcache

from baseplate.lib.config import ConfigurationError
from baseplate.clients.memcache import pool_from_config
from baseplate.clients.memcache import pool_from_config, MemcacheContextFactory
from baseplate.clients.memcache import lib as memcache_lib


@@ -54,6 +54,17 @@ def test_nodelay(self):
)
self.assertEqual(pool.no_delay, False)

def test_metrics(self):
max_pool_size = "123"
ctx = MemcacheContextFactory(
pool_from_config(
{"memcache.endpoint": "localhost:1234", "memcache.max_pool_size": max_pool_size}
)
)
metric = ctx.pool_size_gauge.collect()[0]
sample = [sample for sample in metric.samples if sample.labels["pool"] == "default"][0]
self.assertEqual(sample.value, float(max_pool_size))


class SerdeTests(unittest.TestCase):
def test_serialize_str(self):
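The same check can be made against the text exposition format instead of collect(). This hedged variant assumes the factory from test_metrics above has already been constructed in the current process, since the gauges live in the default registry.

from prometheus_client import REGISTRY, generate_latest

exposition = generate_latest(REGISTRY).decode()
assert 'bp_memcached_pool_max_size{pool="default"} 123.0' in exposition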
12 changes: 11 additions & 1 deletion tests/unit/clients/redis_tests.py
@@ -8,7 +8,7 @@
del redis

from baseplate.lib.config import ConfigurationError
from baseplate.clients.redis import pool_from_config
from baseplate.clients.redis import pool_from_config, RedisContextFactory


class PoolFromConfigTests(unittest.TestCase):
@@ -23,6 +23,16 @@ def test_basic_url(self):
self.assertEqual(pool.connection_kwargs["port"], 1234)
self.assertEqual(pool.connection_kwargs["db"], 0)

def test_metrics(self):
max_connections = "123"
ctx = RedisContextFactory(
pool_from_config(
{"redis.url": "redis://localhost:1234/0", "redis.max_connections": max_connections}
)
)
metric = ctx.max_connections.collect()
self.assertEqual(metric[0].samples[0].value, float(max_connections))

def test_timeouts(self):
pool = pool_from_config(
{
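Because the gauges are class attributes, building more factories simply adds labelled children under the same families. A hedged illustration follows; the "sessions" name and the port are made up, and the imports mirror the ones used in the test above.

from baseplate.clients.redis import pool_from_config, RedisContextFactory

other = RedisContextFactory(
    pool_from_config(
        {"redis.url": "redis://localhost:4321/0", "redis.max_connections": "7"}
    ),
    name="sessions",
)
# The default registry now holds two children of the same gauge family:
#   bp_redis_pool_max_size{pool="redis"}    123.0   (from test_metrics above)
#   bp_redis_pool_max_size{pool="sessions"}   7.0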
