Skip to content

Commit

Permalink
Make Orchestrator metrics singleton
Browse files Browse the repository at this point in the history
This ensures that even if an application instantiates multiple Orchestrators,
the "megaservice_*" metrics collect data from all of them.

Another option would be to add arguments for passing Orchestrator
instance names as metric prefixes, to name and differentiate metrics
for each Orchestrator instance.

However, that would have needed changes in 3 OPEA projects instead of
just this one, and dashboards would then need to hard-code those
per-application prefixes.

Signed-off-by: Eero Tamminen <[email protected]>
  • Loading branch information
eero-t committed Feb 18, 2025
1 parent 7727235 commit 929a652
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions comps/cores/mega/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,7 @@


class OrchestratorMetrics:
# Need a static class-level ID for metric prefix because:
# - Prometheus requires metrics (their names) to be unique
_instance_id = 0

def __init__(self) -> None:
OrchestratorMetrics._instance_id += 1
if OrchestratorMetrics._instance_id > 1:
self._prefix = f"megaservice{self._instance_id}"
else:
self._prefix = "megaservice"

self.request_pending = Gauge(f"{self._prefix}_request_pending", "Count of currently pending requests (gauge)")

# locking for latency metric creation / method change
self._lock = threading.Lock()

Expand All @@ -50,20 +38,22 @@ def __init__(self) -> None:
self.first_token_latency = None
self.inter_token_latency = None
self.request_latency = None
self.request_pending = None

# initial methods to create the metrics
self.token_update = self._token_update_create
self.request_update = self._request_update_create
self.pending_update = self._pending_update_create

def _token_update_create(self, token_start: float, is_first: bool) -> float:
with self._lock:
# in case another thread already got here
if self.token_update == self._token_update_create:
self.first_token_latency = Histogram(
f"{self._prefix}_first_token_latency", "First token latency (histogram)"
"megaservice_first_token_latency", "First token latency (histogram)"
)
self.inter_token_latency = Histogram(
f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)"
"megaservice_inter_token_latency", "Inter-token latency (histogram)"
)
self.token_update = self._token_update_real
return self.token_update(token_start, is_first)
Expand All @@ -73,11 +63,21 @@ def _request_update_create(self, req_start: float) -> None:
# in case another thread already got here
if self.request_update == self._request_update_create:
self.request_latency = Histogram(
f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)"
"megaservice_request_latency", "Whole LLM request/reply latency (histogram)"
)
self.request_update = self._request_update_real
self.request_update(req_start)

# Lazy-creation stub: __init__ binds self.pending_update to this method.
# When called, it registers the "megaservice_request_pending" Gauge (unless
# that has already been done), rebinds self.pending_update to
# _pending_update_real, and then forwards the call.
def _pending_update_create(self, increase: bool) -> None:
with self._lock:
# in case another thread already got here
if self.pending_update == self._pending_update_create:
self.request_pending = Gauge(
"megaservice_request_pending", "Count of currently pending requests (gauge)"
)
self.pending_update = self._pending_update_real
# forward the call to whichever implementation is bound at this point
self.pending_update(increase)

def _token_update_real(self, token_start: float, is_first: bool) -> float:
now = time.time()
if is_first:
Expand All @@ -89,18 +89,22 @@ def _token_update_real(self, token_start: float, is_first: bool) -> float:
# Observe the time elapsed since req_start in the request_latency histogram.
# NOTE(review): assumes req_start was taken with time.time() — confirm at call sites.
def _request_update_real(self, req_start: float) -> None:
self.request_latency.observe(time.time() - req_start)

def pending_update(self, increase: bool) -> None:
def _pending_update_real(self, increase: bool) -> None:
if increase:
self.request_pending.inc()
else:
self.request_pending.dec()


# Prometheus metrics need to be singletons, not per Orchestrator
_metrics = OrchestratorMetrics()


class ServiceOrchestrator(DAG):
"""Manage 1 or N micro services in a DAG through Python API."""

def __init__(self) -> None:
self.metrics = OrchestratorMetrics()
self.metrics = _metrics
self.services = {} # all services, id -> service
super().__init__()

Expand Down

0 comments on commit 929a652

Please sign in to comment.